157 lines
5.5 KiB
Python
157 lines
5.5 KiB
Python
"""
|
|
mod_wireguard.py -- WireGuard server configuration and interface management.
|
|
|
|
Generates /etc/wireguard/<iface>.conf from config.json peers, manages server
|
|
key generation, and brings WireGuard interfaces up or syncs peer changes.
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import mod_shared as shared
|
|
import mod_validation as validation
|
|
|
|
WG_DIR = Path("/etc/wireguard")
|
|
WG_KEEPALIVE = 25
|
|
|
|
|
|
# ===================================================================
|
|
# Interface state
|
|
# ===================================================================
|
|
|
|
def wg_interface_up(iface):
|
|
"""Return True if the WireGuard interface exists and is up."""
|
|
result = subprocess.run(["ip", "link", "show", iface],
|
|
capture_output=True, text=True)
|
|
return result.returncode == 0
|
|
|
|
|
|
# ===================================================================
|
|
# Key and path helpers
|
|
# ===================================================================
|
|
|
|
def wg_server_key_path(iface):
|
|
return WG_DIR / f"{iface}.key"
|
|
|
|
def wg_server_pubkey_path(iface):
|
|
"""Public key written to the configs dir so the Flask app can read it."""
|
|
return shared.SCRIPT_DIR / f".{iface}.pub"
|
|
|
|
def wg_conf_path_for(iface):
|
|
return WG_DIR / f"{iface}.conf"
|
|
|
|
def generate_wg_server_key(iface):
|
|
WG_DIR.mkdir(exist_ok=True)
|
|
result = subprocess.run(["wg", "genkey"], capture_output=True, text=True, check=True)
|
|
private = result.stdout.strip()
|
|
kf = wg_server_key_path(iface)
|
|
kf.write_text(private + "\n")
|
|
kf.chmod(0o600)
|
|
return private
|
|
|
|
|
|
# ===================================================================
|
|
# Config generation
|
|
# ===================================================================
|
|
|
|
def build_wg_server_conf(vlan, server_private_key, iface):
|
|
"""Build the /etc/wireguard/<iface>.conf content from config.json peers."""
|
|
info = vlan["vpn_information"]
|
|
gateway = shared.resolve_vlan_options(vlan)["gateway"]
|
|
network = shared.network_for(vlan)
|
|
server_ip = f"{gateway}/{network.prefixlen}"
|
|
listen_port = info["listen_port"]
|
|
|
|
L = [
|
|
"# Generated by core.py -- do not edit manually.",
|
|
"# Run: sudo python3 core.py --apply",
|
|
"",
|
|
"[Interface]",
|
|
f"PrivateKey = {server_private_key}",
|
|
f"Address = {server_ip}",
|
|
f"ListenPort = {listen_port}",
|
|
"",
|
|
]
|
|
|
|
for peer in vlan.get("peers", []):
|
|
if not peer.get("enabled", True):
|
|
L += [f"# DISABLED: {peer['name']}", ""]
|
|
continue
|
|
L += [
|
|
f"# {peer['name']}",
|
|
"[Peer]",
|
|
f"PublicKey = {peer['public_key']}",
|
|
f"AllowedIPs = {peer['ip']}/32",
|
|
f"PersistentKeepalive = {WG_KEEPALIVE}",
|
|
"",
|
|
]
|
|
|
|
return "\n".join(L)
|
|
|
|
|
|
# ===================================================================
|
|
# Apply
|
|
# ===================================================================
|
|
|
|
def ensure_wg_interfaces(data):
|
|
"""Generate WireGuard server confs and bring up / sync all WG interfaces."""
|
|
wg_vlans = [v for v in data.get("vlans", []) if validation.is_wg(v)]
|
|
if not wg_vlans:
|
|
return
|
|
|
|
for vlan in wg_vlans:
|
|
iface = validation.derive_interface(vlan, data)
|
|
print(f" [{iface}]")
|
|
|
|
kf = wg_server_key_path(iface)
|
|
if not kf.exists():
|
|
print(f" Generating server private key...")
|
|
private = generate_wg_server_key(iface)
|
|
else:
|
|
private = kf.read_text().strip()
|
|
|
|
pub_result = subprocess.run(
|
|
["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
|
|
)
|
|
public = pub_result.stdout.strip()
|
|
pubkey_file = wg_server_pubkey_path(iface)
|
|
pubkey_file.write_text(public + "\n")
|
|
shared.chown_to_script_dir_owner(pubkey_file)
|
|
print(f" Server public key: {public[:20]}...")
|
|
|
|
WG_DIR.mkdir(exist_ok=True)
|
|
conf_file = wg_conf_path_for(iface)
|
|
new_conf = build_wg_server_conf(vlan, private, iface)
|
|
listen_port = vlan["vpn_information"]["listen_port"]
|
|
|
|
port_changed = False
|
|
if conf_file.exists():
|
|
m = re.search(r'ListenPort\s*=\s*(\d+)', conf_file.read_text())
|
|
if m and int(m.group(1)) != listen_port:
|
|
port_changed = True
|
|
|
|
conf_file.write_text(new_conf)
|
|
conf_file.chmod(0o600)
|
|
|
|
peer_count = len([p for p in vlan.get("peers", []) if p.get("enabled", True)])
|
|
print(f" Wrote {conf_file} ({peer_count} enabled peer(s))")
|
|
|
|
if not wg_interface_up(iface):
|
|
print(f" Bringing up {iface}...")
|
|
r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
|
|
if r.returncode != 0:
|
|
print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
|
|
else:
|
|
print(f" {iface} is up.")
|
|
elif port_changed:
|
|
print(f" Listen port changed - restarting {iface}...")
|
|
subprocess.run(["wg-quick", "down", iface], capture_output=True, text=True)
|
|
r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
|
|
if r.returncode != 0:
|
|
print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
|
|
else:
|
|
print(f" {iface} restarted.")
|
|
else:
|
|
print(f" Syncing peers to live {iface}...")
|
|
subprocess.run(["wg", "syncconf", iface, str(conf_file)], capture_output=True, text=True)
|