linuxrouter/routlin/mod_wireguard.py
2026-06-05 01:48:27 -04:00

157 lines
5.5 KiB
Python

"""
mod_wireguard.py -- WireGuard server configuration and interface management.
Generates /etc/wireguard/<iface>.conf from config.json peers, manages server
key generation, and brings WireGuard interfaces up or syncs peer changes.
"""
import re
import subprocess
from pathlib import Path
import mod_shared as shared
import mod_validation as validation
# Directory that holds the server private key files (<iface>.key) and the
# generated interface configs (<iface>.conf).
WG_DIR = Path("/etc/wireguard")
# Value emitted as PersistentKeepalive for every enabled peer in the
# generated config.
WG_KEEPALIVE = 25
# ===================================================================
# Interface state
# ===================================================================
def wg_interface_up(iface):
    """Report whether ``ip link show <iface>`` succeeds for *iface*.

    Only the exit status of the command is inspected: the result is True
    when it exits with 0 and False otherwise. The command's output is
    captured and discarded; the link state it prints is not examined.
    """
    probe = subprocess.run(
        ["ip", "link", "show", iface],
        capture_output=True,
        text=True,
    )
    if probe.returncode != 0:
        return False
    return True
# ===================================================================
# Key and path helpers
# ===================================================================
def wg_server_key_path(iface):
    """Return the path of the server private key file for *iface* in WG_DIR."""
    key_name = f"{iface}.key"
    return WG_DIR / key_name
def wg_server_pubkey_path(iface):
    """Return the hidden ``.<iface>.pub`` path inside shared.SCRIPT_DIR.

    The public key is kept there so that the Flask app can read it.
    """
    pub_name = f".{iface}.pub"
    return shared.SCRIPT_DIR / pub_name
def wg_conf_path_for(iface):
    """Return the path of the generated config file for *iface* in WG_DIR."""
    conf_name = f"{iface}.conf"
    return WG_DIR / conf_name
def generate_wg_server_key(iface):
    """Generate a new server private key for *iface* and store it on disk.

    Runs ``wg genkey`` and writes the resulting key, followed by a newline,
    to the path given by wg_server_key_path(iface). WG_DIR is created first
    if it does not exist yet.

    The key file is opened with mode 0600 at creation time, so the secret is
    never on disk with wider permissions (writing first and tightening the
    mode afterwards would leave a short window governed by the umask).

    Args:
        iface: Interface name used to derive the key file name.

    Returns:
        The private key as a string, stripped of surrounding whitespace.

    Raises:
        subprocess.CalledProcessError: ``wg genkey`` exited non-zero.
        OSError: the directory or key file could not be created or written.
    """
    # Function-scope import keeps this block independent of the module's
    # top-level import list.
    import os

    WG_DIR.mkdir(exist_ok=True)
    result = subprocess.run(["wg", "genkey"], capture_output=True, text=True, check=True)
    private = result.stdout.strip()
    kf = wg_server_key_path(iface)
    fd = os.open(kf, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file before any key material is written.
        os.fchmod(fh.fileno(), 0o600)
        fh.write(private + "\n")
    return private
# ===================================================================
# Config generation
# ===================================================================
def build_wg_server_conf(vlan, server_private_key, iface):
    """Build the /etc/wireguard/<iface>.conf content from config.json peers.

    The output starts with a generated-file banner and an [Interface]
    section (PrivateKey, Address, ListenPort). Every enabled peer then gets
    a name comment followed by a [Peer] section with PublicKey, a /32
    AllowedIPs entry and PersistentKeepalive = WG_KEEPALIVE. Disabled peers
    are represented by a single "# DISABLED: <name>" comment.

    Args:
        vlan: VLAN entry. Reads vlan["vpn_information"]["listen_port"] and
            the optional "peers" list; every peer needs "name", and enabled
            peers also need "public_key" and "ip". The gateway and network
            are obtained from shared.resolve_vlan_options() and
            shared.network_for().
        server_private_key: Value written as PrivateKey.
        iface: Not used by this function; kept so existing callers keep
            working.

    Returns:
        The config text, with lines joined by newline characters.

    Raises:
        KeyError: a required key is missing from *vlan* or from a peer.
    """

    def comment_text(value):
        # Peer names end up in "#" comment lines. A line break inside a name
        # would terminate the comment and let the remainder be parsed as
        # real config directives, so collapse any line breaks to spaces.
        return " ".join(str(value).splitlines())

    info = vlan["vpn_information"]
    gateway = shared.resolve_vlan_options(vlan)["gateway"]
    network = shared.network_for(vlan)
    server_ip = f"{gateway}/{network.prefixlen}"
    listen_port = info["listen_port"]

    lines = [
        "# Generated by core.py -- do not edit manually.",
        "# Run: sudo python3 core.py --apply",
        "",
        "[Interface]",
        f"PrivateKey = {server_private_key}",
        f"Address = {server_ip}",
        f"ListenPort = {listen_port}",
        "",
    ]
    for peer in vlan.get("peers", []):
        peer_name = comment_text(peer["name"])
        if not peer.get("enabled", True):
            lines += [f"# DISABLED: {peer_name}", ""]
            continue
        lines += [
            f"# {peer_name}",
            "[Peer]",
            f"PublicKey = {peer['public_key']}",
            f"AllowedIPs = {peer['ip']}/32",
            f"PersistentKeepalive = {WG_KEEPALIVE}",
            "",
        ]
    return "\n".join(lines)
# ===================================================================
# Apply
# ===================================================================
def _write_private_file(path, text):
    """Write *text* to *path* with the file restricted to mode 0600 throughout."""
    # Function-scope import keeps this block independent of the module's
    # top-level import list.
    import os

    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file before the secret is written.
        os.fchmod(fh.fileno(), 0o600)
        fh.write(text)


def _wg_quick_up(iface, success_message):
    """Run ``wg-quick up <iface>``; print a warning on failure, else *success_message*."""
    r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
    if r.returncode != 0:
        print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
    else:
        print(success_message)


def _sync_live_peers(iface):
    """Apply the on-disk config of *iface* to the running interface via ``wg syncconf``."""
    # The generated file is in wg-quick format (it contains "Address"), which
    # plain `wg` does not accept. `wg-quick strip` emits the wg-compatible
    # subset; it is piped through stdin so the private key is not written to
    # a second file.
    stripped = subprocess.run(
        ["wg-quick", "strip", iface], capture_output=True, text=True
    )
    if stripped.returncode != 0:
        print(f" WARNING: wg-quick strip {iface} failed: {stripped.stderr.strip()}")
        return
    synced = subprocess.run(
        ["wg", "syncconf", iface, "/dev/stdin"],
        input=stripped.stdout,
        capture_output=True,
        text=True,
    )
    if synced.returncode != 0:
        print(f" WARNING: wg syncconf {iface} failed: {synced.stderr.strip()}")


def ensure_wg_interfaces(data):
    """Generate WireGuard server confs and bring up / sync all WG interfaces.

    For every VLAN in data["vlans"] accepted by validation.is_wg():

    1. Load the server private key, generating it when the key file is
       missing, and derive the public key with ``wg pubkey``.
    2. Write the public key to wg_server_pubkey_path(iface) and hand it to
       shared.chown_to_script_dir_owner().
    3. Rebuild <iface>.conf. The ListenPort of the previous file is read
       before it is overwritten so that a port change can be detected.
    4. Bring the interface up if it does not exist, restart it when the
       listen port changed, otherwise sync the new peer list into the live
       interface.

    Failures of the wg-quick / wg commands in step 4 are reported as
    warnings and do not stop processing of the remaining interfaces.

    Args:
        data: Parsed configuration; only the "vlans" list is read here, the
            whole object is passed on to validation.derive_interface().

    Returns:
        None.

    Raises:
        subprocess.CalledProcessError: ``wg genkey`` or ``wg pubkey`` failed.
        KeyError: a WireGuard VLAN lacks vpn_information/listen_port.
    """
    wg_vlans = [v for v in data.get("vlans", []) if validation.is_wg(v)]
    if not wg_vlans:
        return

    for vlan in wg_vlans:
        iface = validation.derive_interface(vlan, data)
        print(f" [{iface}]")

        kf = wg_server_key_path(iface)
        if not kf.exists():
            print(" Generating server private key...")
            private = generate_wg_server_key(iface)
        else:
            private = kf.read_text().strip()

        pub_result = subprocess.run(
            ["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
        )
        public = pub_result.stdout.strip()
        pubkey_file = wg_server_pubkey_path(iface)
        pubkey_file.write_text(public + "\n")
        shared.chown_to_script_dir_owner(pubkey_file)
        print(f" Server public key: {public[:20]}...")

        WG_DIR.mkdir(exist_ok=True)
        conf_file = wg_conf_path_for(iface)
        new_conf = build_wg_server_conf(vlan, private, iface)
        listen_port = vlan["vpn_information"]["listen_port"]

        # Compare against the port in the previous config before it is
        # overwritten; a changed port is handled by a full restart below.
        port_changed = False
        if conf_file.exists():
            m = re.search(r'ListenPort\s*=\s*(\d+)', conf_file.read_text())
            if m and int(m.group(1)) != listen_port:
                port_changed = True

        # The config embeds the private key, so it must never be readable by
        # other users, not even between creation and a later chmod.
        _write_private_file(conf_file, new_conf)
        peer_count = sum(1 for p in vlan.get("peers", []) if p.get("enabled", True))
        print(f" Wrote {conf_file} ({peer_count} enabled peer(s))")

        if not wg_interface_up(iface):
            print(f" Bringing up {iface}...")
            _wg_quick_up(iface, f" {iface} is up.")
        elif port_changed:
            print(f" Listen port changed - restarting {iface}...")
            # Best effort: if the teardown fails, the following `up` fails
            # too and reports the problem.
            subprocess.run(["wg-quick", "down", iface], capture_output=True, text=True)
            _wg_quick_up(iface, f" {iface} restarted.")
        else:
            print(f" Syncing peers to live {iface}...")
            _sync_live_peers(iface)