linuxrouter/router/vpn.py

1001 lines
37 KiB
Python

#!/usr/bin/env python3
"""
vpn.py -- Manage WireGuard VPN server and peers.
Reads core.json for VPN interface configuration. Any VLAN whose interface
name starts with "wg" is treated as a WireGuard interface. Peer data is
stored in per-interface dotfiles (.vpn-wg0, .vpn-wg1, etc.) in the same
directory as this script.
Server private keys:
Stored at /etc/wireguard/<iface>.key (root read-only, 600).
Generated once on --apply if not already present.
Peer key model (WireGuard is symmetric):
This script generates a keypair for each peer. The peer's PRIVATE key is
embedded in their client config file and must be transferred to them securely.
The peer's PUBLIC key is stored in the dotfile and written into the
WireGuard conf. The server's public key is embedded in each client config.
Client config files:
Generated as vpn-client-<name>.conf in the same directory as vpn.py.
Permissions: 600 (owner read/write only); ownership is given to the owner of the script directory.
Contains the peer's private key -- transfer to the client by secure means
(e.g. encrypted email, USB), then delete from this server.
Usage:
sudo python3 vpn.py --add-peer Add a new peer interactively
sudo python3 vpn.py --manage-peers List and manage existing peers
sudo python3 vpn.py --apply Write WireGuard config and sync peers
sudo python3 vpn.py --disable Stop WireGuard on all interfaces
sudo python3 vpn.py --status Show WireGuard service and interface status
sudo python3 vpn.py --view-peers Show per-peer handshake and traffic stats
"""
import ipaddress
import json
import os
import re
import subprocess
import sys
import argparse
from pathlib import Path
from datetime import datetime, timezone
# Directory containing this script; config files, peer dotfiles and generated
# client configs are all read from / written to this directory.
SCRIPT_DIR = Path(__file__).parent
# core.json: source of the "vlans" list and the general.wan_interface setting.
DHCP_CONFIG_FILE = SCRIPT_DIR / "core.json"
# ddns.json: optional; consulted for a public hostname when building endpoints.
DDNS_CONFIG_FILE = SCRIPT_DIR / "ddns.json"
# Where server private keys (<iface>.key) and server configs (<iface>.conf) live.
WG_DIR = Path("/etc/wireguard")
# Value written as PersistentKeepalive into every generated client config.
KEEPALIVE = 25
# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
def die(msg):
    """Print an error message to stderr and exit with status 1.

    Errors go to stderr so they are not mixed into stdout output that a
    caller may be capturing or piping.
    """
    print(f"ERROR: {msg}", file=sys.stderr)
    sys.exit(1)
def check_root():
    """Abort via die() unless the effective user ID is 0 (root)."""
    running_as_root = os.geteuid() == 0
    if running_as_root:
        return
    die("This script must be run as root (sudo).")
def chown_to_script_dir_owner(path):
    """Give *path* the same owner and group as SCRIPT_DIR.

    Files created under SCRIPT_DIR while running as root would otherwise be
    root-owned. Best effort: any OSError is ignored.
    Callers do not use this for files under /etc/wireguard.
    """
    try:
        owner = SCRIPT_DIR.stat()
    except OSError:
        return  # non-fatal
    try:
        os.chown(path, owner.st_uid, owner.st_gid)
    except OSError:
        return  # non-fatal
def run(cmd, check=True, capture=True):
    """Run *cmd* (an argv list) in text mode and return the CompletedProcess.

    check   -- passed through to subprocess.run (raise on non-zero exit).
    capture -- passed through as capture_output.
    """
    options = {"capture_output": capture, "text": True, "check": check}
    return subprocess.run(cmd, **options)
def wg_available():
    """Return True if a `wg` executable is found on PATH.

    Uses shutil.which instead of spawning the external `which` program,
    which is not installed on every system and would raise
    FileNotFoundError there.
    """
    import shutil  # local import: keeps this block self-contained

    return shutil.which("wg") is not None
def sense_mtu(dhcp_data, overhead=80):
    """
    Derive the recommended WireGuard tunnel MTU from the WAN interface MTU.

    WireGuard encapsulation costs 60 bytes over IPv4 and 80 bytes over IPv6.
    The default overhead of 80 is safe for either transport and agrees with
    the fallback value: a standard 1500-byte link yields 1420 whether or not
    sensing succeeds.

    dhcp_data -- parsed core.json; general.wan_interface names the WAN link.
    overhead  -- bytes to subtract from the link MTU.

    Returns the tunnel MTU as an int. Falls back to 1500 - overhead when the
    interface is not configured, `ip` cannot be run, or no MTU is reported.
    """
    fallback = 1500 - overhead
    wan = dhcp_data.get("general", {}).get("wan_interface", "")
    if not wan:
        return fallback
    try:
        result = subprocess.run(
            ["ip", "link", "show", wan],
            capture_output=True, text=True
        )
    except (OSError, subprocess.SubprocessError):
        # `ip` missing or not runnable: sensing is best effort only.
        return fallback
    m = re.search(r"\bmtu\s+(\d+)", result.stdout)
    if m:
        link_mtu = int(m.group(1))
        if link_mtu > overhead:
            return link_mtu - overhead
    return fallback
def check_wireguard_tools():
    """Make sure the wg tooling is present, offering an apt install if it is not.

    Returns immediately when wg is already available. Otherwise asks the user
    until a yes/no answer is given; "no" (or just Enter) aborts via die(), and
    a failed apt-get run aborts as well.
    """
    if wg_available():
        return
    print("wireguard-tools is not installed (provides wg and wg-quick).")
    print()
    answer = None
    while answer not in ("y", "yes"):
        answer = input("Install wireguard-tools now via apt? [y/N]: ").strip().lower()
        if answer in ("n", "no", ""):
            die("Cannot continue without wireguard-tools. Install it and retry.")
    install = subprocess.run(["apt-get", "install", "-y", "wireguard-tools"])
    if install.returncode:
        die("Package installation failed. Install wireguard-tools manually and retry.")
def _fmt_bytes(n):
if n < 1024:
return f"{n} B"
elif n < 1024 ** 2:
return f"{n / 1024:.1f} KB"
elif n < 1024 ** 3:
return f"{n / 1024**2:.1f} MB"
else:
return f"{n / 1024**3:.2f} GB"
# ------------------------------------------------------------------------------
# Load core.json / dotfiles
# ------------------------------------------------------------------------------
def load_dhcp():
    """Load and return the parsed contents of core.json.

    Exits via die() with a readable message when the file is missing, is not
    valid JSON, or does not contain a JSON object at the top level (the rest
    of the script calls .get() on the result).
    """
    try:
        # Open directly instead of exists()+open(): avoids a check/use race.
        with open(DHCP_CONFIG_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        die(f"Config file not found: {DHCP_CONFIG_FILE}")
    except json.JSONDecodeError as exc:
        die(f"Config file {DHCP_CONFIG_FILE} is not valid JSON: {exc}")
    if not isinstance(data, dict):
        die(f"Config file {DHCP_CONFIG_FILE} must contain a JSON object at the top level.")
    return data
def wg_interfaces(dhcp_data):
    """Collect the entries of dhcp_data["vlans"] whose "interface" begins with "wg"."""
    selected = []
    for vlan in dhcp_data.get("vlans", []):
        if vlan.get("interface", "").startswith("wg"):
            selected.append(vlan)
    return selected
def dotfile_path(iface):
    """Return the path of the peer dotfile (.vpn-<iface>) inside SCRIPT_DIR."""
    filename = f".vpn-{iface}"
    return SCRIPT_DIR.joinpath(filename)
def vpi(vlan):
    """Look up and return vlan["vpn_information"] (KeyError if the key is absent)."""
    info = vlan["vpn_information"]
    return info
def server_key_path(iface):
    """Return the path of the server private key file, WG_DIR/<iface>.key."""
    filename = f"{iface}.key"
    return WG_DIR.joinpath(filename)
def wg_conf_path(iface):
    """Return the path of the server config file, WG_DIR/<iface>.conf."""
    filename = f"{iface}.conf"
    return WG_DIR.joinpath(filename)
def load_peers(iface):
    """Read the "peers" list from the interface's dotfile.

    Returns [] when the dotfile does not exist or has no "peers" key.
    """
    store = dotfile_path(iface)
    if store.exists():
        with open(store) as fh:
            document = json.load(fh)
        return document.get("peers", [])
    return []
def save_peers(iface, peers):
    """Persist *peers* as {"peers": [...]} in the interface's dotfile.

    The file is then set to mode 600 and chowned to the script-directory owner.
    """
    target = dotfile_path(iface)
    with open(target, "w") as out:
        out.write(json.dumps({"peers": peers}, indent=2))
        out.write("\n")
    target.chmod(0o600)
    chown_to_script_dir_owner(target)
# ------------------------------------------------------------------------------
# IP allocation
# ------------------------------------------------------------------------------
def next_available_ip(vlan, peers):
    """
    Pick the lowest unused host address for a new peer.

    The subnet is the /24 containing the VLAN's vpn_information gateway.
    Offsets 2 through 254 are tried in order; the gateway address and every
    parseable peer["ip"] count as taken. Returns "<address>/32", or exits
    via die() when nothing is free.
    """
    gateway = vpi(vlan)["gateway"]
    network = ipaddress.IPv4Interface(f"{gateway}/24").network
    taken = {int(ipaddress.IPv4Address(gateway))}
    for entry in peers:
        try:
            taken.add(int(ipaddress.IPv4Interface(entry["ip"]).ip))
        except (KeyError, ValueError):
            continue  # entries without a usable ip are simply ignored
    first = int(network.network_address)
    free = (n for n in range(first + 2, first + 255) if n not in taken)
    chosen = next(free, None)
    if chosen is None:
        die(f"No available IPs in VPN subnet {network} (all .2-.254 allocated).")
    return f"{ipaddress.IPv4Address(chosen)}/32"
# ------------------------------------------------------------------------------
# Key management
# ------------------------------------------------------------------------------
def generate_server_key(iface):
    """Generate the server private key and store it at WG_DIR/<iface>.key.

    The key file is created with mode 600 from the start (and an existing
    file is tightened before anything is written), so the private key is
    never on disk with looser permissions. WG_DIR is created with mode 700
    if it does not exist yet.
    """
    WG_DIR.mkdir(mode=0o700, exist_ok=True)
    private = run(["wg", "genkey"]).stdout.strip()
    kf = server_key_path(iface)
    fd = os.open(kf, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode passed to os.open only applies to newly created files.
        os.fchmod(fd, 0o600)
        f.write(private + "\n")
    print(f"Server private key generated: {kf}")
def get_server_public_key(iface):
    """Return the server public key, computed by piping the stored private key to `wg pubkey`.

    Exits via die() if the private key file does not exist.
    """
    key_file = server_key_path(iface)
    if not key_file.exists():
        die(f"Server private key not found at {key_file}. Run --apply first.")
    secret = key_file.read_text().strip()
    derived = subprocess.run(
        ["wg", "pubkey"],
        input=secret,
        capture_output=True,
        text=True,
        check=True,
    )
    return derived.stdout.strip()
def generate_peer_keypair():
    """Create a new WireGuard keypair and return it as (private_key, public_key)."""
    secret = run(["wg", "genkey"]).stdout.strip()
    derived = subprocess.run(
        ["wg", "pubkey"],
        input=secret,
        capture_output=True,
        text=True,
        check=True,
    )
    return secret, derived.stdout.strip()
# ------------------------------------------------------------------------------
# Endpoint resolution
# ------------------------------------------------------------------------------
def resolve_endpoint(listen_port):
    """
    Work out the public "host:port" endpoint to put in client configs.

    A suggestion is taken from the first enabled provider in ddns.json that
    yields a hostname (noip: first entry of "hostnames"; duckdns: first entry
    of "subdomains" plus ".duckdns.org"). The suggestion, if any, is pre-filled
    at the prompt so the user can accept or edit it; without one the user types
    the endpoint. Empty input is rejected, and listen_port is appended when the
    entry contains no ":".
    """
    import readline

    candidate = None
    source = None
    if not DDNS_CONFIG_FILE.exists():
        print(f"ddns.json not found at {DDNS_CONFIG_FILE}.")
    else:
        with open(DDNS_CONFIG_FILE) as fh:
            ddns = json.load(fh)
        for provider in ddns.get("providers", []):
            if provider.get("enabled") is not True:
                continue
            kind = provider.get("provider", "").lower()
            if kind == "noip":
                names = provider.get("hostnames", [])
                if not names:
                    continue
                candidate = f"{names[0]}:{listen_port}"
            elif kind == "duckdns":
                subs = provider.get("subdomains", [])
                if not subs:
                    continue
                candidate = f"{subs[0]}.duckdns.org:{listen_port}"
            else:
                continue
            source = "ddns.json"
            break
        if not candidate:
            print("No enabled DDNS provider found in ddns.json.")

    if candidate:
        prompt = f"Public endpoint (from {source}): "
    else:
        print("Please enter the public endpoint manually.")
        prompt = "Public endpoint (hostname:port): "

    def _prefill():
        # Put the suggestion into the edit buffer so it can be accepted or edited.
        if candidate:
            readline.insert_text(candidate)

    entry = ""
    while not entry:
        try:
            readline.set_startup_hook(_prefill)
            entry = input(prompt).strip()
        finally:
            readline.set_startup_hook(None)
        if not entry:
            print(" Endpoint cannot be empty.")
    return entry if ":" in entry else f"{entry}:{listen_port}"
# ------------------------------------------------------------------------------
# Split-tunnel route computation
# ------------------------------------------------------------------------------
def split_tunnel_routes(dhcp_data):
    """
    Build the list of subnet CIDR strings, one per VLAN in core.json.

    For wg* VLANs the subnet is the /24 around vpn_information.gateway; for
    all others it comes from dhcp.subnet and dhcp.subnet_mask. The result is
    used as AllowedIPs in split-tunnel client configs.
    """
    def _subnet(vlan):
        if vlan.get("interface", "").startswith("wg"):
            cidr = f"{vpi(vlan)['gateway']}/24"
        else:
            dhcp = vlan["dhcp"]
            cidr = f"{dhcp['subnet']}/{dhcp['subnet_mask']}"
        return str(ipaddress.IPv4Network(cidr, strict=False))

    return [_subnet(vlan) for vlan in dhcp_data.get("vlans", [])]
# ------------------------------------------------------------------------------
# Client config
# ------------------------------------------------------------------------------
def build_client_conf(peer, private_key, server_public_key, endpoint,
                      allowed_ips, dns, domain, mtu):
    """Render the text of a client config: an [Interface] and a [Peer] section.

    *domain*, when non-empty, is appended to the DNS line after the server.
    """
    dns_value = f"{dns}, {domain}" if domain else f"{dns}"
    interface_section = [
        "[Interface]",
        f"PrivateKey = {private_key}",
        f"Address = {peer['ip']}",
        f"DNS = {dns_value}",
        f"MTU = {mtu}",
    ]
    peer_section = [
        "[Peer]",
        f"PublicKey = {server_public_key}",
        f"Endpoint = {endpoint}",
        f"AllowedIPs = {allowed_ips}",
        f"PersistentKeepalive = {KEEPALIVE}",
    ]
    # One blank line between sections, one trailing newline.
    return "\n".join(interface_section) + "\n\n" + "\n".join(peer_section) + "\n"
def write_client_conf(peer, private_key, server_public_key, endpoint,
                      allowed_ips, dns, domain, mtu):
    """Write vpn-client-<name>.conf into SCRIPT_DIR and return its path.

    The file contains the peer's private key, so it is created with mode 600
    from the start (and an existing file is tightened before anything is
    written) rather than written with default permissions and chmod-ed
    afterwards. Ownership is then given to the script-directory owner.
    """
    conf_path = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
    content = build_client_conf(peer, private_key, server_public_key,
                                endpoint, allowed_ips, dns, domain, mtu)
    fd = os.open(conf_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode passed to os.open only applies to newly created files.
        os.fchmod(fd, 0o600)
        f.write(content)
    chown_to_script_dir_owner(conf_path)
    return conf_path
# ------------------------------------------------------------------------------
# WireGuard server conf
# ------------------------------------------------------------------------------
def build_wg_conf(vlan, peers, server_private_key):
    """Render the server-side WireGuard config text for one interface.

    Emits an [Interface] section (private key, gateway address as a /24,
    listen port) followed by one [Peer] section for each peer whose
    "enabled" value is exactly True.
    """
    info = vpi(vlan)
    gateway = info["gateway"]
    prefix = ipaddress.IPv4Interface(f"{gateway}/24").network.prefixlen
    out = [
        "# Generated by vpn.py -- do not edit manually.",
        "# Run: sudo python3 vpn.py --apply",
        "",
        "[Interface]",
        f"PrivateKey = {server_private_key}",
        f"Address = {gateway}/{prefix}",
        f"ListenPort = {info['listen_port']}",
        "",
    ]
    for peer in (p for p in peers if p.get("enabled") is True):
        out.extend([
            f"# {peer['name']}",
            "[Peer]",
            f"PublicKey = {peer['public_key']}",
            f"AllowedIPs = {peer['ip']}",
            "",
        ])
    return "\n".join(out)
# ------------------------------------------------------------------------------
# Live peer sync
# ------------------------------------------------------------------------------
def sync_peers_live(iface, peers):
    """
    Bring the running interface's peer set in line with the enabled peers.

    Reads `wg show <iface> dump`; does nothing if that command fails (the
    interface is not up). Enabled peers missing from the live set are added
    with their allowed-ips; live peers that are not enabled in *peers* are
    removed. No service restart is involved.
    """
    dump = run(["wg", "show", iface, "dump"], check=False)
    if dump.returncode != 0:
        return  # interface not up yet
    rows = dump.stdout.strip().splitlines()
    # Row 0 describes the interface itself; each later row starts with a peer key.
    live_keys = {row.split("\t")[0] for row in rows[1:]}
    wanted = {p["public_key"]: p for p in peers if p.get("enabled") is True}
    wanted_keys = set(wanted)
    for key in wanted_keys - live_keys:
        entry = wanted[key]
        run(["wg", "set", iface, "peer", key, "allowed-ips", entry["ip"]])
        print(f" Added peer: {entry['name']} ({entry['ip']})")
    for key in live_keys - wanted_keys:
        run(["wg", "set", iface, "peer", key, "remove"])
        print(f" Removed peer: {key[:16]}...")
# ------------------------------------------------------------------------------
# Interface selection
# ------------------------------------------------------------------------------
def validate_wg_vlans(wg_vlans):
    """Die with a clear message if any wg VLAN has an unusable vpn_information block.

    Checks, per VLAN:
      * vpn_information is a dict;
      * listen_port is an int in 1-65535 (bools are rejected even though
        bool is an int subclass);
      * gateway is a non-empty string that parses as an IPv4 address, so a
        bad value is reported here instead of as a traceback later on.
    """
    for vlan in wg_vlans:
        iface = vlan.get("interface", "?")
        info = vlan.get("vpn_information")
        if not isinstance(info, dict):
            die(f"Interface '{iface}' is missing a vpn_information block in core.json. "
                f"Add: \"vpn_information\": {{\"listen_port\": 51820, \"gateway\": \"...\"}}")
        port = info.get("listen_port")
        port_ok = (isinstance(port, int) and not isinstance(port, bool)
                   and 1 <= port <= 65535)
        if not port_ok:
            die(f"Interface '{iface}' vpn_information is missing a valid listen_port in core.json.")
        gateway = info.get("gateway")
        if not gateway:
            die(f"Interface '{iface}' vpn_information is missing gateway in core.json.")
        try:
            if not isinstance(gateway, str):
                raise ValueError("gateway must be a string")
            ipaddress.IPv4Address(gateway)
        except ValueError:
            die(f"Interface '{iface}' vpn_information gateway '{gateway}' "
                f"is not a valid IPv4 address in core.json.")
def pick_wg_interface(wg_vlans):
    """
    Choose which WireGuard VLAN to operate on.

    A single candidate is returned without prompting. With several, a
    numbered menu is shown and the prompt repeats until a valid number
    is entered.
    """
    if len(wg_vlans) == 1:
        return wg_vlans[0]
    print("Available WireGuard interfaces:")
    for number, vlan in enumerate(wg_vlans, start=1):
        port = vpi(vlan)["listen_port"]
        print(f" {number}. {vlan['interface']} ({vlan['name']}, port {port})")
    print()
    while True:
        raw = input("Select interface number: ").strip()
        try:
            number = int(raw)
        except ValueError:
            number = 0  # never a valid menu entry
        if 1 <= number <= len(wg_vlans):
            return wg_vlans[number - 1]
        print(" Invalid selection.")
# ------------------------------------------------------------------------------
# --add-peer
# ------------------------------------------------------------------------------
def cmd_add_peer(dhcp_data):
    """
    Interactively add a peer to a WireGuard interface and write its client config.

    Steps: pick the interface, resolve the public endpoint, prompt for a
    unique peer name and an IP inside the VPN /24, generate the peer keypair
    (and the server key if missing), ask about split tunnelling, store the
    peer in the dotfile, and write vpn-client-<name>.conf.

    Differences from a naive prompt flow, all deliberate:
      * The peer name becomes part of a filename, so names containing path
        separators or non-printable characters are rejected.
      * The stored peer IP is always "<address>/32", whatever prefix the user
        typed, because it is written verbatim as the server-side AllowedIPs
        and as the client Address.
      * Both "n" and "no" decline split tunnelling.

    Exits via die() on an invalid, out-of-subnet, gateway or duplicate IP.
    """
    check_root()
    check_wireguard_tools()
    wg_vlans = wg_interfaces(dhcp_data)
    vlan = pick_wg_interface(wg_vlans)
    iface = vlan["interface"]
    info = vpi(vlan)
    gateway = info["gateway"]
    peers = load_peers(iface)
    # -- Resolve endpoint -------------------------------------------------------
    endpoint = resolve_endpoint(info["listen_port"])
    # -- Peer name -------------------------------------------------------------
    print()
    while True:
        name = input("Peer name (e.g. norman-laptop): ").strip()
        if not name:
            print(" Name cannot be empty.")
            continue
        if "/" in name or "\\" in name or not name.isprintable():
            print(" Name must not contain path separators or control characters.")
            continue
        if any(p["name"] == name for p in peers):
            print(f" A peer named '{name}' already exists for {iface}.")
            continue
        break
    # -- Peer IP ---------------------------------------------------------------
    prefill = next_available_ip(vlan, peers)
    ip_input = input(f"Peer IP [{prefill}]: ").strip()
    peer_ip = ip_input if ip_input else prefill
    network = ipaddress.IPv4Network(f"{gateway}/24", strict=False)
    try:
        iface_ip = ipaddress.IPv4Interface(peer_ip)
        if iface_ip.ip not in network:
            die(f"IP '{peer_ip}' is not within VPN subnet {network}.")
        if iface_ip.ip == ipaddress.IPv4Address(gateway):
            die(f"IP '{peer_ip}' is the server gateway. Choose a different IP.")
        for p in peers:
            if ipaddress.IPv4Interface(p["ip"]).ip == iface_ip.ip:
                die(f"IP '{peer_ip}' is already assigned to peer '{p['name']}'.")
    except ValueError as e:
        die(f"Invalid IP '{peer_ip}': {e}")
    # Normalise to a single-host route; a wider prefix here would route the
    # whole range to this one peer on the server side.
    peer_ip = f"{iface_ip.ip}/32"
    # -- Generate keypair -------------------------------------------------------
    print(f"\nGenerating keypair for '{name}'...")
    private_key, public_key = generate_peer_keypair()
    # -- Ensure server key exists -----------------------------------------------
    kf = server_key_path(iface)
    if not kf.exists():
        print(f"Server private key not found for {iface} -- generating now...")
        generate_server_key(iface)
    server_public_key = get_server_public_key(iface)
    # -- Split tunnel prompt ---------------------------------------------------
    print()
    st_input = input("Split tunnel? Route only VPN subnets (not all traffic) through WireGuard. [Y/n]: ").strip().lower()
    split_tunnel = st_input not in ("n", "no")
    if split_tunnel:
        allowed_ips = ", ".join(split_tunnel_routes(dhcp_data))
    else:
        allowed_ips = "0.0.0.0/0"
    overrides = info.get("explicit_overrides", {})
    dns = overrides.get("dns_server", "") or gateway
    domain = info.get("domain", "")
    mtu_override = overrides.get("mtu", "")
    mtu = int(mtu_override) if mtu_override else sense_mtu(dhcp_data)
    # -- Add peer to dotfile ----------------------------------------------------
    new_peer = {"name": name, "ip": peer_ip, "public_key": public_key,
                "split_tunnel": split_tunnel, "enabled": True}
    peers.append(new_peer)
    save_peers(iface, peers)
    print(f"Peer '{name}' added to {dotfile_path(iface).name} (ip: {peer_ip}).")
    # -- Write client config ----------------------------------------------------
    conf_path = write_client_conf(new_peer, private_key, server_public_key,
                                  endpoint, allowed_ips, dns, domain, mtu)
    # Drop our reference to the key. Rebinding the name to zeros (as was done
    # before) does not scrub the original immutable string from memory.
    del private_key
    # -- Instructions -----------------------------------------------------------
    print()
    print("=" * 68)
    print(f" Client config written: {conf_path}")
    print()
    print(" NEXT STEPS:")
    print(f" 1. Transfer {conf_path.name} to '{name}' by secure means")
    print(" (encrypted email, USB drive, etc.).")
    print(" 2. The recipient imports it into their WireGuard app.")
    print(f" 3. Delete {conf_path} from this server once transferred.")
    print()
    print(" WARNING: This file contains the peer's private key.")
    print(" Do not leave it on this server longer than necessary.")
    print("=" * 68)
    print()
    print(" To apply changes to WireGuard, run:")
    print(" sudo python3 vpn.py --apply")
    print()
# ------------------------------------------------------------------------------
# --manage-peers
# ------------------------------------------------------------------------------
def cmd_list_peers(dhcp_data):
    """
    List every stored peer across all wg interfaces and manage one of them.

    After the user selects a peer, one action is offered: rename, regenerate
    keys, delete, or cancel. Changes are saved to the interface's dotfile;
    the running WireGuard interface is not touched here (the closing hint
    tells the user to run --apply).
    """
    check_root()
    check_wireguard_tools()
    wg_vlans = wg_interfaces(dhcp_data)
    # -- Collect all peers across all interfaces --------------------------------
    # Each entry: (iface, peer_dict, vlan_dict, peers_list)
    # peers_list is the same list object the peer_dict lives in, so mutating
    # the dict and saving the list persists the change.
    all_entries = []
    for vlan in wg_vlans:
        iface = vlan["interface"]
        peers = load_peers(iface)
        for peer in peers:
            all_entries.append((iface, peer, vlan, peers))
    if not all_entries:
        print("No peers found across any WireGuard interface.")
        return
    # -- List peers -------------------------------------------------------------
    print("Peers:")
    for i, (iface, peer, _, _) in enumerate(all_entries, 1):
        # NOTE(review): truthiness test here, while config generation and sync
        # require `enabled is True` -- a truthy non-True value would be shown
        # as enabled but not applied.
        status = "enabled" if peer.get("enabled") else "disabled"
        print(f" {i}. [{iface}] {peer['name']} {peer['ip']} [{status}]")
    print()
    # Prompt until a valid 1-based index is given; empty input cancels.
    while True:
        choice = input("Select peer number (or Enter to cancel): ").strip()
        if not choice:
            return
        try:
            idx = int(choice) - 1
            if 0 <= idx < len(all_entries):
                break
        except ValueError:
            pass
        print(" Invalid selection.")
    iface, peer, vlan, peers = all_entries[idx]
    print(f"\nSelected: {peer['name']} on {iface}")
    print(" 1. Rename")
    print(" 2. Regenerate keys")
    print(" 3. Delete")
    print(" 4. Cancel")
    print()
    action = input("Select action: ").strip()
    modified = False
    if action == "1":
        # -- Rename ----------------------------------------------------------------
        # The current name is shown in brackets, but empty input is rejected
        # rather than treated as "keep the current name".
        while True:
            new_name = input(f"New name [{peer['name']}]: ").strip()
            if not new_name:
                print(" Name cannot be empty.")
                continue
            if any(p["name"] == new_name for p in peers if p is not peer):
                print(f" A peer named '{new_name}' already exists for {iface}.")
                continue
            break
        # Keep an existing client config file in step with the new name.
        # NOTE(review): new_name is not checked for path separators before it
        # is used in a filename.
        old_conf = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
        new_conf = SCRIPT_DIR / f"vpn-client-{new_name}.conf"
        if old_conf.exists():
            old_conf.rename(new_conf)
            print(f" Renamed client config: {old_conf.name} -> {new_conf.name}")
        peer["name"] = new_name
        save_peers(iface, peers)
        print(f"Peer renamed to '{new_name}'.")
        modified = True
    elif action == "2":
        # -- Regenerate keys -------------------------------------------------------
        kf = server_key_path(iface)
        if not kf.exists():
            die(f"Server private key not found at {kf}. Run --apply first.")
        endpoint = resolve_endpoint(vpi(vlan)["listen_port"])
        print(f"\nRegenerating keypair for '{peer['name']}'...")
        private_key, public_key = generate_peer_keypair()
        server_public_key = get_server_public_key(iface)
        # Peers without a stored split_tunnel flag are treated as split-tunnel.
        if peer.get("split_tunnel", True):
            allowed_ips = ", ".join(split_tunnel_routes(dhcp_data))
        else:
            allowed_ips = "0.0.0.0/0"
        info = vpi(vlan)
        gateway = info["gateway"]
        # DNS falls back to the gateway when no override is configured.
        dns = info.get("explicit_overrides", {}).get("dns_server", "") or gateway
        domain = info.get("domain", "")
        mtu_override = info.get("explicit_overrides", {}).get("mtu", "")
        mtu = int(mtu_override) if mtu_override else sense_mtu(dhcp_data)
        peer["public_key"] = public_key
        save_peers(iface, peers)
        conf_path = write_client_conf(peer, private_key, server_public_key,
                                      endpoint, allowed_ips, dns, domain, mtu)
        # Rebinding only replaces the local reference; the original string
        # object is not overwritten in memory.
        private_key = "0" * len(private_key)
        del private_key
        print()
        print("=" * 68)
        print(f" New client config written: {conf_path}")
        print()
        print(" WARNING: This file contains the peer's private key.")
        print(" The previous client config is now invalid.")
        print(" Transfer the new config and delete it from this server.")
        print("=" * 68)
        modified = True
    elif action == "3":
        # -- Delete ----------------------------------------------------------------
        confirm = input(f"\nDelete peer '{peer['name']}' from {iface}? [y/N]: ").strip().lower()
        if confirm != "y":
            print("Cancelled.")
            return
        # In-place slice assignment keeps the same list object.
        peers[:] = [p for p in peers if p["name"] != peer["name"]]
        save_peers(iface, peers)
        conf_path = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
        # The leftover client config is reported, not removed.
        if conf_path.exists():
            print(f" NOTE: Client config {conf_path.name} still exists on this server.")
            print(f" It is now invalid and should be deleted.")
        print(f"Peer '{peer['name']}' deleted from {iface}.")
        modified = True
    elif action == "4":
        print("Cancelled.")
        return
    else:
        die("Invalid action.")
    if modified:
        print()
        print(" To apply changes to WireGuard, run:")
        print(" sudo python3 vpn.py --apply")
        print()
# ------------------------------------------------------------------------------
# --apply
# ------------------------------------------------------------------------------
def cmd_apply(dhcp_data):
    """
    Write the server config for every wg interface and start or sync it.

    Per interface: ensure a server key exists, render and write
    WG_DIR/<iface>.conf, then either restart the wg-quick service (listen
    port changed), sync peers live (service active, port unchanged), or
    enable and start the service (not active). Finally runs
    `core.py --apply` from SCRIPT_DIR, if present.

    The config file contains the server private key, so it is opened with
    mode 600 and tightened before any content is written, instead of being
    written with default permissions and chmod-ed afterwards.

    Exits via die() if the service cannot be restarted or started.
    """
    check_root()
    check_wireguard_tools()
    wg_vlans = wg_interfaces(dhcp_data)
    for vlan in wg_vlans:
        iface = vlan["interface"]
        print(f"-- {iface} " + "-" * 58)
        peers = load_peers(iface)
        # -- Ensure server key -----------------------------------------------
        kf = server_key_path(iface)
        if not kf.exists():
            print(f" Generating server private key for {iface}...")
            generate_server_key(iface)
        else:
            print(f" Using existing server key: {kf}")
        server_private_key = kf.read_text().strip()
        server_public_key = get_server_public_key(iface)
        # -- Write wg conf ---------------------------------------------------
        WG_DIR.mkdir(mode=0o700, exist_ok=True)
        conf_file = wg_conf_path(iface)
        new_conf = build_wg_conf(vlan, peers, server_private_key)
        listen_port = vpi(vlan)["listen_port"]
        # Detect whether the listen port has changed in the existing conf
        port_changed = False
        if conf_file.exists():
            for line in conf_file.read_text().splitlines():
                if line.startswith("ListenPort"):
                    # partition() copes with a line that has no "=" at all.
                    old_port = line.partition("=")[2].strip()
                    port_changed = old_port != str(listen_port)
                    break
        fd = os.open(conf_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # The mode passed to os.open only applies to newly created files.
            os.fchmod(fd, 0o600)
            f.write(new_conf)
        print(f" Written: {conf_file}")
        # -- Start or sync service -------------------------------------------
        svc = f"wg-quick@{iface}"
        result = run(["systemctl", "is-active", svc], check=False)
        if result.stdout.strip() == "active":
            if port_changed:
                # A new listen port needs a full restart, not a live peer sync.
                print(f" Listen port changed -- restarting {svc}...")
                result2 = run(["systemctl", "restart", svc], check=False)
                if result2.returncode != 0:
                    die(f"Failed to restart {svc}:\n{result2.stderr.strip()}")
                print(f" Service {svc} restarted.")
            else:
                print(f" Service {svc} is active -- syncing peers live...")
                sync_peers_live(iface, peers)
        else:
            result2 = run(["systemctl", "enable", "--now", svc], check=False)
            if result2.returncode != 0:
                # Second attempt before giving up.
                result3 = run(["systemctl", "reload-or-restart", svc], check=False)
                if result3.returncode != 0:
                    die(f"Failed to start {svc}:\n{result3.stderr.strip()}")
            print(f" Service {svc} enabled and started.")
        # -- Summary ---------------------------------------------------------
        enabled_peers = [p for p in peers if p.get("enabled") is True]
        print(f" Server public key: {server_public_key}")
        print(f" Listen port: UDP {listen_port}")
        print(f" Enabled peers: {len(enabled_peers)}")
        for p in enabled_peers:
            print(f" {p['ip']:<22} {p['name']}")
        print()
    # -- Apply core config to pick up VPN firewall rules ---------------------
    core_py = SCRIPT_DIR / "core.py"
    if core_py.exists():
        print("-- Applying core config (core.py --apply) ----------------------------")
        result = subprocess.run(
            [sys.executable, str(core_py), "--apply"],
            capture_output=False
        )
        if result.returncode != 0:
            print("WARNING: core.py --apply returned non-zero. Check output above.")
    else:
        print(f"WARNING: {core_py} not found -- run core.py --apply manually to load VPN firewall rules.")
# ------------------------------------------------------------------------------
# --disable
# ------------------------------------------------------------------------------
def cmd_disable(dhcp_data):
    """Stop and disable the wg-quick@<iface> unit of every wg interface.

    A failing systemctl call is reported as a warning; the loop carries on.
    """
    check_root()
    for iface in (vlan["interface"] for vlan in wg_interfaces(dhcp_data)):
        svc = f"wg-quick@{iface}"
        outcome = run(["systemctl", "disable", "--now", svc], check=False)
        if outcome.returncode == 0:
            print(f"WireGuard service {svc} stopped and disabled.")
            continue
        print(f"WARNING: {svc} may not have been running:\n{outcome.stderr.strip()}")
# ------------------------------------------------------------------------------
# --status
# ------------------------------------------------------------------------------
def cmd_status(dhcp_data):
    """
    Print a status table of the wg-quick@<iface> units.

    For each wg interface the systemd active/enabled state is shown. When the
    unit is active and `wg show <iface>` succeeds, the public key, listening
    port and peer counts are printed as well.
    """
    check_root()
    wg_vlans = wg_interfaces(dhcp_data)
    print(f" {'UNIT':<45} {'ACTIVE':<12} {'ENABLED'}")
    print(f" {'-'*45} {'-'*12} {'-'*10}")
    for vlan in wg_vlans:
        iface = vlan["interface"]
        svc = f"wg-quick@{iface}"
        r_active = run(["systemctl", "is-active", svc], check=False)
        r_enabled = run(["systemctl", "is-enabled", svc], check=False)
        active = r_active.stdout.strip()
        enabled = r_enabled.stdout.strip()
        # NOTE(review): both branches of each expression yield "", so the
        # symbol columns are always blank -- possibly status glyphs were lost
        # from these literals; confirm against the intended output.
        active_sym = "" if active == "active" else ""
        enabled_sym = "" if enabled == "enabled" else ""
        print(f" {svc:<45} {active_sym} {active:<10} {enabled_sym} {enabled}")
        if active == "active":
            result = run(["wg", "show", iface], check=False)
            if result.returncode == 0:
                # Pick the interesting fields out of the `wg show` text output.
                info = {}
                for line in result.stdout.splitlines():
                    line = line.strip()
                    if line.startswith("public key:"):
                        info["pubkey"] = line.split(":", 1)[1].strip()
                    elif line.startswith("listening port:"):
                        info["port"] = line.split(":", 1)[1].strip()
                    elif line.startswith("peer:"):
                        # Counts every "peer:" line in the output.
                        info.setdefault("peers", 0)
                        info["peers"] += 1
                if "pubkey" in info:
                    print(f" public key: {info['pubkey']}")
                if "port" in info:
                    print(f" listening port: {info['port']}")
                peers = load_peers(iface)
                enabled_peers = [p for p in peers if p.get("enabled") is True]
                # NOTE(review): the "connected" figure is the number of peer
                # entries reported by `wg show`; no handshake check is made
                # here -- confirm the wording is intended.
                print(f" peers: {len(enabled_peers)} configured, {info.get('peers', 0)} connected")
# ------------------------------------------------------------------------------
# --view-peers
# ------------------------------------------------------------------------------
def cmd_logs(dhcp_data):
    """
    Print a per-peer table (name, IP, endpoint, last handshake, RX, TX) for
    each wg interface, using the output of `wg show <iface> dump`.

    Peers are labelled with the name stored in the dotfile; live peers whose
    key is not in the dotfile are labelled with a shortened public key.
    """
    check_root()
    now = datetime.now(timezone.utc)
    for vlan in wg_interfaces(dhcp_data):
        iface = vlan["interface"]
        names = {p["public_key"]: p["name"] for p in load_peers(iface)}
        print(f"-- {iface} " + "-" * 58)
        dump = run(["wg", "show", iface, "dump"], check=False)
        if dump.returncode != 0:
            print(f" WireGuard interface '{iface}' is not up.")
            print()
            continue
        # The first row describes the interface itself, the rest are peers.
        rows = dump.stdout.strip().splitlines()[1:]
        if not rows:
            print(" No peers currently configured.")
            print()
            continue
        print(f" {'PEER':<22} {'IP':<20} {'ENDPOINT':<26} {'LAST HANDSHAKE':<22} {'RX':<12} {'TX'}")
        print(" " + "-" * 106)
        for row in rows:
            fields = row.split("\t")
            if len(fields) < 6:
                continue
            pub_key = fields[0]
            endpoint = "not connected" if fields[2] == "(none)" else fields[2]
            rx_bytes = int(fields[5])
            tx_bytes = int(fields[6]) if len(fields) > 6 else 0
            name = names.get(pub_key, pub_key[:12] + "...")
            if fields[4] == "0":
                last_hs = "never"
            else:
                stamp = datetime.fromtimestamp(int(fields[4]), tz=timezone.utc)
                seconds = int((now - stamp).total_seconds())
                # Largest unit first; anything under a minute is shown in seconds.
                for span, suffix in ((86400, "d"), (3600, "h"), (60, "m")):
                    if seconds >= span:
                        last_hs = f"{seconds // span}{suffix} ago"
                        break
                else:
                    last_hs = f"{seconds}s ago"
            rx_str = _fmt_bytes(rx_bytes)
            tx_str = _fmt_bytes(tx_bytes)
            # Only the first allowed-ips entry is displayed.
            display_ip = fields[3].split(",")[0].strip()
            print(f" {name:<22} {display_ip:<20} {endpoint:<26} {last_hs:<22} {rx_str:<12} {tx_str}")
        print()
# ------------------------------------------------------------------------------
# Main
# ------------------------------------------------------------------------------
def main():
    """Parse the command line and run the first selected sub-command.

    Without any flag the help text is printed and the process exits with
    status 0. Otherwise core.json is loaded, the presence and validity of
    wg* interfaces is checked, and exactly one command runs -- when several
    flags are given, the first one in the order below wins.
    """
    parser = argparse.ArgumentParser(
        description="Manage WireGuard VPN server and peers",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "examples:\n"
            " sudo python3 vpn.py --add-peer Add a new peer interactively\n"
            " sudo python3 vpn.py --manage-peers List and manage existing peers\n"
            " sudo python3 vpn.py --apply Write WireGuard config and sync peers\n"
            " sudo python3 vpn.py --disable Stop WireGuard on all interfaces\n"
            " sudo python3 vpn.py --status Show WireGuard service and interface status\n"
            " sudo python3 vpn.py --view-peers Show per-peer handshake and traffic stats\n"
        )
    )
    flag_help = (
        ("--add-peer", "Add a new peer interactively"),
        ("--manage-peers", "List and manage existing peers"),
        ("--apply", "Write WireGuard config and sync peers"),
        ("--disable", "Stop WireGuard on all interfaces"),
        ("--status", "Show WireGuard service and interface status"),
        ("--view-peers", "Show per-peer handshake and traffic stats"),
    )
    for flag, text in flag_help:
        parser.add_argument(flag, action="store_true", help=text)
    args = parser.parse_args()

    # (selected?, handler) pairs in priority order.
    commands = (
        (args.add_peer, cmd_add_peer),
        (args.manage_peers, cmd_list_peers),
        (args.apply, cmd_apply),
        (args.disable, cmd_disable),
        (args.status, cmd_status),
        (args.view_peers, cmd_logs),
    )
    if not any(selected for selected, _ in commands):
        parser.print_help()
        sys.exit(0)

    dhcp_data = load_dhcp()
    wg_vlans = wg_interfaces(dhcp_data)
    if not wg_vlans:
        die("No WireGuard interfaces (wg*) found in core.json.")
    validate_wg_vlans(wg_vlans)

    for selected, handler in commands:
        if selected:
            handler(dhcp_data)
            break
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()