""" mod_wireguard.py -- WireGuard server configuration and interface management. Generates /etc/wireguard/.conf from config.json peers, manages server key generation, and brings WireGuard interfaces up or syncs peer changes. """ import re import subprocess from pathlib import Path import mod_shared as shared import mod_validation as validation WG_DIR = Path("/etc/wireguard") WG_KEEPALIVE = 25 # =================================================================== # Interface state # =================================================================== def wg_interface_up(iface): """Return True if the WireGuard interface exists and is up.""" result = subprocess.run(["ip", "link", "show", iface], capture_output=True, text=True) return result.returncode == 0 # =================================================================== # Key and path helpers # =================================================================== def wg_server_key_path(iface): return WG_DIR / f"{iface}.key" def wg_server_pubkey_path(iface): """Public key written to the configs dir so the Flask app can read it.""" return shared.SCRIPT_DIR / f".{iface}.pub" def wg_conf_path_for(iface): return WG_DIR / f"{iface}.conf" def generate_wg_server_key(iface): WG_DIR.mkdir(exist_ok=True) result = subprocess.run(["wg", "genkey"], capture_output=True, text=True, check=True) private = result.stdout.strip() kf = wg_server_key_path(iface) kf.write_text(private + "\n") kf.chmod(0o600) return private # =================================================================== # Config generation # =================================================================== def build_wg_server_conf(vlan, server_private_key, iface): """Build the /etc/wireguard/.conf content from config.json peers.""" info = vlan["vpn_information"] gateway = shared.resolve_vlan_options(vlan)["gateway"] network = shared.network_for(vlan) server_ip = f"{gateway}/{network.prefixlen}" listen_port = info["listen_port"] L = [ "# Generated by core.py -- do not edit manually.", "# Run: sudo python3 core.py --apply", "", "[Interface]", f"PrivateKey = {server_private_key}", f"Address = {server_ip}", f"ListenPort = {listen_port}", "", ] for peer in vlan.get("peers", []): if not peer.get("enabled", True): L += [f"# DISABLED: {peer['name']}", ""] continue L += [ f"# {peer['name']}", "[Peer]", f"PublicKey = {peer['public_key']}", f"AllowedIPs = {peer['ip']}/32", f"PersistentKeepalive = {WG_KEEPALIVE}", "", ] return "\n".join(L) # =================================================================== # Apply # =================================================================== def ensure_wg_interfaces(data): """Generate WireGuard server confs and bring up / sync all WG interfaces.""" wg_vlans = [v for v in data.get("vlans", []) if validation.is_wg(v)] if not wg_vlans: return for vlan in wg_vlans: iface = validation.derive_interface(vlan, data) print(f" [{iface}]") kf = wg_server_key_path(iface) if not kf.exists(): print(f" Generating server private key...") private = generate_wg_server_key(iface) else: private = kf.read_text().strip() pub_result = subprocess.run( ["wg", "pubkey"], input=private, capture_output=True, text=True, check=True ) public = pub_result.stdout.strip() pubkey_file = wg_server_pubkey_path(iface) pubkey_file.write_text(public + "\n") shared.chown_to_script_dir_owner(pubkey_file) print(f" Server public key: {public[:20]}...") WG_DIR.mkdir(exist_ok=True) conf_file = wg_conf_path_for(iface) new_conf = build_wg_server_conf(vlan, private, iface) listen_port = vlan["vpn_information"]["listen_port"] port_changed = False if conf_file.exists(): m = re.search(r'ListenPort\s*=\s*(\d+)', conf_file.read_text()) if m and int(m.group(1)) != listen_port: port_changed = True conf_file.write_text(new_conf) conf_file.chmod(0o600) peer_count = len([p for p in vlan.get("peers", []) if p.get("enabled", True)]) print(f" Wrote {conf_file} ({peer_count} enabled peer(s))") if not wg_interface_up(iface): print(f" Bringing up {iface}...") r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True) if r.returncode != 0: print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}") else: print(f" {iface} is up.") elif port_changed: print(f" Listen port changed - restarting {iface}...") subprocess.run(["wg-quick", "down", iface], capture_output=True, text=True) r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True) if r.returncode != 0: print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}") else: print(f" {iface} restarted.") else: print(f" Syncing peers to live {iface}...") subprocess.run(["wg", "syncconf", iface, str(conf_file)], capture_output=True, text=True)