1001 lines
37 KiB
Python
1001 lines
37 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
vpn.py -- Manage WireGuard VPN server and peers.
|
|
|
|
Reads core.json for VPN interface configuration. Any VLAN whose interface
|
|
name starts with "wg" is treated as a WireGuard interface. Peer data is
|
|
stored in per-interface dotfiles (.vpn-wg0, .vpn-wg1, etc.) in the same
|
|
directory as this script.
|
|
|
|
Server private keys:
|
|
Stored at /etc/wireguard/<iface>.key (root read-only, 600).
|
|
Generated once on --apply if not already present.
|
|
|
|
Peer key model (WireGuard is symmetric):
|
|
Each peer generates a keypair. The peer's PRIVATE key is embedded in
|
|
their client config file and must be transferred to them securely.
|
|
The peer's PUBLIC key is stored in the dotfile and written into the
|
|
WireGuard conf. The server's public key is embedded in each client config.
|
|
|
|
Client config files:
|
|
Generated as vpn-client-<name>.conf in the same directory as vpn.py.
|
|
Permissions: 600 (root read-only).
|
|
Contains the peer's private key -- transfer to the client by secure means
|
|
(e.g. encrypted email, USB), then delete from this server.
|
|
|
|
Usage:
|
|
sudo python3 vpn.py --add-peer Add a new peer interactively
|
|
sudo python3 vpn.py --manage-peers List and manage existing peers
|
|
sudo python3 vpn.py --apply Write WireGuard config and sync peers
|
|
sudo python3 vpn.py --disable Stop WireGuard on all interfaces
|
|
sudo python3 vpn.py --status Show WireGuard service and interface status
|
|
sudo python3 vpn.py --view-peers Show per-peer handshake and traffic stats
|
|
"""
|
|
|
|
import ipaddress
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
|
|
SCRIPT_DIR = Path(__file__).parent
|
|
DHCP_CONFIG_FILE = SCRIPT_DIR / "core.json"
|
|
DDNS_CONFIG_FILE = SCRIPT_DIR / "ddns.json"
|
|
WG_DIR = Path("/etc/wireguard")
|
|
KEEPALIVE = 25
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def die(msg):
|
|
print(f"ERROR: {msg}")
|
|
sys.exit(1)
|
|
|
|
def check_root():
|
|
if os.geteuid() != 0:
|
|
die("This script must be run as root (sudo).")
|
|
|
|
def chown_to_script_dir_owner(path):
|
|
"""Chown a file to the owner of the script directory.
|
|
Keeps SCRIPT_DIR files user-owned even when running as root.
|
|
/etc/wireguard files are intentionally excluded — they stay root-owned.
|
|
"""
|
|
try:
|
|
stat = SCRIPT_DIR.stat()
|
|
os.chown(path, stat.st_uid, stat.st_gid)
|
|
except OSError:
|
|
pass # non-fatal
|
|
|
|
def run(cmd, check=True, capture=True):
|
|
return subprocess.run(cmd, capture_output=capture, text=True, check=check)
|
|
|
|
def wg_available():
|
|
return subprocess.run(["which", "wg"], capture_output=True).returncode == 0
|
|
|
|
def sense_mtu(dhcp_data):
|
|
"""
|
|
Derive the recommended WireGuard tunnel MTU from the WAN interface MTU.
|
|
WireGuard adds 60 bytes of overhead for IPv4, so tunnel MTU = link MTU - 60.
|
|
Falls back to 1420 (correct for standard 1500 MTU links) if sensing fails.
|
|
Reads wan_interface from core.json general block.
|
|
"""
|
|
wan = dhcp_data.get("general", {}).get("wan_interface", "")
|
|
if wan:
|
|
try:
|
|
result = subprocess.run(
|
|
["ip", "link", "show", wan],
|
|
capture_output=True, text=True
|
|
)
|
|
m = re.search(r"\bmtu\s+(\d+)", result.stdout)
|
|
if m:
|
|
return int(m.group(1)) - 60
|
|
except Exception:
|
|
pass
|
|
return 1420
|
|
|
|
def check_wireguard_tools():
|
|
"""Ensure wireguard-tools is installed; prompt to install via apt if not."""
|
|
if wg_available():
|
|
return
|
|
print("wireguard-tools is not installed (provides wg and wg-quick).")
|
|
print()
|
|
while True:
|
|
choice = input("Install wireguard-tools now via apt? [y/N]: ").strip().lower()
|
|
if choice in ("y", "yes"):
|
|
break
|
|
if choice in ("n", "no", ""):
|
|
die("Cannot continue without wireguard-tools. Install it and retry.")
|
|
result = subprocess.run(["apt-get", "install", "-y", "wireguard-tools"])
|
|
if result.returncode != 0:
|
|
die("Package installation failed. Install wireguard-tools manually and retry.")
|
|
|
|
def _fmt_bytes(n):
|
|
if n < 1024:
|
|
return f"{n} B"
|
|
elif n < 1024 ** 2:
|
|
return f"{n / 1024:.1f} KB"
|
|
elif n < 1024 ** 3:
|
|
return f"{n / 1024**2:.1f} MB"
|
|
else:
|
|
return f"{n / 1024**3:.2f} GB"
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Load core.json / dotfiles
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def load_dhcp():
|
|
if not DHCP_CONFIG_FILE.exists():
|
|
die(f"Config file not found: {DHCP_CONFIG_FILE}")
|
|
with open(DHCP_CONFIG_FILE) as f:
|
|
return json.load(f)
|
|
|
|
def wg_interfaces(dhcp_data):
|
|
"""Return list of VLAN dicts whose interface name starts with 'wg'."""
|
|
return [v for v in dhcp_data.get("vlans", [])
|
|
if v.get("interface", "").startswith("wg")]
|
|
|
|
def dotfile_path(iface):
|
|
return SCRIPT_DIR / f".vpn-{iface}"
|
|
|
|
def vpi(vlan):
|
|
"""Return the vpn_information dict for a WG VLAN (plain object, not list)."""
|
|
return vlan["vpn_information"]
|
|
|
|
def server_key_path(iface):
|
|
return WG_DIR / f"{iface}.key"
|
|
|
|
def wg_conf_path(iface):
|
|
return WG_DIR / f"{iface}.conf"
|
|
|
|
def load_peers(iface):
|
|
"""Load peers list from the dotfile. Returns [] if file does not exist."""
|
|
path = dotfile_path(iface)
|
|
if not path.exists():
|
|
return []
|
|
with open(path) as f:
|
|
return json.load(f).get("peers", [])
|
|
|
|
def save_peers(iface, peers):
|
|
"""Write peers list to the dotfile with 600 permissions."""
|
|
path = dotfile_path(iface)
|
|
with open(path, "w") as f:
|
|
json.dump({"peers": peers}, f, indent=2)
|
|
f.write("\n")
|
|
path.chmod(0o600)
|
|
chown_to_script_dir_owner(path)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# IP allocation
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def next_available_ip(vlan, peers):
|
|
"""
|
|
Find the first available peer IP in the wg VLAN subnet, starting from .2.
|
|
Skips the gateway IP. Scans .2-.254 for the first gap.
|
|
"""
|
|
gateway = vpi(vlan)["gateway"]
|
|
gw_net = ipaddress.IPv4Interface(f"{gateway}/24")
|
|
network = gw_net.network
|
|
base = int(network.network_address)
|
|
|
|
used = {int(ipaddress.IPv4Address(gateway))}
|
|
for peer in peers:
|
|
try:
|
|
used.add(int(ipaddress.IPv4Interface(peer["ip"]).ip))
|
|
except (KeyError, ValueError):
|
|
pass
|
|
|
|
for offset in range(2, 255):
|
|
candidate = ipaddress.IPv4Address(base + offset)
|
|
if int(candidate) not in used:
|
|
return f"{candidate}/32"
|
|
|
|
die(f"No available IPs in VPN subnet {network} (all .2-.254 allocated).")
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Key management
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def generate_server_key(iface):
|
|
"""Generate server private key and store at WG_DIR/<iface>.key (600)."""
|
|
WG_DIR.mkdir(exist_ok=True)
|
|
private = run(["wg", "genkey"]).stdout.strip()
|
|
kf = server_key_path(iface)
|
|
kf.write_text(private + "\n")
|
|
kf.chmod(0o600)
|
|
print(f"Server private key generated: {kf}")
|
|
|
|
def get_server_public_key(iface):
|
|
"""Derive and return the server's public key from the stored private key."""
|
|
kf = server_key_path(iface)
|
|
if not kf.exists():
|
|
die(f"Server private key not found at {kf}. Run --apply first.")
|
|
private = kf.read_text().strip()
|
|
return subprocess.run(
|
|
["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
|
|
).stdout.strip()
|
|
|
|
def generate_peer_keypair():
|
|
"""Generate and return (private_key, public_key) for a peer."""
|
|
private = run(["wg", "genkey"]).stdout.strip()
|
|
public = subprocess.run(
|
|
["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
|
|
).stdout.strip()
|
|
return private, public
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Endpoint resolution
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def resolve_endpoint(listen_port):
|
|
"""
|
|
Resolve the public endpoint for client configs.
|
|
|
|
Resolution order:
|
|
1. First enabled provider hostname from ddns.json
|
|
2. Manual entry prompt if ddns.json is missing or has no enabled provider
|
|
|
|
The user is always shown the resolved value and given the opportunity to
|
|
edit it before it is used.
|
|
"""
|
|
import readline
|
|
|
|
candidate = None
|
|
source = None
|
|
|
|
if DDNS_CONFIG_FILE.exists():
|
|
with open(DDNS_CONFIG_FILE) as f:
|
|
ddns = json.load(f)
|
|
for provider in ddns.get("providers", []):
|
|
if provider.get("enabled") is not True:
|
|
continue
|
|
ptype = provider.get("provider", "").lower()
|
|
if ptype == "noip":
|
|
hostnames = provider.get("hostnames", [])
|
|
if hostnames:
|
|
candidate = f"{hostnames[0]}:{listen_port}"
|
|
source = "ddns.json"
|
|
break
|
|
elif ptype == "duckdns":
|
|
subdomains = provider.get("subdomains", [])
|
|
if subdomains:
|
|
candidate = f"{subdomains[0]}.duckdns.org:{listen_port}"
|
|
source = "ddns.json"
|
|
break
|
|
if not candidate:
|
|
print("No enabled DDNS provider found in ddns.json.")
|
|
else:
|
|
print(f"ddns.json not found at {DDNS_CONFIG_FILE}.")
|
|
|
|
if candidate:
|
|
prompt = f"Public endpoint (from {source}): "
|
|
else:
|
|
print("Please enter the public endpoint manually.")
|
|
prompt = "Public endpoint (hostname:port): "
|
|
|
|
while True:
|
|
try:
|
|
readline.set_startup_hook(
|
|
lambda: readline.insert_text(candidate) if candidate else None
|
|
)
|
|
entry = input(prompt).strip()
|
|
finally:
|
|
readline.set_startup_hook(None)
|
|
|
|
if not entry:
|
|
print(" Endpoint cannot be empty.")
|
|
continue
|
|
if ":" not in entry:
|
|
entry = f"{entry}:{listen_port}"
|
|
return entry
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Split-tunnel route computation
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def split_tunnel_routes(dhcp_data):
|
|
"""
|
|
Return a list of CIDR strings for all VLANs defined in core.json.
|
|
WG VLANs use vpn_information.gateway to derive their subnet.
|
|
Used as AllowedIPs in client configs when split_tunnel is true.
|
|
"""
|
|
routes = []
|
|
for v in dhcp_data.get("vlans", []):
|
|
if v.get("interface", "").startswith("wg"):
|
|
gw = vpi(v)["gateway"]
|
|
net = ipaddress.IPv4Network(f"{gw}/24", strict=False)
|
|
routes.append(str(net))
|
|
else:
|
|
d = v["dhcp"]
|
|
net = ipaddress.IPv4Network(f"{d['subnet']}/{d['subnet_mask']}", strict=False)
|
|
routes.append(str(net))
|
|
return routes
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Client config
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def build_client_conf(peer, private_key, server_public_key, endpoint,
|
|
allowed_ips, dns, domain, mtu):
|
|
dns_line = f"DNS = {dns}, {domain}" if domain else f"DNS = {dns}"
|
|
return "\n".join([
|
|
"[Interface]",
|
|
f"PrivateKey = {private_key}",
|
|
f"Address = {peer['ip']}",
|
|
dns_line,
|
|
f"MTU = {mtu}",
|
|
"",
|
|
"[Peer]",
|
|
f"PublicKey = {server_public_key}",
|
|
f"Endpoint = {endpoint}",
|
|
f"AllowedIPs = {allowed_ips}",
|
|
f"PersistentKeepalive = {KEEPALIVE}",
|
|
"",
|
|
])
|
|
|
|
def write_client_conf(peer, private_key, server_public_key, endpoint,
|
|
allowed_ips, dns, domain, mtu):
|
|
conf_path = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
|
|
content = build_client_conf(peer, private_key, server_public_key,
|
|
endpoint, allowed_ips, dns, domain, mtu)
|
|
conf_path.write_text(content)
|
|
conf_path.chmod(0o600)
|
|
chown_to_script_dir_owner(conf_path)
|
|
return conf_path
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# WireGuard server conf
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def build_wg_conf(vlan, peers, server_private_key):
|
|
iface = vlan["interface"]
|
|
info = vpi(vlan)
|
|
gateway = info["gateway"]
|
|
gw_net = ipaddress.IPv4Interface(f"{gateway}/24")
|
|
server_ip = f"{gateway}/{gw_net.network.prefixlen}"
|
|
listen_port = info["listen_port"]
|
|
|
|
lines = [
|
|
"# Generated by vpn.py -- do not edit manually.",
|
|
"# Run: sudo python3 vpn.py --apply",
|
|
"",
|
|
"[Interface]",
|
|
f"PrivateKey = {server_private_key}",
|
|
f"Address = {server_ip}",
|
|
f"ListenPort = {listen_port}",
|
|
"",
|
|
]
|
|
for peer in peers:
|
|
if peer.get("enabled") is True:
|
|
lines += [
|
|
f"# {peer['name']}",
|
|
"[Peer]",
|
|
f"PublicKey = {peer['public_key']}",
|
|
f"AllowedIPs = {peer['ip']}",
|
|
"",
|
|
]
|
|
return "\n".join(lines)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Live peer sync
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def sync_peers_live(iface, peers):
|
|
"""
|
|
Sync live WireGuard peers to match the dotfile state without a service
|
|
restart. Adds peers present in the dotfile (enabled) but not live;
|
|
removes peers live but not in the enabled dotfile set.
|
|
"""
|
|
result = run(["wg", "show", iface, "dump"], check=False)
|
|
if result.returncode != 0:
|
|
return # interface not up yet
|
|
|
|
lines = result.stdout.strip().splitlines()
|
|
live_keys = set()
|
|
for line in lines[1:]: # first line is the server interface itself
|
|
parts = line.split("\t")
|
|
if parts:
|
|
live_keys.add(parts[0])
|
|
|
|
enabled_peers = {
|
|
p["public_key"]: p
|
|
for p in peers
|
|
if p.get("enabled") is True
|
|
}
|
|
dotfile_keys = set(enabled_peers.keys())
|
|
|
|
for key in dotfile_keys - live_keys:
|
|
peer = enabled_peers[key]
|
|
run(["wg", "set", iface, "peer", key, "allowed-ips", peer["ip"]])
|
|
print(f" Added peer: {peer['name']} ({peer['ip']})")
|
|
|
|
for key in live_keys - dotfile_keys:
|
|
run(["wg", "set", iface, "peer", key, "remove"])
|
|
print(f" Removed peer: {key[:16]}...")
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Interface selection
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def validate_wg_vlans(wg_vlans):
|
|
"""Die with a clear message if any wg VLAN is missing a valid vpn_information block."""
|
|
for vlan in wg_vlans:
|
|
iface = vlan.get("interface", "?")
|
|
info = vlan.get("vpn_information")
|
|
if not isinstance(info, dict):
|
|
die(f"Interface '{iface}' is missing a vpn_information block in core.json. "
|
|
f"Add: \"vpn_information\": {{\"listen_port\": 51820, \"gateway\": \"...\"}}")
|
|
if not isinstance(info.get("listen_port"), int):
|
|
die(f"Interface '{iface}' vpn_information is missing a valid listen_port in core.json.")
|
|
if not info.get("gateway"):
|
|
die(f"Interface '{iface}' vpn_information is missing gateway in core.json.")
|
|
|
|
def pick_wg_interface(wg_vlans):
|
|
"""
|
|
If only one wg interface exists, return it immediately.
|
|
Otherwise, print a numbered list and prompt the user to pick.
|
|
"""
|
|
if len(wg_vlans) == 1:
|
|
return wg_vlans[0]
|
|
|
|
print("Available WireGuard interfaces:")
|
|
for i, vlan in enumerate(wg_vlans, 1):
|
|
lp = vpi(vlan)["listen_port"]
|
|
print(f" {i}. {vlan['interface']} ({vlan['name']}, port {lp})")
|
|
print()
|
|
|
|
while True:
|
|
choice = input("Select interface number: ").strip()
|
|
try:
|
|
idx = int(choice) - 1
|
|
if 0 <= idx < len(wg_vlans):
|
|
return wg_vlans[idx]
|
|
except ValueError:
|
|
pass
|
|
print(" Invalid selection.")
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# --add-peer
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def cmd_add_peer(dhcp_data):
|
|
check_root()
|
|
check_wireguard_tools()
|
|
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
|
|
vlan = pick_wg_interface(wg_vlans)
|
|
iface = vlan["interface"]
|
|
peers = load_peers(iface)
|
|
|
|
# -- Resolve endpoint -------------------------------------------------------
|
|
endpoint = resolve_endpoint(vpi(vlan)["listen_port"])
|
|
|
|
# -- Peer name -------------------------------------------------------------
|
|
print()
|
|
while True:
|
|
name = input("Peer name (e.g. norman-laptop): ").strip()
|
|
if not name:
|
|
print(" Name cannot be empty.")
|
|
continue
|
|
if any(p["name"] == name for p in peers):
|
|
print(f" A peer named '{name}' already exists for {iface}.")
|
|
continue
|
|
break
|
|
|
|
# -- Peer IP ---------------------------------------------------------------
|
|
prefill = next_available_ip(vlan, peers)
|
|
print(f"Peer IP [{prefill}]: ", end="", flush=True)
|
|
ip_input = input().strip()
|
|
peer_ip = ip_input if ip_input else prefill
|
|
|
|
d = vpi(vlan)
|
|
gateway = d["gateway"]
|
|
network = ipaddress.IPv4Network(f"{gateway}/24", strict=False)
|
|
try:
|
|
iface_ip = ipaddress.IPv4Interface(peer_ip)
|
|
if iface_ip.ip not in network:
|
|
die(f"IP '{peer_ip}' is not within VPN subnet {network}.")
|
|
if iface_ip.ip == ipaddress.IPv4Address(gateway):
|
|
die(f"IP '{peer_ip}' is the server gateway. Choose a different IP.")
|
|
for p in peers:
|
|
if ipaddress.IPv4Interface(p["ip"]).ip == iface_ip.ip:
|
|
die(f"IP '{peer_ip}' is already assigned to peer '{p['name']}'.")
|
|
except ValueError as e:
|
|
die(f"Invalid IP '{peer_ip}': {e}")
|
|
|
|
# -- Generate keypair -------------------------------------------------------
|
|
print(f"\nGenerating keypair for '{name}'...")
|
|
private_key, public_key = generate_peer_keypair()
|
|
|
|
# -- Ensure server key exists -----------------------------------------------
|
|
kf = server_key_path(iface)
|
|
if not kf.exists():
|
|
print(f"Server private key not found for {iface} -- generating now...")
|
|
generate_server_key(iface)
|
|
server_public_key = get_server_public_key(iface)
|
|
|
|
# -- Split tunnel prompt ---------------------------------------------------
|
|
print()
|
|
st_input = input("Split tunnel? Route only VPN subnets (not all traffic) through WireGuard. [Y/n]: ").strip().lower()
|
|
split_tunnel = st_input != "n"
|
|
|
|
if split_tunnel:
|
|
allowed_ips = ", ".join(split_tunnel_routes(dhcp_data))
|
|
else:
|
|
allowed_ips = "0.0.0.0/0"
|
|
|
|
info = vpi(vlan)
|
|
dns = info.get("explicit_overrides", {}).get("dns_server", "") or gateway
|
|
domain = info.get("domain", "")
|
|
mtu_override = info.get("explicit_overrides", {}).get("mtu", "")
|
|
mtu = int(mtu_override) if mtu_override else sense_mtu(dhcp_data)
|
|
|
|
# -- Add peer to dotfile ----------------------------------------------------
|
|
new_peer = {"name": name, "ip": peer_ip, "public_key": public_key,
|
|
"split_tunnel": split_tunnel, "enabled": True}
|
|
peers.append(new_peer)
|
|
save_peers(iface, peers)
|
|
print(f"Peer '{name}' added to {dotfile_path(iface).name} (ip: {peer_ip}).")
|
|
|
|
# -- Write client config ----------------------------------------------------
|
|
conf_path = write_client_conf(new_peer, private_key, server_public_key,
|
|
endpoint, allowed_ips, dns, domain, mtu)
|
|
|
|
private_key = "0" * len(private_key)
|
|
del private_key
|
|
|
|
# -- Instructions -----------------------------------------------------------
|
|
print()
|
|
print("=" * 68)
|
|
print(f" Client config written: {conf_path}")
|
|
print()
|
|
print(" NEXT STEPS:")
|
|
print(f" 1. Transfer {conf_path.name} to '{name}' by secure means")
|
|
print(" (encrypted email, USB drive, etc.).")
|
|
print(" 2. The recipient imports it into their WireGuard app.")
|
|
print(f" 3. Delete {conf_path} from this server once transferred.")
|
|
print()
|
|
print(" WARNING: This file contains the peer's private key.")
|
|
print(" Do not leave it on this server longer than necessary.")
|
|
print("=" * 68)
|
|
print()
|
|
print(" To apply changes to WireGuard, run:")
|
|
print(" sudo python3 vpn.py --apply")
|
|
print()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# --list-peers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def cmd_list_peers(dhcp_data):
|
|
check_root()
|
|
check_wireguard_tools()
|
|
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
|
|
# -- Collect all peers across all interfaces --------------------------------
|
|
# Each entry: (iface, peer_dict, vlan_dict, peers_list)
|
|
all_entries = []
|
|
for vlan in wg_vlans:
|
|
iface = vlan["interface"]
|
|
peers = load_peers(iface)
|
|
for peer in peers:
|
|
all_entries.append((iface, peer, vlan, peers))
|
|
|
|
if not all_entries:
|
|
print("No peers found across any WireGuard interface.")
|
|
return
|
|
|
|
# -- List peers -------------------------------------------------------------
|
|
print("Peers:")
|
|
for i, (iface, peer, _, _) in enumerate(all_entries, 1):
|
|
status = "enabled" if peer.get("enabled") else "disabled"
|
|
print(f" {i}. [{iface}] {peer['name']} {peer['ip']} [{status}]")
|
|
print()
|
|
|
|
while True:
|
|
choice = input("Select peer number (or Enter to cancel): ").strip()
|
|
if not choice:
|
|
return
|
|
try:
|
|
idx = int(choice) - 1
|
|
if 0 <= idx < len(all_entries):
|
|
break
|
|
except ValueError:
|
|
pass
|
|
print(" Invalid selection.")
|
|
|
|
iface, peer, vlan, peers = all_entries[idx]
|
|
print(f"\nSelected: {peer['name']} on {iface}")
|
|
print(" 1. Rename")
|
|
print(" 2. Regenerate keys")
|
|
print(" 3. Delete")
|
|
print(" 4. Cancel")
|
|
print()
|
|
|
|
action = input("Select action: ").strip()
|
|
modified = False
|
|
|
|
if action == "1":
|
|
# -- Rename ----------------------------------------------------------------
|
|
while True:
|
|
new_name = input(f"New name [{peer['name']}]: ").strip()
|
|
if not new_name:
|
|
print(" Name cannot be empty.")
|
|
continue
|
|
if any(p["name"] == new_name for p in peers if p is not peer):
|
|
print(f" A peer named '{new_name}' already exists for {iface}.")
|
|
continue
|
|
break
|
|
|
|
old_conf = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
|
|
new_conf = SCRIPT_DIR / f"vpn-client-{new_name}.conf"
|
|
if old_conf.exists():
|
|
old_conf.rename(new_conf)
|
|
print(f" Renamed client config: {old_conf.name} -> {new_conf.name}")
|
|
|
|
peer["name"] = new_name
|
|
save_peers(iface, peers)
|
|
print(f"Peer renamed to '{new_name}'.")
|
|
modified = True
|
|
|
|
elif action == "2":
|
|
# -- Regenerate keys -------------------------------------------------------
|
|
kf = server_key_path(iface)
|
|
if not kf.exists():
|
|
die(f"Server private key not found at {kf}. Run --apply first.")
|
|
|
|
endpoint = resolve_endpoint(vpi(vlan)["listen_port"])
|
|
print(f"\nRegenerating keypair for '{peer['name']}'...")
|
|
private_key, public_key = generate_peer_keypair()
|
|
server_public_key = get_server_public_key(iface)
|
|
|
|
if peer.get("split_tunnel", True):
|
|
allowed_ips = ", ".join(split_tunnel_routes(dhcp_data))
|
|
else:
|
|
allowed_ips = "0.0.0.0/0"
|
|
|
|
info = vpi(vlan)
|
|
gateway = info["gateway"]
|
|
dns = info.get("explicit_overrides", {}).get("dns_server", "") or gateway
|
|
domain = info.get("domain", "")
|
|
mtu_override = info.get("explicit_overrides", {}).get("mtu", "")
|
|
mtu = int(mtu_override) if mtu_override else sense_mtu(dhcp_data)
|
|
|
|
peer["public_key"] = public_key
|
|
save_peers(iface, peers)
|
|
|
|
conf_path = write_client_conf(peer, private_key, server_public_key,
|
|
endpoint, allowed_ips, dns, domain, mtu)
|
|
|
|
private_key = "0" * len(private_key)
|
|
del private_key
|
|
|
|
print()
|
|
print("=" * 68)
|
|
print(f" New client config written: {conf_path}")
|
|
print()
|
|
print(" WARNING: This file contains the peer's private key.")
|
|
print(" The previous client config is now invalid.")
|
|
print(" Transfer the new config and delete it from this server.")
|
|
print("=" * 68)
|
|
modified = True
|
|
|
|
elif action == "3":
|
|
# -- Delete ----------------------------------------------------------------
|
|
confirm = input(f"\nDelete peer '{peer['name']}' from {iface}? [y/N]: ").strip().lower()
|
|
if confirm != "y":
|
|
print("Cancelled.")
|
|
return
|
|
|
|
peers[:] = [p for p in peers if p["name"] != peer["name"]]
|
|
save_peers(iface, peers)
|
|
|
|
conf_path = SCRIPT_DIR / f"vpn-client-{peer['name']}.conf"
|
|
if conf_path.exists():
|
|
print(f" NOTE: Client config {conf_path.name} still exists on this server.")
|
|
print(f" It is now invalid and should be deleted.")
|
|
|
|
print(f"Peer '{peer['name']}' deleted from {iface}.")
|
|
modified = True
|
|
|
|
elif action == "4":
|
|
print("Cancelled.")
|
|
return
|
|
|
|
else:
|
|
die("Invalid action.")
|
|
|
|
if modified:
|
|
print()
|
|
print(" To apply changes to WireGuard, run:")
|
|
print(" sudo python3 vpn.py --apply")
|
|
print()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# --apply
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def cmd_apply(dhcp_data):
|
|
check_root()
|
|
check_wireguard_tools()
|
|
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
|
|
for vlan in wg_vlans:
|
|
iface = vlan["interface"]
|
|
print(f"-- {iface} " + "-" * 58)
|
|
|
|
peers = load_peers(iface)
|
|
|
|
# -- Ensure server key -----------------------------------------------
|
|
kf = server_key_path(iface)
|
|
if not kf.exists():
|
|
print(f" Generating server private key for {iface}...")
|
|
generate_server_key(iface)
|
|
else:
|
|
print(f" Using existing server key: {kf}")
|
|
|
|
server_private_key = kf.read_text().strip()
|
|
server_public_key = get_server_public_key(iface)
|
|
|
|
# -- Write wg conf ---------------------------------------------------
|
|
WG_DIR.mkdir(exist_ok=True)
|
|
conf_file = wg_conf_path(iface)
|
|
new_conf = build_wg_conf(vlan, peers, server_private_key)
|
|
listen_port = vpi(vlan)["listen_port"]
|
|
|
|
# Detect whether the listen port has changed in the existing conf
|
|
port_changed = False
|
|
if conf_file.exists():
|
|
for line in conf_file.read_text().splitlines():
|
|
if line.startswith("ListenPort"):
|
|
old_port = line.split("=")[1].strip()
|
|
port_changed = old_port != str(listen_port)
|
|
break
|
|
|
|
conf_file.write_text(new_conf)
|
|
conf_file.chmod(0o600)
|
|
print(f" Written: {conf_file}")
|
|
|
|
# -- Start or sync service -------------------------------------------
|
|
svc = f"wg-quick@{iface}"
|
|
result = run(["systemctl", "is-active", svc], check=False)
|
|
|
|
if result.stdout.strip() == "active":
|
|
if port_changed:
|
|
print(f" Listen port changed -- restarting {svc}...")
|
|
result2 = run(["systemctl", "restart", svc], check=False)
|
|
if result2.returncode != 0:
|
|
die(f"Failed to restart {svc}:\n{result2.stderr.strip()}")
|
|
print(f" Service {svc} restarted.")
|
|
else:
|
|
print(f" Service {svc} is active -- syncing peers live...")
|
|
sync_peers_live(iface, peers)
|
|
else:
|
|
result2 = run(["systemctl", "enable", "--now", svc], check=False)
|
|
if result2.returncode != 0:
|
|
result3 = run(["systemctl", "reload-or-restart", svc], check=False)
|
|
if result3.returncode != 0:
|
|
die(f"Failed to start {svc}:\n{result3.stderr.strip()}")
|
|
print(f" Service {svc} enabled and started.")
|
|
|
|
# -- Summary ---------------------------------------------------------
|
|
enabled_peers = [p for p in peers if p.get("enabled") is True]
|
|
print(f" Server public key: {server_public_key}")
|
|
print(f" Listen port: UDP {vpi(vlan)['listen_port']}")
|
|
print(f" Enabled peers: {len(enabled_peers)}")
|
|
for p in enabled_peers:
|
|
print(f" {p['ip']:<22} {p['name']}")
|
|
print()
|
|
|
|
# -- Apply core config to pick up VPN firewall rules ---------------------
|
|
core_py = SCRIPT_DIR / "core.py"
|
|
if core_py.exists():
|
|
print("-- Applying core config (core.py --apply) ----------------------------")
|
|
result = subprocess.run(
|
|
[sys.executable, str(core_py), "--apply"],
|
|
capture_output=False
|
|
)
|
|
if result.returncode != 0:
|
|
print("WARNING: core.py --apply returned non-zero. Check output above.")
|
|
else:
|
|
print(f"WARNING: {core_py} not found -- run core.py --apply manually to load VPN firewall rules.")
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# --disable
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def cmd_disable(dhcp_data):
|
|
check_root()
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
|
|
for vlan in wg_vlans:
|
|
iface = vlan["interface"]
|
|
svc = f"wg-quick@{iface}"
|
|
result = run(["systemctl", "disable", "--now", svc], check=False)
|
|
if result.returncode != 0:
|
|
print(f"WARNING: {svc} may not have been running:\n{result.stderr.strip()}")
|
|
else:
|
|
print(f"WireGuard service {svc} stopped and disabled.")
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# --status
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def cmd_status(dhcp_data):
|
|
check_root()
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
|
|
print(f" {'UNIT':<45} {'ACTIVE':<12} {'ENABLED'}")
|
|
print(f" {'-'*45} {'-'*12} {'-'*10}")
|
|
|
|
for vlan in wg_vlans:
|
|
iface = vlan["interface"]
|
|
svc = f"wg-quick@{iface}"
|
|
|
|
r_active = run(["systemctl", "is-active", svc], check=False)
|
|
r_enabled = run(["systemctl", "is-enabled", svc], check=False)
|
|
active = r_active.stdout.strip()
|
|
enabled = r_enabled.stdout.strip()
|
|
active_sym = "✓" if active == "active" else "✗"
|
|
enabled_sym = "✓" if enabled == "enabled" else "✗"
|
|
print(f" {svc:<45} {active_sym} {active:<10} {enabled_sym} {enabled}")
|
|
|
|
if active == "active":
|
|
result = run(["wg", "show", iface], check=False)
|
|
if result.returncode == 0:
|
|
info = {}
|
|
for line in result.stdout.splitlines():
|
|
line = line.strip()
|
|
if line.startswith("public key:"):
|
|
info["pubkey"] = line.split(":", 1)[1].strip()
|
|
elif line.startswith("listening port:"):
|
|
info["port"] = line.split(":", 1)[1].strip()
|
|
elif line.startswith("peer:"):
|
|
info.setdefault("peers", 0)
|
|
info["peers"] += 1
|
|
if "pubkey" in info:
|
|
print(f" public key: {info['pubkey']}")
|
|
if "port" in info:
|
|
print(f" listening port: {info['port']}")
|
|
peers = load_peers(iface)
|
|
enabled_peers = [p for p in peers if p.get("enabled") is True]
|
|
print(f" peers: {len(enabled_peers)} configured, {info.get('peers', 0)} connected")
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# --logs
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def cmd_logs(dhcp_data):
|
|
check_root()
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
for vlan in wg_vlans:
|
|
iface = vlan["interface"]
|
|
peers = load_peers(iface)
|
|
peer_by_key = {p["public_key"]: p["name"] for p in peers}
|
|
|
|
print(f"-- {iface} " + "-" * 58)
|
|
|
|
result = run(["wg", "show", iface, "dump"], check=False)
|
|
if result.returncode != 0:
|
|
print(f" WireGuard interface '{iface}' is not up.")
|
|
print()
|
|
continue
|
|
|
|
lines = result.stdout.strip().splitlines()
|
|
peer_lines = lines[1:] # first line is the server interface itself
|
|
|
|
if not peer_lines:
|
|
print(" No peers currently configured.")
|
|
print()
|
|
continue
|
|
|
|
print(f" {'PEER':<22} {'IP':<20} {'ENDPOINT':<26} {'LAST HANDSHAKE':<22} {'RX':<12} {'TX'}")
|
|
print(" " + "-" * 106)
|
|
|
|
for ln in peer_lines:
|
|
parts = ln.split("\t")
|
|
if len(parts) < 6:
|
|
continue
|
|
pub_key = parts[0]
|
|
endpoint = parts[2] if parts[2] != "(none)" else "not connected"
|
|
allowed_ips = parts[3]
|
|
last_hs_ts = parts[4]
|
|
rx_bytes = int(parts[5])
|
|
tx_bytes = int(parts[6]) if len(parts) > 6 else 0
|
|
|
|
name = peer_by_key.get(pub_key, pub_key[:12] + "...")
|
|
|
|
if last_hs_ts == "0":
|
|
last_hs = "never"
|
|
else:
|
|
ts = int(last_hs_ts)
|
|
hs_dt = datetime.fromtimestamp(ts, tz=timezone.utc)
|
|
delta = now - hs_dt
|
|
seconds = int(delta.total_seconds())
|
|
if seconds < 60:
|
|
last_hs = f"{seconds}s ago"
|
|
elif seconds < 3600:
|
|
last_hs = f"{seconds // 60}m ago"
|
|
elif seconds < 86400:
|
|
last_hs = f"{seconds // 3600}h ago"
|
|
else:
|
|
last_hs = f"{seconds // 86400}d ago"
|
|
|
|
rx_str = _fmt_bytes(rx_bytes)
|
|
tx_str = _fmt_bytes(tx_bytes)
|
|
display_ip = allowed_ips.split(",")[0].strip()
|
|
|
|
print(f" {name:<22} {display_ip:<20} {endpoint:<26} {last_hs:<22} {rx_str:<12} {tx_str}")
|
|
|
|
print()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Main
|
|
# ------------------------------------------------------------------------------
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Manage WireGuard VPN server and peers",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog=(
|
|
"examples:\n"
|
|
" sudo python3 vpn.py --add-peer Add a new peer interactively\n"
|
|
" sudo python3 vpn.py --manage-peers List and manage existing peers\n"
|
|
" sudo python3 vpn.py --apply Write WireGuard config and sync peers\n"
|
|
" sudo python3 vpn.py --disable Stop WireGuard on all interfaces\n"
|
|
" sudo python3 vpn.py --status Show WireGuard service and interface status\n"
|
|
" sudo python3 vpn.py --view-peers Show per-peer handshake and traffic stats\n"
|
|
)
|
|
)
|
|
parser.add_argument("--add-peer", action="store_true",
|
|
help="Add a new peer interactively")
|
|
parser.add_argument("--manage-peers", action="store_true",
|
|
help="List and manage existing peers")
|
|
parser.add_argument("--apply", action="store_true",
|
|
help="Write WireGuard config and sync peers")
|
|
parser.add_argument("--disable", action="store_true",
|
|
help="Stop WireGuard on all interfaces")
|
|
parser.add_argument("--status", action="store_true",
|
|
help="Show WireGuard service and interface status")
|
|
parser.add_argument("--view-peers", action="store_true",
|
|
help="Show per-peer handshake and traffic stats")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not any([args.add_peer, args.manage_peers, args.apply,
|
|
args.disable, args.status, args.view_peers]):
|
|
parser.print_help()
|
|
sys.exit(0)
|
|
|
|
dhcp_data = load_dhcp()
|
|
wg_vlans = wg_interfaces(dhcp_data)
|
|
if not wg_vlans:
|
|
die("No WireGuard interfaces (wg*) found in core.json.")
|
|
validate_wg_vlans(wg_vlans)
|
|
|
|
if args.add_peer:
|
|
cmd_add_peer(dhcp_data)
|
|
elif args.manage_peers:
|
|
cmd_list_peers(dhcp_data)
|
|
elif args.apply:
|
|
cmd_apply(dhcp_data)
|
|
elif args.disable:
|
|
cmd_disable(dhcp_data)
|
|
elif args.status:
|
|
cmd_status(dhcp_data)
|
|
elif args.view_peers:
|
|
cmd_logs(dhcp_data)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|