229 lines
8.7 KiB
Python
229 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
create_vpn_peer.py -- Add a WireGuard peer to config.json and write the client .conf file.
|
|
|
|
Generates a fresh keypair, appends the peer to the specified WireGuard VLAN in config.json,
|
|
and saves a ready-to-import client config file.
|
|
|
|
Use --iface or --vlan-id to select the target VLAN. If the config contains exactly one
|
|
WireGuard VLAN, both flags are optional and it is selected automatically.
|
|
|
|
Run core.py --apply after adding peers to sync the changes to the live interface.
|
|
|
|
Usage:
|
|
python3 create_vpn_peer.py --name laptop --ip 192.168.40.2
|
|
python3 create_vpn_peer.py --name laptop --ip 192.168.40.2 --iface wg0
|
|
python3 create_vpn_peer.py --name laptop --ip 192.168.40.2 --vlan-id 40
|
|
python3 create_vpn_peer.py --name phone --ip 192.168.40.3 --split-tunnel
|
|
python3 create_vpn_peer.py --name tablet --ip 192.168.40.4 --output ~/tablet.conf
|
|
"""
|
|
|
|
import argparse
|
|
import ipaddress
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Directory containing this script. config.json, the server public-key files
# and the default client .conf output are all located relative to it.
SCRIPT_DIR = Path(__file__).parent

# The configuration file this tool reads and rewrites.
CONFIG_FILE = SCRIPT_DIR.joinpath("config.json")
|
|
|
|
|
|
def die(msg):
    """Report a fatal error and terminate the script.

    Writes ``ERROR: <msg>`` as a single line to standard error, then exits
    the process with status 1. This function never returns.
    """
    sys.stderr.write(f"ERROR: {msg}\n")
    raise SystemExit(1)
|
|
|
|
|
|
def load_config():
    """Read and parse config.json.

    Returns:
        The parsed top-level JSON object as a dict.

    Exits via die() when the file is missing, cannot be read, does not
    contain valid JSON, or its top-level value is not a JSON object.
    """
    try:
        # Open directly instead of testing exists() first: one code path, no
        # race between the check and the open. JSON is UTF-8 by specification,
        # so the encoding is stated rather than left to the locale default.
        with open(CONFIG_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        die(f"Config file not found: {CONFIG_FILE}")
    except OSError as e:
        die(f"Cannot read config file {CONFIG_FILE}: {e}")
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        die(f"Config file {CONFIG_FILE} is not valid JSON: {e}")

    # The rest of the script calls data.get(...), which requires a dict.
    if not isinstance(data, dict):
        die(f"Config file {CONFIG_FILE} must contain a JSON object at the top level.")
    return data
|
|
|
|
|
|
def save_config(data):
    """Write *data* to config.json as JSON indented by two spaces.

    The document is serialized in memory before the file is opened for
    writing. If *data* contains a value that cannot be serialized, the
    exception is raised while config.json is still intact, instead of after
    it has been truncated and partially rewritten.
    """
    payload = json.dumps(data, indent=2)
    with open(CONFIG_FILE, "w", encoding="utf-8") as f:
        f.write(payload)
|
|
|
|
|
|
def resolve_wg_iface(vlan, data):
    """Map a VLAN entry to its WireGuard interface name (wg0, wg1, ...).

    The number is the VLAN's position among the entries of data["vlans"]
    whose "is_vpn" value is truthy. Entries are matched by object identity,
    not equality. A VLAN that is not found among them maps to wg0.
    """
    vpn_entries = [entry for entry in data.get("vlans", []) if entry.get("is_vpn")]
    for position, entry in enumerate(vpn_entries):
        if entry is vlan:
            return f"wg{position}"
    # Not present among the VPN VLANs: fall back to the first interface.
    return "wg0"
|
|
|
|
|
|
def find_wg_vlan(data, iface=None, vlan_id=None):
    """Return the target WireGuard VLAN, or die with a helpful message.

    Only entries of data["vlans"] with a truthy "is_vpn" are considered.

    Selection:
      * iface given   -> the VLAN whose resolved interface name equals it.
      * vlan_id given -> the VLAN whose "vlan_id" equals it.
      * neither       -> the only WireGuard VLAN in the config.

    Exits via die() when the requested VLAN does not exist, when the config
    has no WireGuard VLAN, or when there are several and none was selected.
    """
    wg_vlans = [v for v in data.get("vlans", []) if v.get("is_vpn")]

    def shown_id(v):
        # .get() rather than v["vlan_id"]: a VLAN without a vlan_id must not
        # raise KeyError while an error message is being assembled.
        return v.get("vlan_id", "?")

    if iface is not None:
        vlan = next((v for v in wg_vlans if resolve_wg_iface(v, data) == iface), None)
        if vlan is None:
            known = ", ".join(resolve_wg_iface(v, data) for v in wg_vlans) or "none"
            die(f"No WireGuard VLAN with interface '{iface}' found in config.json. "
                f"Known WireGuard interfaces: {known}.")
        return vlan

    if vlan_id is not None:
        vlan = next((v for v in wg_vlans if v.get("vlan_id") == vlan_id), None)
        if vlan is None:
            known = ", ".join(
                f"{shown_id(v)} ({resolve_wg_iface(v, data)})" for v in wg_vlans
            ) or "none"
            die(f"No WireGuard VLAN with vlan_id {vlan_id} found in config.json. "
                f"Known WireGuard VLANs: {known}.")
        return vlan

    if not wg_vlans:
        die("No WireGuard VLANs found in config.json. "
            "Add a VLAN with is_vpn set to true.")
    if len(wg_vlans) > 1:
        # One line per candidate, showing both ways of selecting it.
        options = " " + "\n ".join(
            f"--iface {resolve_wg_iface(v, data)} or --vlan-id {shown_id(v)} ({v.get('name', '?')})"
            for v in wg_vlans
        )
        die(f"Multiple WireGuard VLANs found. Specify one:\n{options}")
    return wg_vlans[0]
|
|
|
|
|
|
def server_pubkey(iface):
    """Return the server public key for the WireGuard interface *iface*.

    The key is read from ``.wg-<iface>.pub`` in the script directory and
    returned with surrounding whitespace stripped.

    Exits via die() when the file is missing, cannot be read, or is empty.
    """
    path = SCRIPT_DIR / f".wg-{iface}.pub"
    try:
        # Read directly instead of checking exists() first, so a file that
        # disappears or is unreadable is reported cleanly, not as a traceback.
        key = path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        die(
            f"Server public key not found: {path}\n"
            "Run 'sudo python3 core.py --apply' first to generate the server keypair."
        )
    except OSError as e:
        die(f"Cannot read server public key {path}: {e}")

    # An empty key would produce a client config with a blank PublicKey line.
    if not key:
        die(f"Server public key file is empty: {path}")
    return key
|
|
|
|
|
|
def generate_keypair():
    """Create a new WireGuard keypair with the ``wg`` command-line tool.

    Runs ``wg genkey`` and passes its output to ``wg pubkey`` on stdin.

    Returns:
        A ``(private_key, public_key)`` tuple of whitespace-stripped strings.

    Exits via die() when the ``wg`` executable cannot be found, or when
    either command finishes with a non-zero status.
    """
    run_opts = {"capture_output": True, "text": True, "check": True}
    try:
        genkey = subprocess.run(["wg", "genkey"], **run_opts)
        secret = genkey.stdout.strip()
        # The private key goes to `wg pubkey` on stdin, never on the command line.
        pubkey = subprocess.run(["wg", "pubkey"], input=secret, **run_opts)
        derived = pubkey.stdout.strip()
    except FileNotFoundError:
        die("'wg' not found. Install wireguard-tools: sudo apt install wireguard-tools")
    except subprocess.CalledProcessError as exc:
        die(f"Key generation failed: {exc.stderr.strip()}")
    else:
        return secret, derived
|
|
|
|
|
|
def build_client_conf(vlan, peer_ip, private_key, server_pub, split_tunnel):
    """Render the text of a WireGuard client configuration for one peer.

    Args:
        vlan: VLAN entry from config.json. "subnet" and "subnet_mask" are
            required. "vpn_information" (with optional "explicit_overrides",
            "server_endpoint" and "listen_port") and "server_identities"
            are optional.
        peer_ip: Address assigned to the peer inside the VPN subnet.
        private_key: The peer's private key.
        server_pub: The server's public key.
        split_tunnel: When true, AllowedIPs is the VPN subnet only;
            otherwise it is 0.0.0.0/0.

    Returns:
        The configuration as one string, lines joined by newlines and
        terminated by a final newline.

    Raises:
        KeyError: "subnet" or "subnet_mask" is missing from *vlan*.
        ValueError: the subnet/mask or a server identity IP is not valid IPv4.
    """
    # `or {}` also covers an explicit JSON null for these optional sections.
    info = vlan.get("vpn_information") or {}
    overrides = info.get("explicit_overrides") or {}

    # strict=False tolerates a "subnet" value that has host bits set.
    network = ipaddress.IPv4Network(
        f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False
    )
    prefix = network.prefixlen

    # Default gateway: the server identity with the lowest final octet, or
    # the first host of the subnet when no identity carries an IP.
    ident_ips = [s["ip"] for s in vlan.get("server_identities", []) if s.get("ip")]
    if ident_ips:
        default = str(min(
            (ipaddress.IPv4Address(ip) for ip in ident_ips),
            key=lambda addr: addr.packed[-1],
        ))
    else:
        # iter() keeps this working whether hosts() yields lazily or
        # returns a list.
        default = str(next(iter(network.hosts())))

    gateway = overrides.get("gateway") or default
    dns = overrides.get("dns_server") or gateway
    mtu = overrides.get("mtu", "")
    endpoint = info.get("server_endpoint", "")
    listen_port = info.get("listen_port", 51820)

    # Use the normalized network address so AllowedIPs is a proper CIDR
    # even if the configured "subnet" was written with host bits.
    allowed_ips = str(network) if split_tunnel else "0.0.0.0/0"

    lines = [
        "# Generated by create_vpn_peer.py",
        "",
        "[Interface]",
        f"PrivateKey = {private_key}",
        f"Address = {peer_ip}/{prefix}",
        f"DNS = {dns}",
    ]
    if mtu:
        lines.append(f"MTU = {mtu}")
    lines += ["", "[Peer]", f"PublicKey = {server_pub}"]
    if endpoint:
        host = str(endpoint)
        try:
            is_ipv6_literal = ipaddress.ip_address(host).version == 6
        except ValueError:
            # Hostname, or an address that is already bracketed.
            is_ipv6_literal = False
        if is_ipv6_literal:
            # A bare IPv6 literal followed by ":port" is ambiguous; the
            # WireGuard config syntax expects it in square brackets.
            host = f"[{host}]"
        lines.append(f"Endpoint = {host}:{listen_port}")
    lines += [f"AllowedIPs = {allowed_ips}", "PersistentKeepalive = 25", ""]
    return "\n".join(lines)
|
|
|
|
|
|
def _write_private_file(path, content):
    """Write *content* to *path* with owner-only (0600) permissions."""
    # Create the file if needed and tighten its mode BEFORE the secret is
    # written, so the private key is never readable by other users.
    # touch() leaves the mode of an existing file alone, hence the chmod.
    path.touch(mode=0o600, exist_ok=True)
    path.chmod(0o600)
    path.write_text(content, encoding="utf-8")


def main():
    """Command-line entry point: register a new peer and emit its client config.

    Steps:
      1. Parse arguments and validate the peer IP against the VPN subnet.
      2. Reject a duplicate peer name or IP.
      3. Generate the peer keypair and read the server public key.
      4. Write the client .conf file (owner-readable only).
      5. Append the peer to config.json.

    The client file is written before config.json is updated. If writing it
    fails, the config is left untouched, so no peer is registered whose
    private key was never saved. Every failure exits via die().
    """
    parser = argparse.ArgumentParser(
        description="Add a WireGuard peer to config.json and write the client .conf file."
    )
    parser.add_argument("--name", required=True, help="Peer name (e.g. laptop)")
    parser.add_argument("--ip", required=True, help="Peer IP within the VPN subnet (e.g. 192.168.40.2)")
    parser.add_argument("--split-tunnel", action="store_true",
                        help="Route only VPN subnet traffic through the tunnel (default: full tunnel)")
    parser.add_argument("--output", default=None,
                        help="Output path for the client .conf file (default: vpn-client-<name>.conf)")

    sel = parser.add_mutually_exclusive_group()
    sel.add_argument("--iface", default=None, metavar="IFACE",
                     help="WireGuard interface to add the peer to (e.g. wg0)")
    sel.add_argument("--vlan-id", default=None, type=int, metavar="ID",
                     help="VLAN ID of the WireGuard VLAN to add the peer to (e.g. 40)")
    args = parser.parse_args()

    # --- Validate IP syntax ---------------------------------------------
    try:
        peer_addr = ipaddress.IPv4Address(args.ip)
    except ValueError:
        die(f"'{args.ip}' is not a valid IPv4 address.")
    peer_ip = str(peer_addr)

    # --- Load config and find the WireGuard VLAN ------------------------
    data = load_config()
    vlan = find_wg_vlan(data, iface=args.iface, vlan_id=args.vlan_id)
    iface = resolve_wg_iface(vlan, data)

    # --- Validate that the peer IP is usable inside the subnet ----------
    try:
        network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
    except (KeyError, ValueError) as e:
        die(f"Invalid subnet in WireGuard VLAN: {e}")

    if peer_addr not in network:
        die(f"IP {peer_ip} is not within the VPN subnet {network}.")
    # /31 and /32 networks have no reserved network/broadcast addresses.
    if network.prefixlen < 31 and peer_addr in (
        network.network_address,
        network.broadcast_address,
    ):
        die(f"IP {peer_ip} is the network or broadcast address of {network} "
            "and cannot be assigned to a peer.")

    # --- Check for duplicates -------------------------------------------
    peers = vlan.setdefault("peers", [])
    if any(p.get("name") == args.name for p in peers):
        die(f"A peer named '{args.name}' already exists.")
    if any(p.get("ip") == peer_ip for p in peers):
        die(f"IP {peer_ip} is already assigned to another peer.")

    # --- Generate keypair and read server public key --------------------
    print(f"Generating keypair for '{args.name}'...")
    private_key, public_key = generate_keypair()
    srv_pub = server_pubkey(iface)

    # --- Write client conf (before touching config.json) ----------------
    conf_content = build_client_conf(vlan, peer_ip, private_key, srv_pub, args.split_tunnel)
    if args.output:
        # expanduser() handles a literal "~" the shell did not expand
        # (for example when the argument was quoted).
        out_path = Path(args.output).expanduser()
    else:
        safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in args.name)
        out_path = SCRIPT_DIR / f"vpn-client-{safe}.conf"

    try:
        _write_private_file(out_path, conf_content)
    except OSError as e:
        die(f"Could not write client config {out_path}: {e}")
    print(f"Client config saved: {out_path}")

    # --- Update config.json ---------------------------------------------
    peers.append({
        "name": args.name,
        "ip": peer_ip,
        "public_key": public_key,
        "split_tunnel": args.split_tunnel,
        "enabled": True,
    })
    save_config(data)
    print(f"Added peer '{args.name}' to config.json.")

    print()
    print("Next steps:")
    print(f" 1. Transfer {out_path.name} to the peer device by secure means, then delete it.")
    print(" 2. Run 'sudo python3 core.py --apply' to sync the new peer to the live interface.")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    sys.exit(main())
|