from pathlib import Path import base64 import copy import ipaddress import re from flask import Blueprint, make_response, redirect, flash, request from auth import require_level from config_utils import load_config, record_group, diff_fields, verify_config_hash, CONFIGS_DIR, WEB_APP_DISPLAY_NAME import sanitize import validation as validate _PAGE = Path(__file__).parent.name bp = Blueprint(_PAGE, __name__) _MTU_MIN = 576 _MTU_MAX = 9000 def _wg_vlan(cfg): return next((v for v in cfg.get('vlans', []) if v.get('is_vpn')), None) def _wg_vlan_by_name(cfg, name): return next((v for v in cfg.get('vlans', []) if v.get('is_vpn') and v.get('name') == name), None) def _find_peer_by_flat_idx(cfg, flat_idx): i = 0 for vlan in cfg.get('vlans', []): if not vlan.get('is_vpn'): continue peers = vlan.get('peers', []) for j in range(len(peers)): if i == flat_idx: return vlan, j i += 1 return None, None def _wg_iface(vlan, cfg): wg_vlans = [v for v in cfg.get('vlans', []) if v.get('is_vpn')] idx = next((i for i, v in enumerate(wg_vlans) if v is vlan), 0) return f'wg{idx}' def _row_index(): try: return int(request.form.get('row_index', '')) except (ValueError, TypeError): return None def _hash_ok(): if not verify_config_hash(request.form.get('config_hash', '')): flash('Configuration was modified by another session. Please refresh and try again.', 'error') return False return True def _generate_wg_keypair(): from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, PrivateFormat, NoEncryption, ) private = X25519PrivateKey.generate() priv_raw = private.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) pub_raw = private.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) return base64.b64encode(priv_raw).decode(), base64.b64encode(pub_raw).decode() def _server_pubkey(iface): try: with open(f'{CONFIGS_DIR}/.{iface}.pub') as f: return f.read().strip() except OSError: return None def _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pubkey): info = vlan.get('vpn_information', {}) overrides = info.get('explicit_overrides', {}) subnet = vlan.get('subnet', '') mask = vlan.get('subnet_mask', '') network = ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False) ident_ips = [s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')] default = str(min((ipaddress.IPv4Address(ip) for ip in ident_ips), key=lambda x: x.packed[-1])) if ident_ips else str(next(network.hosts())) gateway = overrides.get('gateway') or default dns = overrides.get('dns_servers') or gateway prefix = network.prefixlen mtu = overrides.get('mtu', '') endpoint = info.get('server_endpoint', '') listen_port = info.get('listen_port', 51820) split_tunnel = next( (p.get('split_tunnel', False) for p in vlan.get('peers', []) if p.get('name') == peer_name), False, ) allowed_ips = f'{subnet}/{prefix}' if split_tunnel else '0.0.0.0/0' lines = [ f'# Generated by {WEB_APP_DISPLAY_NAME}', '', '[Interface]', f'PrivateKey = {private_key}', f'Address = {peer_ip}/{prefix}', f'DNS = {dns}', ] if mtu: lines.append(f'MTU = {mtu}') lines += ['', '[Peer]', f'PublicKey = {server_pubkey}'] if endpoint: lines.append(f'Endpoint = {endpoint}:{listen_port}') lines += [f'AllowedIPs = {allowed_ips}', 'PersistentKeepalive = 25', ''] return '\n'.join(lines) def _conf_response(vlan, peer_name, peer_ip, private_key): cfg = load_config() iface = _wg_iface(vlan, cfg) server_pub = _server_pubkey(iface) if not server_pub: flash('Peer saved. Run sudo python3 ~/routlin/core.py --apply to generate the server ' 'public key, then regenerate this peer to download the client config.', 'warning') return redirect(f'/{_PAGE}') conf = _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pub) safe = re.sub(r'[^A-Za-z0-9_\-]', '_', peer_name) resp = make_response(conf) resp.headers['Content-Type'] = 'text/plain; charset=utf-8' resp.headers['Content-Disposition'] = f'attachment; filename="vpn-client-{safe}.conf"' return resp @bp.route('/action/vpn/wireguard_apply', methods=['POST']) @require_level('administrator') def wireguard_apply(): listen_port_raw = request.form.get('vpn_listen_port', '').strip() server_endpoint = validate.domainname(request.form.get('vpn_server_endpoint', '')) domain = validate.domainname(request.form.get('vpn_domain', '')) dns_raw = request.form.get('vpn_dns_servers', '').strip() mtu_raw = request.form.get('vpn_mtu', '').strip() if not listen_port_raw: flash('Listen port is required.', 'error') return redirect(f'/{_PAGE}') listen_port = validate.int_range(listen_port_raw, 1, 65535) if listen_port is None: flash(f'"{listen_port_raw}" is not a valid port number (1-65535).', 'error') return redirect(f'/{_PAGE}') dns_server = '' if dns_raw: dns_server = validate.ip(dns_raw) if not dns_server: flash(f'"{dns_raw}" is not a valid IP address for DNS server.', 'error') return redirect(f'/{_PAGE}') mtu = None if mtu_raw: mtu = validate.int_range(mtu_raw, _MTU_MIN, _MTU_MAX) if mtu is None: flash(f'"{mtu_raw}" is not a valid MTU (must be {_MTU_MIN}-{_MTU_MAX}).', 'error') return redirect(f'/{_PAGE}') if not _hash_ok(): return redirect(f'/{_PAGE}') cfg = load_config() vpn_vlan = _wg_vlan(cfg) if vpn_vlan is None: flash('No WireGuard VLAN found in configuration.', 'error') return redirect(f'/{_PAGE}') for v in cfg.get('vlans', []): if v.get('is_vpn') and v is not vpn_vlan and v.get('vpn_information', {}).get('listen_port') == listen_port: flash(f'Listen port {listen_port} is already used by another VPN VLAN.', 'error') return redirect(f'/{_PAGE}') before_info = copy.deepcopy(vpn_vlan.get('vpn_information', {})) info = vpn_vlan.setdefault('vpn_information', {}) info['listen_port'] = listen_port info['server_endpoint'] = server_endpoint info['domain'] = domain overrides = info.setdefault('explicit_overrides', {}) if dns_server: overrides['dns_servers'] = dns_server else: overrides.pop('dns_servers', None) if mtu is not None: overrides['mtu'] = mtu else: overrides.pop('mtu', None) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(f'/{_PAGE}') vlan_name = vpn_vlan['name'] changes = diff_fields(before_info, info) flash(record_group(cfg, f'vlans[name={vlan_name}].vpn_information', None, None, changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/vpn/addpeer_add', methods=['POST']) @require_level('administrator') def addpeer_add(): peer_name = sanitize.name(request.form.get('peer_name', '')) peer_vlan_nm = request.form.get('peer_vlan', '').strip() peer_ip_raw = request.form.get('peer_ip', '').strip() split_tunnel = 'split_tunnel' in request.form enabled = 'enabled' in request.form if not peer_name: flash('Peer name is required.', 'error') return redirect(f'/{_PAGE}') if not peer_vlan_nm: flash('Assigned VLAN is required.', 'error') return redirect(f'/{_PAGE}') peer_ip = validate.ip(peer_ip_raw) if not peer_ip: flash(f'"{peer_ip_raw}" is not a valid IP address.', 'error') return redirect(f'/{_PAGE}') if not _hash_ok(): return redirect(f'/{_PAGE}') cfg = load_config() vpn_vlan = _wg_vlan_by_name(cfg, peer_vlan_nm) if vpn_vlan is None: flash(f'VPN VLAN "{peer_vlan_nm}" not found.', 'error') return redirect(f'/{_PAGE}') try: network = ipaddress.IPv4Network(f"{vpn_vlan['subnet']}/{vpn_vlan['subnet_mask']}", strict=False) if ipaddress.IPv4Address(peer_ip) not in network: flash(f'{peer_ip} is not within the subnet {vpn_vlan["subnet"]}/{vpn_vlan["subnet_mask"]} of {peer_vlan_nm}.', 'error') return redirect(f'/{_PAGE}') except Exception: pass peers = vpn_vlan.setdefault('peers', []) if any(p.get('name') == peer_name for p in peers): flash(f'A peer named "{peer_name}" already exists.', 'error') return redirect(f'/{_PAGE}') for v in cfg.get('vlans', []): if not v.get('is_vpn'): continue if any(p.get('ip') == peer_ip for p in v.get('peers', [])): flash(f'IP address {peer_ip} is already assigned to another peer.', 'error') return redirect(f'/{_PAGE}') private_key, public_key = _generate_wg_keypair() entry = { 'name': peer_name, 'ip': peer_ip, 'public_key': public_key, 'split_tunnel': split_tunnel, 'enabled': enabled, } peers.append(entry) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(f'/{_PAGE}') changes = diff_fields(None, entry) record_group(cfg, f'vlans[name={peer_vlan_nm}].peers', 'name', peer_name, changes, 'core apply') return _conf_response(vpn_vlan, peer_name, peer_ip, private_key) @bp.route('/action/vpn/peers_edit', methods=['POST']) @require_level('administrator') def peers_edit(): flat_idx = _row_index() if flat_idx is None: flash('Invalid request.', 'error') return redirect(f'/{_PAGE}') peer_name = sanitize.name(request.form.get('name', '')) split_tunnel = request.form.get('split_tunnel') in ('true', '1', 'on', 'yes') enabled = request.form.get('enabled') not in ('false', '0', '') if not peer_name: flash('Peer name is required.', 'error') return redirect(f'/{_PAGE}') if not _hash_ok(): return redirect(f'/{_PAGE}') cfg = load_config() vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx) if vlan is None: flash('Peer not found.', 'error') return redirect(f'/{_PAGE}') peers = vlan.get('peers', []) if any(j != peer_idx and p.get('name') == peer_name for j, p in enumerate(peers)): flash(f'A peer named "{peer_name}" already exists.', 'error') return redirect(f'/{_PAGE}') before = copy.deepcopy({k: peers[peer_idx].get(k) for k in ('name', 'split_tunnel', 'enabled')}) peers[peer_idx].update({'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled}) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(f'/{_PAGE}') vlan_name = vlan['name'] changes = diff_fields(before, {'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled}) flash(record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer_name, changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/vpn/peers_toggle', methods=['POST']) @require_level('administrator') def peers_toggle(): flat_idx = _row_index() if flat_idx is None: flash('Invalid request.', 'error') return redirect(f'/{_PAGE}') if not _hash_ok(): return redirect(f'/{_PAGE}') cfg = load_config() vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx) if vlan is None: flash('Peer not found.', 'error') return redirect(f'/{_PAGE}') peers = vlan.get('peers', []) old_enabled = peers[peer_idx].get('enabled', True) before = copy.deepcopy(peers[peer_idx]) peers[peer_idx]['enabled'] = not old_enabled errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(f'/{_PAGE}') peer_name = peers[peer_idx]['name'] vlan_name = vlan['name'] changes = diff_fields(before, peers[peer_idx]) flash(record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer_name, changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/vpn/peers_delete', methods=['POST']) @require_level('administrator') def peers_delete(): flat_idx = _row_index() if flat_idx is None: flash('Invalid request.', 'error') return redirect(f'/{_PAGE}') if not _hash_ok(): return redirect(f'/{_PAGE}') cfg = load_config() vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx) if vlan is None: flash('Peer not found.', 'error') return redirect(f'/{_PAGE}') peers = vlan.get('peers', []) removed = peers.pop(peer_idx) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(f'/{_PAGE}') vlan_name = vlan['name'] changes = diff_fields(removed, None) flash(record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', removed['name'], changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/vpn/peers_regenerate', methods=['POST']) @require_level('administrator') def peers_regenerate(): flat_idx = _row_index() if flat_idx is None: flash('Invalid request.', 'error') return redirect(f'/{_PAGE}') if not _hash_ok(): return redirect(f'/{_PAGE}') cfg = load_config() vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx) if vlan is None: flash('Peer not found.', 'error') return redirect(f'/{_PAGE}') private_key, public_key = _generate_wg_keypair() peer = vlan['peers'][peer_idx] old_pub_key = peer.get('public_key', '') peer['public_key'] = public_key errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(f'/{_PAGE}') vlan_name = vlan['name'] changes = diff_fields({'public_key': old_pub_key}, {'public_key': public_key}) record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer['name'], changes, 'core apply') return _conf_response(vlan, peer['name'], peer['ip'], private_key)