linuxrouter/routlin/create_vpn_peer.py
2026-05-31 02:17:25 -04:00

229 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
create_vpn_peer.py -- Add a WireGuard peer to config.json and write the client .conf file.
Generates a fresh keypair, appends the peer to the specified WireGuard VLAN in config.json,
and saves a ready-to-import client config file.
Use --iface or --vlan-id to select the target VLAN. If the config contains exactly one
WireGuard VLAN, both flags are optional and it is selected automatically.
Run core.py --apply after adding peers to sync the changes to the live interface.
Usage:
python3 create_vpn_peer.py --name laptop --ip 192.168.40.2
python3 create_vpn_peer.py --name laptop --ip 192.168.40.2 --iface wg0
python3 create_vpn_peer.py --name laptop --ip 192.168.40.2 --vlan-id 40
python3 create_vpn_peer.py --name phone --ip 192.168.40.3 --split-tunnel
python3 create_vpn_peer.py --name tablet --ip 192.168.40.4 --output ~/tablet.conf
"""
import argparse
import ipaddress
import json
import subprocess
import sys
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "config.json"
def die(msg):
print(f"ERROR: {msg}", file=sys.stderr)
sys.exit(1)
def load_config():
if not CONFIG_FILE.exists():
die(f"Config file not found: {CONFIG_FILE}")
with open(CONFIG_FILE) as f:
return json.load(f)
def save_config(data):
with open(CONFIG_FILE, "w") as f:
json.dump(data, f, indent=2)
def resolve_wg_iface(vlan, data):
"""Return wg0, wg1, ... based on position among is_vpn VLANs."""
wg_vlans = [v for v in data.get("vlans", []) if v.get("is_vpn")]
idx = next((i for i, v in enumerate(wg_vlans) if v is vlan), 0)
return f"wg{idx}"
def find_wg_vlan(data, iface=None, vlan_id=None):
"""Return the target WireGuard VLAN, or die with a helpful message."""
wg_vlans = [v for v in data.get("vlans", []) if v.get("is_vpn")]
if iface is not None:
vlan = next((v for v in wg_vlans if resolve_wg_iface(v, data) == iface), None)
if vlan is None:
known = ", ".join(resolve_wg_iface(v, data) for v in wg_vlans) or "none"
die(f"No WireGuard VLAN with interface '{iface}' found in config.json. "
f"Known WireGuard interfaces: {known}.")
return vlan
if vlan_id is not None:
vlan = next((v for v in wg_vlans if v.get("vlan_id") == vlan_id), None)
if vlan is None:
known = ", ".join(
f"{v['vlan_id']} ({resolve_wg_iface(v, data)})" for v in wg_vlans
) or "none"
die(f"No WireGuard VLAN with vlan_id {vlan_id} found in config.json. "
f"Known WireGuard VLANs: {known}.")
return vlan
if not wg_vlans:
die("No WireGuard VLANs found in config.json. "
"Add a VLAN with is_vpn set to true.")
if len(wg_vlans) > 1:
options = " " + "\n ".join(
f"--iface {resolve_wg_iface(v, data)} or --vlan-id {v['vlan_id']} ({v.get('name', '?')})"
for v in wg_vlans
)
die(f"Multiple WireGuard VLANs found. Specify one:\n{options}")
return wg_vlans[0]
def server_pubkey(iface):
path = SCRIPT_DIR / f".wg-{iface}.pub"
if not path.exists():
die(
f"Server public key not found: {path}\n"
f"Run 'sudo python3 core.py --apply' first to generate the server keypair."
)
return path.read_text().strip()
def generate_keypair():
try:
private = subprocess.run(
["wg", "genkey"], capture_output=True, text=True, check=True
).stdout.strip()
public = subprocess.run(
["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
).stdout.strip()
return private, public
except FileNotFoundError:
die("'wg' not found. Install wireguard-tools: sudo apt install wireguard-tools")
except subprocess.CalledProcessError as e:
die(f"Key generation failed: {e.stderr.strip()}")
def build_client_conf(vlan, peer_ip, private_key, server_pub, split_tunnel):
info = vlan.get("vpn_information", {})
overrides = info.get("explicit_overrides", {})
subnet = vlan["subnet"]
mask = vlan["subnet_mask"]
network = ipaddress.IPv4Network(f"{subnet}/{mask}", strict=False)
ident_ips = [s["ip"] for s in vlan.get("server_identities", []) if s.get("ip")]
default = str(min((ipaddress.IPv4Address(ip) for ip in ident_ips),
key=lambda x: x.packed[-1])) if ident_ips else str(next(network.hosts()))
gateway = overrides.get("gateway") or default
dns = overrides.get("dns_servers") or gateway
prefix = network.prefixlen
mtu = overrides.get("mtu", "")
endpoint = info.get("server_endpoint", "")
listen_port = info.get("listen_port", 51820)
allowed_ips = f"{subnet}/{prefix}" if split_tunnel else "0.0.0.0/0"
lines = [
"# Generated by create_vpn_peer.py",
"",
"[Interface]",
f"PrivateKey = {private_key}",
f"Address = {peer_ip}/{prefix}",
f"DNS = {dns}",
]
if mtu:
lines.append(f"MTU = {mtu}")
lines += ["", "[Peer]", f"PublicKey = {server_pub}"]
if endpoint:
lines.append(f"Endpoint = {endpoint}:{listen_port}")
lines += [f"AllowedIPs = {allowed_ips}", "PersistentKeepalive = 25", ""]
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Add a WireGuard peer to config.json and write the client .conf file."
)
parser.add_argument("--name", required=True, help="Peer name (e.g. laptop)")
parser.add_argument("--ip", required=True, help="Peer IP within the VPN subnet (e.g. 192.168.40.2)")
parser.add_argument("--split-tunnel", action="store_true",
help="Route only VPN subnet traffic through the tunnel (default: full tunnel)")
parser.add_argument("--output", default=None,
help="Output path for the client .conf file (default: vpn-client-<name>.conf)")
sel = parser.add_mutually_exclusive_group()
sel.add_argument("--iface", default=None, metavar="IFACE",
help="WireGuard interface to add the peer to (e.g. wg0)")
sel.add_argument("--vlan-id", default=None, type=int, metavar="ID",
help="VLAN ID of the WireGuard VLAN to add the peer to (e.g. 40)")
args = parser.parse_args()
# Validate IP =======================================================
try:
peer_ip = str(ipaddress.IPv4Address(args.ip))
except ValueError:
die(f"'{args.ip}' is not a valid IPv4 address.")
# Load config and find WG VLAN ==========================================
data = load_config()
vlan = find_wg_vlan(data, iface=args.iface, vlan_id=args.vlan_id)
iface = resolve_wg_iface(vlan, data)
# Validate peer IP is within subnet =================================
try:
network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
except (KeyError, ValueError) as e:
die(f"Invalid subnet in WireGuard VLAN: {e}")
if ipaddress.IPv4Address(peer_ip) not in network:
die(f"IP {peer_ip} is not within the VPN subnet {network}.")
# Check for duplicates ==============================================
peers = vlan.setdefault("peers", [])
if any(p.get("name") == args.name for p in peers):
die(f"A peer named '{args.name}' already exists.")
if any(p.get("ip") == peer_ip for p in peers):
die(f"IP {peer_ip} is already assigned to another peer.")
# Generate keypair and read server public key =======================
print(f"Generating keypair for '{args.name}'...")
private_key, public_key = generate_keypair()
srv_pub = server_pubkey(iface)
# Update config.json ================================================
peers.append({
"name": args.name,
"ip": peer_ip,
"public_key": public_key,
"split_tunnel": args.split_tunnel,
"enabled": True,
})
save_config(data)
print(f"Added peer '{args.name}' to config.json.")
# Write client conf =================================================
conf_content = build_client_conf(vlan, peer_ip, private_key, srv_pub, args.split_tunnel)
if args.output:
out_path = Path(args.output)
else:
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in args.name)
out_path = SCRIPT_DIR / f"vpn-client-{safe}.conf"
out_path.write_text(conf_content)
print(f"Client config saved: {out_path}")
print()
print("Next steps:")
print(f" 1. Transfer {out_path.name} to the peer device by secure means, then delete it.")
print(f" 2. Run 'sudo python3 core.py --apply' to sync the new peer to the live interface.")
if __name__ == "__main__":
main()