From 2bfa5ff29a872712a6b39d37fed6e766a3895b12 Mon Sep 17 00:00:00 2001 From: Matthew Grotke Date: Wed, 20 May 2026 17:10:18 -0400 Subject: [PATCH] Development --- .../app/action_apply_banned_ips.py | 22 +- .../app/action_apply_blocklists.py | 17 +- .../app/action_apply_ddns_providers.py | 2 +- .../app/action_apply_dhcp_reservations.py | 56 +- .../router-dash/app/action_apply_general.py | 13 +- .../app/action_apply_host_overrides.py | 25 +- .../app/action_apply_iface_config.py | 8 +- .../app/action_apply_inter_vlan.py | 22 +- .../router-dash/app/action_apply_interface.py | 6 + docker/router-dash/app/action_apply_mdns.py | 6 + .../app/action_apply_port_forwarding.py | 22 +- .../app/action_apply_upstream_dns.py | 13 +- docker/router-dash/app/action_apply_vlans.py | 24 + docker/router-dash/app/action_apply_vpn.py | 55 +- docker/router-dash/app/validate.py | 28 - docker/router-dash/app/view_page.py | 2 +- router/core.py | 505 +--------------- router/validation.py | 553 +++++++++++++++++- 18 files changed, 814 insertions(+), 565 deletions(-) delete mode 100644 docker/router-dash/app/validate.py diff --git a/docker/router-dash/app/action_apply_banned_ips.py b/docker/router-dash/app/action_apply_banned_ips.py index ece76b0..460b7cc 100644 --- a/docker/router-dash/app/action_apply_banned_ips.py +++ b/docker/router-dash/app/action_apply_banned_ips.py @@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_banned_ips', __name__) @@ -53,6 +53,11 @@ def add_banned_ip(): 'ip': ip, 'enabled': True, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -77,6 +82,11 @@ def toggle_banned_ip(): return redirect(VIEW) items[idx]['enabled'] = not items[idx].get('enabled', True) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -107,6 +117,11 @@ def edit_banned_ip(): return redirect(VIEW) items[idx].update({'description': description, 'ip': ip, 'enabled': enabled}) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -131,6 +146,11 @@ def delete_banned_ip(): return redirect(VIEW) removed = items.pop(idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_blocklists.py b/docker/router-dash/app/action_apply_blocklists.py index e4ec91d..85b79a1 100644 --- a/docker/router-dash/app/action_apply_blocklists.py +++ b/docker/router-dash/app/action_apply_blocklists.py @@ -3,7 +3,7 @@ from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import re import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_blocklists', __name__) @@ -76,6 +76,11 @@ def add_blocklist(): 'url': fields['url'], 'save_as': _save_as_from_name(fields['name']), }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -110,6 +115,11 @@ def edit_blocklist(): 'format': fields['format'], 'url': fields['url'], }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -134,6 +144,11 @@ def delete_blocklist(): return redirect(VIEW) removed = items.pop(idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_ddns_providers.py b/docker/router-dash/app/action_apply_ddns_providers.py index 50ab5ca..e2f43bc 100644 --- a/docker/router-dash/app/action_apply_ddns_providers.py +++ b/docker/router-dash/app/action_apply_ddns_providers.py @@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level import json import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_ddns_providers', __name__) diff --git a/docker/router-dash/app/action_apply_dhcp_reservations.py b/docker/router-dash/app/action_apply_dhcp_reservations.py index 6ca485f..512e278 100644 --- a/docker/router-dash/app/action_apply_dhcp_reservations.py +++ b/docker/router-dash/app/action_apply_dhcp_reservations.py @@ -1,8 +1,10 @@ +import ipaddress + from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_dhcp_reservations', __name__) @@ -46,12 +48,30 @@ def _parse_ip(): return ip +def _check_ip_conflicts(ip, vlan): + """Return an error message if ip conflicts with pool range or server identities, else None.""" + dhcp = vlan.get('dhcp_information', {}) + pool_start = dhcp.get('dynamic_pool_start') + pool_end = dhcp.get('dynamic_pool_end') + if pool_start and pool_end: + try: + if (ipaddress.IPv4Address(pool_start) <= ipaddress.IPv4Address(ip) + <= ipaddress.IPv4Address(pool_end)): + return f'{ip} falls within the dynamic pool range ({pool_start}–{pool_end}).' + except Exception: + pass + identity_ips = {s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')} + if ip in identity_ips: + return f'{ip} is already assigned as a server identity IP.' + return None + + @bp.route('/action/add_dhcp_reservation', methods=['POST']) @require_level('administrator') def add_dhcp_reservation(): vlan_name = sanitize.name(request.form.get('vlan_name', '')) description = sanitize.text(request.form.get('description', '')) - hostname = sanitize.domainname(request.form.get('hostname', '')) + hostname = validate.domainname(request.form.get('hostname', '')) mac = sanitize.mac(request.form.get('mac', '')) ip = _parse_ip() radius_client = 'radius_client' in request.form @@ -76,6 +96,11 @@ def add_dhcp_reservation(): flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error') return redirect(VIEW) + conflict = _check_ip_conflicts(ip, vlan) + if conflict: + flash(f'The configuration has not been saved because {conflict}', 'error') + return redirect(VIEW) + vlan.setdefault('reservations', []).append({ 'description': description, 'hostname': hostname, @@ -84,6 +109,11 @@ def add_dhcp_reservation(): 'radius_client': radius_client, 'enabled': True, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -110,6 +140,11 @@ def toggle_dhcp_reservation(): res = vlans[vi]['reservations'][ri] res['enabled'] = not res.get('enabled', True) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -125,7 +160,7 @@ def edit_dhcp_reservation(): return redirect(VIEW) description = sanitize.text(request.form.get('description', '')) - hostname = sanitize.domainname(request.form.get('hostname', '')) + hostname = validate.domainname(request.form.get('hostname', '')) mac = sanitize.mac(request.form.get('mac', '')) ip = _parse_ip() radius_client = 'radius_client' in request.form @@ -146,6 +181,11 @@ def edit_dhcp_reservation(): flash('Entry not found.', 'error') return redirect(VIEW) + conflict = _check_ip_conflicts(ip, vlans[vi]) + if conflict: + flash(f'The configuration has not been saved because {conflict}', 'error') + return redirect(VIEW) + res = vlans[vi]['reservations'][ri] res.update({ 'description': description, @@ -155,6 +195,11 @@ def edit_dhcp_reservation(): 'radius_client': radius_client, 'enabled': 'enabled' in request.form, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -180,6 +225,11 @@ def delete_dhcp_reservation(): return redirect(VIEW) removed = vlans[vi]['reservations'].pop(ri) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_general.py b/docker/router-dash/app/action_apply_general.py index 81b95cf..04e000e 100644 --- a/docker/router-dash/app/action_apply_general.py +++ b/docker/router-dash/app/action_apply_general.py @@ -2,6 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize +import validation as validate bp = Blueprint('action_apply_general', __name__) @@ -14,11 +15,8 @@ def apply_general(): dnsmasq_log_queries = 'dnsmasq_log_queries' in request.form daily_execute_time = sanitize.time_24h(request.form.get('daily_execute_time_24hr_local', '')) - try: - log_max_kb = int(log_max_kb_raw) - if log_max_kb < 64: - raise ValueError - except (ValueError, TypeError): + log_max_kb = validate.int_range(log_max_kb_raw, 64, None) + if log_max_kb is None: flash('Max Log Size must be a number >= 64.', 'error') return redirect('/view/view_general') @@ -33,6 +31,11 @@ def apply_general(): 'dnsmasq_log_queries': dnsmasq_log_queries, 'daily_execute_time_24hr_local': daily_execute_time, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect('/view/view_general') save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_host_overrides.py b/docker/router-dash/app/action_apply_host_overrides.py index e260e4d..ef0f51b 100644 --- a/docker/router-dash/app/action_apply_host_overrides.py +++ b/docker/router-dash/app/action_apply_host_overrides.py @@ -4,6 +4,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize +import validation as validate bp = Blueprint('action_apply_host_overrides', __name__) @@ -51,7 +52,7 @@ def _hash_ok(): @require_level('administrator') def add_host_override(): description = sanitize.text(request.form.get('description', '')) - host = sanitize.domainname(request.form.get('host', '')) + host = validate.domainname(request.form.get('host', '')) ip = sanitize.ip(request.form.get('ip', '')) if not host or not ip: @@ -72,6 +73,11 @@ def add_host_override(): 'ip': ip, 'enabled': True, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -96,6 +102,11 @@ def toggle_host_override(): return redirect(VIEW) items[idx]['enabled'] = not items[idx].get('enabled', True) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -111,7 +122,7 @@ def edit_host_override(): return redirect(VIEW) description = sanitize.text(request.form.get('description', '')) - host = sanitize.domainname(request.form.get('host', '')) + host = validate.domainname(request.form.get('host', '')) ip = sanitize.ip(request.form.get('ip', '')) enabled = request.form.get('enabled') == 'on' @@ -133,6 +144,11 @@ def edit_host_override(): return redirect(VIEW) items[idx].update({'description': description, 'host': host, 'ip': ip, 'enabled': enabled}) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -157,6 +173,11 @@ def delete_host_override(): return redirect(VIEW) removed = items.pop(idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_iface_config.py b/docker/router-dash/app/action_apply_iface_config.py index 0fbdae5..51f5a47 100644 --- a/docker/router-dash/app/action_apply_iface_config.py +++ b/docker/router-dash/app/action_apply_iface_config.py @@ -4,6 +4,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import verify_core_hash, queued_msg, queue_command import sanitize +import validation as validate bp = Blueprint('action_apply_iface_config', __name__) @@ -47,11 +48,8 @@ def apply_iface_config(): mtu_int = None if mtu: - try: - mtu_int = int(mtu) - if not (68 <= mtu_int <= 9000): - raise ValueError - except ValueError: + mtu_int = validate.int_range(mtu, 68, 9000) + if mtu_int is None: flash('MTU must be an integer between 68 and 9000.', 'error') return redirect(_VIEW) diff --git a/docker/router-dash/app/action_apply_inter_vlan.py b/docker/router-dash/app/action_apply_inter_vlan.py index acd054d..f8cc7c4 100644 --- a/docker/router-dash/app/action_apply_inter_vlan.py +++ b/docker/router-dash/app/action_apply_inter_vlan.py @@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_inter_vlan', __name__) @@ -83,6 +83,11 @@ def add_inter_vlan(): core = load_core() core.setdefault('inter_vlan_exceptions', []).append(entry) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -107,6 +112,11 @@ def toggle_inter_vlan(): return redirect(VIEW) items[idx]['enabled'] = not items[idx].get('enabled', True) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -136,6 +146,11 @@ def edit_inter_vlan(): items[idx] = entry items[idx]['enabled'] = request.form.get('enabled') == 'on' + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -160,6 +175,11 @@ def delete_inter_vlan(): return redirect(VIEW) items.pop(idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_interface.py b/docker/router-dash/app/action_apply_interface.py index 3ba45f1..38ccf63 100644 --- a/docker/router-dash/app/action_apply_interface.py +++ b/docker/router-dash/app/action_apply_interface.py @@ -4,6 +4,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize +import validation as validate bp = Blueprint('action_apply_interface', __name__) @@ -53,6 +54,11 @@ def apply_interface(): gen = core.setdefault('general', {}) gen['wan_interface'] = wan gen['lan_interface'] = lan + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_mdns.py b/docker/router-dash/app/action_apply_mdns.py index 4601fa9..5c99146 100644 --- a/docker/router-dash/app/action_apply_mdns.py +++ b/docker/router-dash/app/action_apply_mdns.py @@ -2,6 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize +import validation as validate bp = Blueprint('action_apply_mdns', __name__) @@ -23,6 +24,11 @@ def apply_mdns(): 'enabled': mdns_enabled, 'reflect_vlans': mdns_reflect_vlans, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect('/view/view_mdns') save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_port_forwarding.py b/docker/router-dash/app/action_apply_port_forwarding.py index bac06b3..4897d78 100644 --- a/docker/router-dash/app/action_apply_port_forwarding.py +++ b/docker/router-dash/app/action_apply_port_forwarding.py @@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_port_forwarding', __name__) @@ -84,6 +84,11 @@ def add_port_forward(): core = load_core() core.setdefault('port_forwarding', []).append(entry) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -108,6 +113,11 @@ def toggle_port_forward(): return redirect(VIEW) items[idx]['enabled'] = not items[idx].get('enabled', True) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -137,6 +147,11 @@ def edit_port_forward(): items[idx] = entry items[idx]['enabled'] = request.form.get('enabled') == 'on' + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -161,6 +176,11 @@ def delete_port_forward(): return redirect(VIEW) removed = items.pop(idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_upstream_dns.py b/docker/router-dash/app/action_apply_upstream_dns.py index 9d2b805..757205f 100644 --- a/docker/router-dash/app/action_apply_upstream_dns.py +++ b/docker/router-dash/app/action_apply_upstream_dns.py @@ -2,6 +2,7 @@ from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize +import validation as validate bp = Blueprint('action_apply_upstream_dns', __name__) @@ -26,11 +27,8 @@ def apply_upstream_dns(): return redirect('/view/view_upstream_dns') upstream_servers.append(clean) - try: - cache_size = int(cache_size_raw) - if cache_size < 0: - raise ValueError - except (ValueError, TypeError): + cache_size = validate.int_range(cache_size_raw, 0, None) + if cache_size is None: flash('Cache Size must be a non-negative integer.', 'error') return redirect('/view/view_upstream_dns') @@ -51,6 +49,11 @@ def apply_upstream_dns(): 'cache_size': cache_size, 'upstream_servers': upstream_servers, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect('/view/view_upstream_dns') save_core(core) flash(queued_msg('core apply'), 'success') return redirect('/view/view_upstream_dns') diff --git a/docker/router-dash/app/action_apply_vlans.py b/docker/router-dash/app/action_apply_vlans.py index fa6ef90..15b800b 100644 --- a/docker/router-dash/app/action_apply_vlans.py +++ b/docker/router-dash/app/action_apply_vlans.py @@ -3,6 +3,7 @@ from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize import ipaddress as _ipaddress +import validation as validate bp = Blueprint('action_apply_vlans', __name__) @@ -77,6 +78,10 @@ def add_vlan(): flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') return redirect(VIEW) + if radius_default and any(v.get('radius_default') for v in vlans): + flash('Only one VLAN can be the RADIUS default.', 'error') + return redirect(VIEW) + entry = { 'vlan_id': vlan_id, 'name': name, @@ -92,6 +97,11 @@ def add_vlan(): else: entry['reservations'] = [] vlans.append(entry) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -161,6 +171,10 @@ def edit_vlan(): flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') return redirect(VIEW) + if radius_default and any(i != idx and v.get('radius_default') for i, v in enumerate(vlans)): + flash('Only one VLAN can be the RADIUS default.', 'error') + return redirect(VIEW) + existing.update({ 'vlan_id': vlan_id, 'name': name, @@ -171,6 +185,11 @@ def edit_vlan(): 'mdns_reflection': mdns_reflection, 'use_blocklists': use_blocklists, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') @@ -195,6 +214,11 @@ def delete_vlan(): return redirect(VIEW) removed = vlans.pop(idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') diff --git a/docker/router-dash/app/action_apply_vpn.py b/docker/router-dash/app/action_apply_vpn.py index 198727b..7df9ca2 100644 --- a/docker/router-dash/app/action_apply_vpn.py +++ b/docker/router-dash/app/action_apply_vpn.py @@ -6,7 +6,7 @@ from flask import Blueprint, make_response, redirect, flash, request from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg, CONFIGS_DIR import sanitize -import validate +import validation as validate bp = Blueprint('action_apply_vpn', __name__) @@ -140,19 +140,16 @@ def _conf_response(vlan, peer_name, peer_ip, private_key): @require_level('administrator') def apply_vpn(): listen_port_raw = request.form.get('vpn_listen_port', '').strip() - server_endpoint = sanitize.domainname(request.form.get('vpn_server_endpoint', '')) - domain = sanitize.domainname(request.form.get('vpn_domain', '')) + server_endpoint = validate.domainname(request.form.get('vpn_server_endpoint', '')) + domain = validate.domainname(request.form.get('vpn_domain', '')) dns_raw = request.form.get('vpn_dns_server', '').strip() mtu_raw = request.form.get('vpn_mtu', '').strip() if not listen_port_raw: flash('Listen port is required.', 'error') return redirect(_VIEW) - try: - listen_port = int(listen_port_raw) - if not (1 <= listen_port <= 65535): - raise ValueError - except (ValueError, TypeError): + listen_port = validate.int_range(listen_port_raw, 1, 65535) + if listen_port is None: flash(f'"{listen_port_raw}" is not a valid port number (1-65535).', 'error') return redirect(_VIEW) @@ -165,11 +162,8 @@ def apply_vpn(): mtu = None if mtu_raw: - try: - mtu = int(mtu_raw) - if not (_MTU_MIN <= mtu <= _MTU_MAX): - raise ValueError - except (ValueError, TypeError): + mtu = validate.int_range(mtu_raw, _MTU_MIN, _MTU_MAX) + if mtu is None: flash(f'"{mtu_raw}" is not a valid MTU (must be {_MTU_MIN}-{_MTU_MAX}).', 'error') return redirect(_VIEW) @@ -182,6 +176,11 @@ def apply_vpn(): flash('No WireGuard VLAN found in configuration.', 'error') return redirect(_VIEW) + for v in core.get('vlans', []): + if v.get('is_vpn') and v is not vpn_vlan and v.get('vpn_information', {}).get('listen_port') == listen_port: + flash(f'Listen port {listen_port} is already used by another VPN VLAN.', 'error') + return redirect(_VIEW) + info = vpn_vlan.setdefault('vpn_information', {}) info['listen_port'] = listen_port info['server_endpoint'] = server_endpoint @@ -197,6 +196,11 @@ def apply_vpn(): else: overrides.pop('mtu', None) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(_VIEW) @@ -258,6 +262,11 @@ def add_vpn_peer(): 'split_tunnel': split_tunnel, 'enabled': enabled, }) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) return _conf_response(vpn_vlan, peer_name, peer_ip, private_key) @@ -293,6 +302,11 @@ def edit_vpn_peer(): return redirect(_VIEW) peers[peer_idx].update({'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled}) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(_VIEW) @@ -316,6 +330,11 @@ def toggle_vpn_peer(): peers = vlan.get('peers', []) peers[peer_idx]['enabled'] = not peers[peer_idx].get('enabled', True) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(_VIEW) @@ -338,6 +357,11 @@ def delete_vpn_peer(): return redirect(_VIEW) vlan.get('peers', []).pop(peer_idx) + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(_VIEW) @@ -362,6 +386,11 @@ def regenerate_vpn_peer(): private_key, public_key = _generate_wg_keypair() peer = vlan['peers'][peer_idx] peer['public_key'] = public_key + errors = validate.validate_config(core) + if errors: + for msg in errors: + flash(msg, 'error') + return redirect(_VIEW) save_core(core) return _conf_response(vlan, peer['name'], peer['ip'], private_key) diff --git a/docker/router-dash/app/validate.py b/docker/router-dash/app/validate.py deleted file mode 100644 index d20b81e..0000000 --- a/docker/router-dash/app/validate.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -validate.py -- Flask app re-export of the shared validation module. - -~/router/validation.py is volume-mounted to /app/validation.py by -docker-compose, making it directly importable. Action scripts import -this module (validate.*) so they are insulated from the shared file's -location and name. -""" -from validation import ( - VALID_PROTOCOLS, - VALID_BLOCKLIST_FORMATS, - ip, - ip_or_cidr, - port, - banned_ip, -) - -VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns') - -__all__ = [ - 'VALID_PROTOCOLS', - 'VALID_BLOCKLIST_FORMATS', - 'VALID_DDNS_PROVIDERS', - 'ip', - 'ip_or_cidr', - 'port', - 'banned_ip', -] diff --git a/docker/router-dash/app/view_page.py b/docker/router-dash/app/view_page.py index 801e45c..3435088 100644 --- a/docker/router-dash/app/view_page.py +++ b/docker/router-dash/app/view_page.py @@ -2,7 +2,7 @@ from flask import Blueprint, session, redirect, get_flashed_messages from markupsafe import Markup import json, re, subprocess, os, sys, html as html_mod import sanitize -import validate +import validation as validate from datetime import datetime, timezone from config_utils import core_hash, get_pending_entries, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime diff --git a/router/core.py b/router/core.py index c6bc10d..ab59afd 100644 --- a/router/core.py +++ b/router/core.py @@ -100,7 +100,12 @@ import urllib.error import argparse from datetime import datetime from pathlib import Path -from validation import VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS +from validation import ( + VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS, + int_range, domainname, + inject_interfaces, is_wg, is_dynamic_ip, + validate_config, +) SCRIPT_DIR = Path(__file__).parent CONFIG_FILE = SCRIPT_DIR / "core.json" @@ -247,29 +252,6 @@ def resolve_vlan_options(vlan): def is_physical(vlan): return vlan["vlan_id"] == 1 -def is_wg(vlan): - return vlan.get("is_vpn", False) - - -def inject_interfaces(data): - """Compute and inject the 'interface' field for every VLAN from is_vpn + vlan_id. - - is_vpn=False (regular VLAN): - vlan_id 1 → general.lan_interface (e.g. enp6s0) - vlan_id N → lan_interface.N (e.g. enp6s0.10) - is_vpn=True (WireGuard VLAN): - 1st WG VLAN → wg0, 2nd → wg1, etc. (order in vlans array) - """ - lan = data.get("general", {}).get("lan_interface", "eth0") - wg_idx = 0 - for vlan in data.get("vlans", []): - if vlan.get("is_vpn"): - vlan["interface"] = f"wg{wg_idx}" - wg_idx += 1 - else: - vid = vlan.get("vlan_id", 1) - vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}" - def networkd_stem(vlan): return f"10-router-{vlan['name']}" @@ -297,13 +279,6 @@ def rule_enabled(rules): def rule_disabled(rules): return [r for r in rules if r.get("enabled") is not True] -def is_dynamic_ip(r): - """Return True if a reservation has no pinned IP -- DHCP assigns from pool. - Triggered by: ip field absent, empty string, or the keyword 'dynamic'. - """ - ip = r.get("ip", "dynamic") - return ip in ("", "dynamic") or ip is None - def expand_protocols(rule): """Return list of (protocol, rule, comment_suffix) tuples. When protocol is 'both', expands into tcp and udp with suffixes @@ -328,467 +303,6 @@ def load_config(): die("No vlans defined in core.json.") return data -# =================================================================== -# Validate -# =================================================================== - -def validate_config(data): - inject_interfaces(data) - errors = [] - seen_vlan_ids = {} - seen_interfaces = {} - seen_names = {} - seen_listen_ports = {} - - # -- upstream_dns block ---------------------------------------------------- - if not data.get("upstream_dns", {}).get("upstream_servers"): - errors.append("upstream_dns.upstream_servers is missing or empty.") - - # -- WAN / LAN interfaces -------------------------------------------------- - gen = data.get("general", {}) - wan = gen.get("wan_interface", "") - lan = gen.get("lan_interface", "") - if not wan: - errors.append("general.wan_interface is missing or empty.") - if not lan: - errors.append("general.lan_interface is missing or empty.") - if wan and lan: - available_interfaces = set() - try: - result = subprocess.run(["ip", "link", "show"], capture_output=True, text=True) - available_interfaces = set(re.findall(r"^\d+:\s+(\S+):", result.stdout, re.MULTILINE)) - available_interfaces = {i.split("@")[0] for i in available_interfaces} - except Exception: - pass - if available_interfaces: - if wan not in available_interfaces: - errors.append(f"general.wan_interface: '{wan}' does not exist on this system.") - if lan not in available_interfaces: - errors.append(f"general.lan_interface: '{lan}' does not exist on this system.") - if wan == lan: - errors.append(f"general.wan_interface and general.lan_interface must be different (both set to '{wan}').") - - # -- Blocklist library ----------------------------------------------------- - blocklists_by_name = {} - for idx, bl in enumerate(data.get("blocklists", [])): - name = bl.get("name", "") - label = f"blocklists[{idx}] '{name}'" - for field in ("name", "description", "save_as", "url", "format"): - if not bl.get(field): - errors.append(f"{label}: missing or empty field '{field}'.") - if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS: - errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.") - if name: - if name in blocklists_by_name: - errors.append(f"{label}: duplicate blocklist name '{name}'.") - else: - blocklists_by_name[name] = bl - - # -- Per-VLAN validation --------------------------------------------------- - vlan_networks = {} # interface -> IPv4Network (built for nat validation) - - for vlan in data["vlans"]: - vlan_id = vlan.get("vlan_id") - name = vlan.get("name", "?") - iface = vlan.get("interface", "") - label = f"vlan '{name}' (id={vlan_id})" - - if name in seen_names: - errors.append(f"{label}: duplicate vlan name '{name}' " - f"(also used by id={seen_names[name]}).") - else: - seen_names[name] = vlan_id - - if vlan_id in seen_vlan_ids: - errors.append(f"{label}: duplicate vlan_id {vlan_id} " - f"(also used by '{seen_vlan_ids[vlan_id]}').") - else: - seen_vlan_ids[vlan_id] = name - - if iface in seen_interfaces: - errors.append(f"{label}: duplicate interface '{iface}' " - f"(also used by '{seen_interfaces[iface]}').") - else: - seen_interfaces[iface] = name - - if vlan.get("mdns_reflection") is True and is_wg(vlan): - errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.") - - if is_wg(vlan): - # -- vpn_information ----------------------------------------------- - vpi = vlan.get("vpn_information") - if not isinstance(vpi, dict): - errors.append(f"{label}: vpn_information must be a plain object.") - vpi = {} - else: - lp = vpi.get("listen_port") - if not isinstance(lp, int) or not (1 <= lp <= 65535): - errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.") - elif lp in seen_listen_ports: - errors.append(f"{label}: vpn_information.listen_port {lp} is already used by " - f"'{seen_listen_ports[lp]}'.") - else: - seen_listen_ports[lp] = name - - # -- subnet/subnet_mask -------------------------------------------- - for field in ("subnet", "subnet_mask"): - if not vlan.get(field): - errors.append(f"{label}: missing required field '{field}'.") - wg_net = None - if vlan.get("subnet") and vlan.get("subnet_mask"): - try: - wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) - vlan_networks[iface] = wg_net - except ValueError as e: - errors.append(f"{label}: invalid subnet/subnet_mask: {e}") - - # -- server_identities --------------------------------------------- - if not vlan.get("server_identities"): - errors.append(f"{label}: server_identities is empty or missing.") - identity_ips = [] - for idx, ident in enumerate(vlan.get("server_identities", [])): - ip_str = ident.get("ip", "") - ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'" - if not ip_str: - errors.append(f"{ilabel}: missing 'ip' field.") - continue - try: - ip = ipaddress.IPv4Address(ip_str) - if wg_net and ip not in wg_net: - errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.") - else: - identity_ips.append(ip) - except ValueError: - errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.") - - # -- vpn_information.explicit_overrides ---------------------------- - eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {} - if not isinstance(eo, dict): - errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.") - else: - gw = eo.get("gateway", "") - if gw: - try: - gw_ip = ipaddress.IPv4Address(gw) - if identity_ips and gw_ip not in identity_ips: - errors.append( - f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match " - f"any server_identity IP. Must be one of: " - f"{[str(ip) for ip in identity_ips]}." - ) - except ValueError: - errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.") - dns = eo.get("dns_server", "") - if dns: - try: - ipaddress.IPv4Address(dns) - except ValueError: - errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.") - mtu = eo.get("mtu", "") - if mtu: - try: - m = int(mtu) - if not (576 <= m <= 9000): - errors.append(f"{label}: vpn_information.explicit_overrides.mtu {mtu} is out of valid range (576-9000).") - except (ValueError, TypeError): - errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' is not a valid integer.") - - # -- peers --------------------------------------------------------- - seen_peer_names = {} - seen_peer_ips = {} - for pidx, peer in enumerate(vlan.get("peers", [])): - pname = peer.get("name", "") - plabel = f"{label} peer[{pidx}] '{pname}'" - if not pname: - errors.append(f"{plabel}: missing 'name' field.") - elif pname in seen_peer_names: - errors.append(f"{plabel}: duplicate peer name '{pname}'.") - else: - seen_peer_names[pname] = pidx - if not peer.get("public_key"): - errors.append(f"{plabel}: missing 'public_key' field.") - pip_str = peer.get("ip", "") - if not pip_str: - errors.append(f"{plabel}: missing 'ip' field.") - else: - try: - pip = ipaddress.IPv4Address(pip_str) - if wg_net and pip not in wg_net: - errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.") - if pip in identity_ips: - errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.") - if pip_str in seen_peer_ips: - errors.append( - f"{plabel}: duplicate peer ip '{pip_str}' " - f"(also used by peer '{seen_peer_ips[pip_str]}')." - ) - else: - seen_peer_ips[pip_str] = pname - except ValueError: - errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.") - continue - - if not vlan.get("server_identities"): - errors.append(f"{label}: server_identities is empty or missing.") - continue - - for field in ("subnet", "subnet_mask"): - if not vlan.get(field): - errors.append(f"{label}: missing required top-level field '{field}'.") - if not vlan.get("subnet") or not vlan.get("subnet_mask"): - continue - - try: - network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) - vlan_networks[iface] = network - except ValueError as e: - errors.append(f"{label}: invalid subnet/subnet_mask: {e}") - continue - - d = vlan.get("dhcp_information", {}) - required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"} - missing = required_dhcp - set(d.keys()) - if missing: - errors.append(f"{label}: missing dhcp_information fields: {missing}") - continue - - def check_ip(field_label, ip_str, allow_none=False): - if ip_str is None: - if not allow_none: - errors.append(f"{label}: {field_label} is null/missing.") - return None - try: - ip = ipaddress.IPv4Address(ip_str) - except ValueError: - errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.") - return None - if ip not in network: - errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.") - return ip - - identity_ips = [] - for idx, ident in enumerate(vlan["server_identities"]): - ip = check_ip( - f"server_identities[{idx}] '{ident.get('description', '?')}'", - ident.get("ip") - ) - if ip: - identity_ips.append(ip) - - # -- Validate explicit_overrides ----------------------------------- - eo = d.get("explicit_overrides", {}) - if not isinstance(eo, dict): - errors.append(f"{label}: explicit_overrides must be a plain object.") - else: - gw = eo.get("gateway", "") - if gw: - gw_ip = check_ip("explicit_overrides.gateway", gw) - if gw_ip and gw_ip not in identity_ips: - errors.append( - f"{label}: explicit_overrides.gateway '{gw}' does not match " - f"any server_identity IP. Must be one of: " - f"{[str(ip) for ip in identity_ips]}." - ) - dns = eo.get("dns_server", "") - if dns: - check_ip("explicit_overrides.dns_server", dns) - ntp = eo.get("ntp_server", "") - if ntp: - check_ip("explicit_overrides.ntp_server", ntp) - - pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"]) - pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"]) - - if pool_start and pool_end and pool_start > pool_end: - errors.append( - f"{label}: dynamic_pool_start '{pool_start}' is greater than " - f"dynamic_pool_end '{pool_end}'." - ) - - if pool_start and pool_end: - for ip in identity_ips: - if pool_start <= ip <= pool_end: - errors.append( - f"{label}: server_identity '{ip}' falls inside the dynamic " - f"pool ({pool_start} - {pool_end})." - ) - - seen_res_ips = {} - seen_res_macs = {} - for r in vlan.get("reservations", []): - rdesc = r.get("description", "?") - rmac = r.get("mac", "").lower().strip() - - if is_dynamic_ip(r): - rip = None # no pinned IP -- skip all IP validation - else: - rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip")) - - if rip: - if pool_start and pool_end and pool_start <= rip <= pool_end: - errors.append( - f"{label}: reservation '{rdesc}' ip '{rip}' falls inside " - f"the dynamic pool ({pool_start} - {pool_end})." - ) - rip_str = str(rip) - if rip_str in seen_res_ips: - # Allow same IP for different MACs (multi-interface device) - # Only flag if same MAC is also duplicated (caught below) - if rmac and rmac in seen_res_ips[rip_str]: - errors.append( - f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' " - f"duplicates '{seen_res_ips[rip_str][rmac]}'." - ) - else: - seen_res_ips[rip_str][rmac] = rdesc - else: - seen_res_ips[rip_str] = {rmac: rdesc} - if rip in identity_ips: - errors.append( - f"{label}: reservation '{rdesc}' ip '{rip}' conflicts " - f"with a server_identity." - ) - - if rmac: - if rmac in seen_res_macs: - errors.append( - f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates " - f"'{seen_res_macs[rmac]}'." - ) - else: - seen_res_macs[rmac] = rdesc - - for bl_name in vlan.get("use_blocklists", []): - if bl_name not in blocklists_by_name: - errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.") - - # -- NAT / firewall validation --------------------------------------------- - valid_protos = VALID_PROTOCOLS - known_interfaces = set(seen_interfaces.keys()) - - def nat_check_port(label, port): - try: - p = int(port) - if not (1 <= p <= 65535): - errors.append(f"{label}: port {port} is out of valid range (1-65535).") - except (TypeError, ValueError): - errors.append(f"{label}: '{port}' is not a valid port number.") - - def nat_check_ip(label, ip_str): - try: - return ipaddress.IPv4Address(ip_str) - except ValueError: - errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.") - return None - - def nat_check_ip_in_network(label, ip_str, network): - ip = nat_check_ip(label, ip_str) - if ip and ip not in network: - errors.append(f"{label}: '{ip_str}' is not within subnet {network}.") - - for vlan in data["vlans"]: - name = vlan.get("name", "?") - iface = vlan.get("interface", "") - net = vlan_networks.get(iface) - - for r in vlan.get("port_wrangling", []): - desc = r.get("description", "?") - label = f"vlan '{name}' port_wrangling '{desc}'" - if r.get("protocol") not in valid_protos: - errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " - f"Must be tcp, udp, or both.") - nat_check_port(f"{label} dest_port", r.get("dest_port")) - if net: - nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net) - - # -- port_forwarding validation (top-level) -------------------------------- - for idx, r in enumerate(data.get("port_forwarding", [])): - desc = r.get("description", "?") - label = f"port_forwarding[{idx}] '{desc}'" - if r.get("protocol") not in valid_protos: - errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " - f"Must be tcp, udp, or both.") - nat_check_port(f"{label} dest_port", r.get("dest_port")) - nat_check_port(f"{label} nat_port", r.get("nat_port")) - nat_check_ip(f"{label} nat_ip", r.get("nat_ip", "")) - - for r in data.get("inter_vlan_exceptions", []): - desc = r.get("description", "?") - label = f"inter_vlan_exceptions '{desc}'" - if r.get("protocol") not in valid_protos: - errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " - f"Must be tcp, udp, or both.") - if "src_ip_or_subnet" not in r: - errors.append(f"{label}: missing field 'src_ip_or_subnet'.") - else: - val = r["src_ip_or_subnet"] - try: - ipaddress.IPv4Address(val) - except ValueError: - try: - ipaddress.IPv4Network(val, strict=False) - except ValueError: - errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid " - f"IPv4 address or network.") - # Support both dst_ip (legacy, single IP) and dst_ip_or_subnet (IP or subnet) - dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "") - if not dst: - errors.append(f"{label}: missing field 'dst_ip_or_subnet'.") - else: - try: - ipaddress.IPv4Address(dst) - except ValueError: - try: - ipaddress.IPv4Network(dst, strict=False) - except ValueError: - errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid " - f"IPv4 address or network.") - if r.get("dst_port") is not None: - nat_check_port(f"{label} dst_port", r.get("dst_port")) - - # -- radius_default uniqueness check --------------------------------------- - defaults = [v["name"] for v in data["vlans"] if v.get("radius_default") is True] - if len(defaults) > 1: - errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). " - f"Only one VLAN may be the RADIUS default.") - - # -- host_overrides validation --------------------------------------------- - all_vlan_nets = list(vlan_networks.values()) - for idx, entry in enumerate(data.get("host_overrides", [])): - lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'" - if not entry.get("host"): - errors.append(f"{lbl}: missing 'host' field.") - ip_str = entry.get("ip", "") - if not ip_str: - errors.append(f"{lbl}: missing 'ip' field.") - else: - try: - ip_addr = ipaddress.IPv4Address(ip_str) - if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets): - errors.append( - f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet." - ) - except ValueError: - errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.") - - # -- banned_ips validation ------------------------------------------------- - for idx, entry in enumerate(data.get("banned_ips", [])): - ip = entry.get("ip", "") - lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'" - if not ip: - errors.append(f"{lbl}: missing 'ip' field.") - continue - try: - expand_banned_ip(ip) - except ValueError as e: - errors.append(f"{lbl}: {e}") - - if errors: - print("Validation failed:", file=sys.stderr) - for e in errors: - print(f" - {e}", file=sys.stderr) - sys.exit(1) - # =================================================================== # Build systemd-networkd files # =================================================================== @@ -3759,7 +3273,12 @@ def main(): sys.exit(1) data = load_config() - validate_config(data) + errors = validate_config(data) + if errors: + print("Validation failed:", file=sys.stderr) + for e in errors: + print(f" - {e}", file=sys.stderr) + sys.exit(1) general = data.get("general", {}) setup_logging( diff --git a/router/validation.py b/router/validation.py index 81b4e85..278c0df 100644 --- a/router/validation.py +++ b/router/validation.py @@ -2,18 +2,19 @@ validation.py -- Shared structural validators for core.json fields. Lives alongside core.py in ~/router/ and is volume-mounted into the -router-dash container at /configs/validation.py. Importable by both -core.py (router host) and the Flask app (via validate.py which adds -/configs to sys.path). +router-dash container at /app/validation.py. Importable by both +core.py (router host) and the Flask app directly. -Convention: each function accepts a raw string and returns the -normalised valid value, or '' if the input is invalid. +Convention: primitive validators accept a raw string and return the +normalised valid value, or '' / None if the input is invalid. """ import ipaddress +import os import re VALID_PROTOCOLS = {'tcp', 'udp', 'both'} VALID_BLOCKLIST_FORMATS = {'dnsmasq', 'hosts'} +VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns') # =================================================================== @@ -49,6 +50,35 @@ def ip_or_cidr(value): return '' +def ipv4(value): + """Return value if it is a valid IPv4 address, else ''.""" + if not value: + return '' + v = str(value).strip() + try: + ipaddress.IPv4Address(v) + return v + except ValueError: + return '' + + +def ipv4_or_cidr(value): + """Return value if it is a valid IPv4 address or IPv4 CIDR network, else ''.""" + if not value: + return '' + v = str(value).strip() + try: + ipaddress.IPv4Address(v) + return v + except ValueError: + pass + try: + ipaddress.IPv4Network(v, strict=False) + return v + except ValueError: + return '' + + # =================================================================== # Port # =================================================================== @@ -64,6 +94,51 @@ def port(value): return '' +# =================================================================== +# Integer range +# =================================================================== + +def int_range(value, lo, hi): + """Return value as int if it is an integer within [lo, hi], else None. + + lo and hi may be None to indicate no lower or upper bound respectively. + """ + try: + v = int(str(value).strip()) + if (lo is None or v >= lo) and (hi is None or v <= hi): + return v + except (ValueError, TypeError): + pass + return None + + +# =================================================================== +# Domain name +# =================================================================== + +def domainname(value): + """Return value if it is a valid domain name, else ''. + + Rules: labels separated by dots; each label contains only + letters, digits, and hyphens; no label may start or end with a + hyphen; no consecutive dots; total length <= 253. + """ + if not value: + return '' + v = str(value).strip().lower() + if len(v) > 253: + return '' + if '..' in v or v.startswith('.') or v.endswith('.'): + return '' + labels = v.split('.') + for label in labels: + if not label: + return '' + if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?$|^[a-zA-Z0-9]$', label): + return '' + return v + + # =================================================================== # Banned-IP pattern # =================================================================== @@ -162,3 +237,471 @@ def _check_banned_ipv6(ip_str): groups = [g for g in prefix_part.split(':') if g] if not (1 <= len(groups) <= 7): raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}") + + +# =================================================================== +# VLAN / interface helpers (shared with core.py apply logic) +# =================================================================== + +def is_wg(vlan): + return vlan.get("is_vpn", False) + + +def is_dynamic_ip(r): + """Return True if a reservation has no pinned IP (DHCP assigns from pool).""" + ip = r.get("ip", "dynamic") + return ip in ("", "dynamic") or ip is None + + +def inject_interfaces(data): + """Mutate data in-place: add derived 'interface' field to every VLAN. + + Called by core.py cmd_apply before apply logic. NOT called by + validate_config — validate_config computes iface locally to stay pure. + """ + lan = data.get("general", {}).get("lan_interface", "eth0") + wg_idx = 0 + for vlan in data.get("vlans", []): + if vlan.get("is_vpn"): + vlan["interface"] = f"wg{wg_idx}" + wg_idx += 1 + else: + vid = vlan.get("vlan_id", 1) + vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}" + + +# =================================================================== +# Full config validation (shared with core.py --apply) +# =================================================================== + +def validate_config(data): + """Validate core.json structure and content. Returns list of error strings.""" + errors = [] + seen_vlan_ids = {} + seen_interfaces = {} + seen_names = {} + seen_listen_ports = {} + + # Pre-compute per-VLAN interface names without mutating data + _lan = data.get("general", {}).get("lan_interface", "eth0") + vlan_ifaces = [] + _wg_idx = 0 + for _vlan in data.get("vlans", []): + if is_wg(_vlan): + vlan_ifaces.append(f"wg{_wg_idx}") + _wg_idx += 1 + else: + _vid = _vlan.get("vlan_id", 1) + vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}") + + # -- upstream_dns block ---------------------------------------------------- + if not data.get("upstream_dns", {}).get("upstream_servers"): + errors.append("upstream_dns.upstream_servers is missing or empty.") + + # -- WAN / LAN interfaces -------------------------------------------------- + gen = data.get("general", {}) + wan = gen.get("wan_interface", "") + lan = gen.get("lan_interface", "") + if not wan: + errors.append("general.wan_interface is missing or empty.") + if not lan: + errors.append("general.lan_interface is missing or empty.") + if wan and lan: + available_interfaces = set() + try: + available_interfaces = set(os.listdir('/sys/class/net')) + except Exception: + pass + if available_interfaces: + if wan not in available_interfaces: + errors.append(f"general.wan_interface: '{wan}' does not exist on this system.") + if lan not in available_interfaces: + errors.append(f"general.lan_interface: '{lan}' does not exist on this system.") + if wan == lan: + errors.append(f"general.wan_interface and general.lan_interface must be different (both set to '{wan}').") + + # -- Blocklist library ----------------------------------------------------- + blocklists_by_name = {} + for idx, bl in enumerate(data.get("blocklists", [])): + name = bl.get("name", "") + label = f"blocklists[{idx}] '{name}'" + for field in ("name", "description", "save_as", "url", "format"): + if not bl.get(field): + errors.append(f"{label}: missing or empty field '{field}'.") + if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS: + errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.") + if name: + if name in blocklists_by_name: + errors.append(f"{label}: duplicate blocklist name '{name}'.") + else: + blocklists_by_name[name] = bl + + # -- Per-VLAN validation --------------------------------------------------- + vlan_networks = {} # iface -> IPv4Network (used for NAT section) + + for vlan, iface in zip(data.get("vlans", []), vlan_ifaces): + vlan_id = vlan.get("vlan_id") + name = vlan.get("name", "?") + label = f"vlan '{name}' (id={vlan_id})" + + if name in seen_names: + errors.append(f"{label}: duplicate vlan name '{name}' " + f"(also used by id={seen_names[name]}).") + else: + seen_names[name] = vlan_id + + if vlan_id in seen_vlan_ids: + errors.append(f"{label}: duplicate vlan_id {vlan_id} " + f"(also used by '{seen_vlan_ids[vlan_id]}').") + else: + seen_vlan_ids[vlan_id] = name + + if iface in seen_interfaces: + errors.append(f"{label}: duplicate interface '{iface}' " + f"(also used by '{seen_interfaces[iface]}').") + else: + seen_interfaces[iface] = name + + if vlan.get("mdns_reflection") is True and is_wg(vlan): + errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.") + + if is_wg(vlan): + # -- vpn_information ----------------------------------------------- + vpi = vlan.get("vpn_information") + if not isinstance(vpi, dict): + errors.append(f"{label}: vpn_information must be a plain object.") + vpi = {} + else: + lp = vpi.get("listen_port") + if int_range(lp, 1, 65535) is None: + errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.") + elif lp in seen_listen_ports: + errors.append(f"{label}: vpn_information.listen_port {lp} is already used by " + f"'{seen_listen_ports[lp]}'.") + else: + seen_listen_ports[lp] = name + + # -- subnet/subnet_mask -------------------------------------------- + for field in ("subnet", "subnet_mask"): + if not vlan.get(field): + errors.append(f"{label}: missing required field '{field}'.") + wg_net = None + if vlan.get("subnet") and vlan.get("subnet_mask"): + try: + wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) + vlan_networks[iface] = wg_net + except ValueError as e: + errors.append(f"{label}: invalid subnet/subnet_mask: {e}") + + # -- server_identities --------------------------------------------- + if not vlan.get("server_identities"): + errors.append(f"{label}: server_identities is empty or missing.") + identity_ips = [] + for idx, ident in enumerate(vlan.get("server_identities", [])): + ip_str = ident.get("ip", "") + ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'" + if not ip_str: + errors.append(f"{ilabel}: missing 'ip' field.") + continue + if not ipv4(ip_str): + errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.") + continue + ip_addr = ipaddress.IPv4Address(ip_str) + if wg_net and ip_addr not in wg_net: + errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.") + else: + identity_ips.append(ip_addr) + + # -- vpn_information.explicit_overrides ---------------------------- + eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {} + if not isinstance(eo, dict): + errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.") + else: + gw = eo.get("gateway", "") + if gw: + if not ipv4(gw): + errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.") + else: + gw_ip = ipaddress.IPv4Address(gw) + if identity_ips and gw_ip not in identity_ips: + errors.append( + f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match " + f"any server_identity IP. Must be one of: " + f"{[str(ip) for ip in identity_ips]}." + ) + dns = eo.get("dns_server", "") + if dns and not ipv4(dns): + errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.") + mtu = eo.get("mtu", "") + if mtu and int_range(mtu, 576, 9000) is None: + errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' must be an integer in range 576-9000.") + domain_val = vpi.get("domain", "") if isinstance(vpi, dict) else "" + if domain_val and not domainname(domain_val): + errors.append(f"{label}: vpn_information.domain '{domain_val}' is not a valid domain name.") + + # -- peers --------------------------------------------------------- + seen_peer_names = {} + seen_peer_ips = {} + for pidx, peer in enumerate(vlan.get("peers", [])): + pname = peer.get("name", "") + plabel = f"{label} peer[{pidx}] '{pname}'" + if not pname: + errors.append(f"{plabel}: missing 'name' field.") + elif pname in seen_peer_names: + errors.append(f"{plabel}: duplicate peer name '{pname}'.") + else: + seen_peer_names[pname] = pidx + if not peer.get("public_key"): + errors.append(f"{plabel}: missing 'public_key' field.") + pip_str = peer.get("ip", "") + if not pip_str: + errors.append(f"{plabel}: missing 'ip' field.") + elif not ipv4(pip_str): + errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.") + else: + pip = ipaddress.IPv4Address(pip_str) + if wg_net and pip not in wg_net: + errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.") + if pip in identity_ips: + errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.") + if pip_str in seen_peer_ips: + errors.append( + f"{plabel}: duplicate peer ip '{pip_str}' " + f"(also used by peer '{seen_peer_ips[pip_str]}')." + ) + else: + seen_peer_ips[pip_str] = pname + continue + + if not vlan.get("server_identities"): + errors.append(f"{label}: server_identities is empty or missing.") + continue + + for field in ("subnet", "subnet_mask"): + if not vlan.get(field): + errors.append(f"{label}: missing required top-level field '{field}'.") + if not vlan.get("subnet") or not vlan.get("subnet_mask"): + continue + + try: + network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) + vlan_networks[iface] = network + except ValueError as e: + errors.append(f"{label}: invalid subnet/subnet_mask: {e}") + continue + + d = vlan.get("dhcp_information", {}) + required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"} + missing = required_dhcp - set(d.keys()) + if missing: + errors.append(f"{label}: missing dhcp_information fields: {missing}") + continue + + def check_ip(field_label, ip_str, allow_none=False): + if ip_str is None: + if not allow_none: + errors.append(f"{label}: {field_label} is null/missing.") + return None + if not ipv4(ip_str): + errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.") + return None + addr = ipaddress.IPv4Address(ip_str) + if addr not in network: + errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.") + return addr + + identity_ips = [] + for idx, ident in enumerate(vlan["server_identities"]): + ip = check_ip( + f"server_identities[{idx}] '{ident.get('description', '?')}'", + ident.get("ip") + ) + if ip: + identity_ips.append(ip) + + # -- Validate explicit_overrides --------------------------------------- + eo = d.get("explicit_overrides", {}) + if not isinstance(eo, dict): + errors.append(f"{label}: explicit_overrides must be a plain object.") + else: + gw = eo.get("gateway", "") + if gw: + gw_ip = check_ip("explicit_overrides.gateway", gw) + if gw_ip and gw_ip not in identity_ips: + errors.append( + f"{label}: explicit_overrides.gateway '{gw}' does not match " + f"any server_identity IP. Must be one of: " + f"{[str(ip) for ip in identity_ips]}." + ) + dns = eo.get("dns_server", "") + if dns: + check_ip("explicit_overrides.dns_server", dns) + ntp = eo.get("ntp_server", "") + if ntp: + check_ip("explicit_overrides.ntp_server", ntp) + + pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"]) + pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"]) + + if pool_start and pool_end and pool_start > pool_end: + errors.append( + f"{label}: dynamic_pool_start '{pool_start}' is greater than " + f"dynamic_pool_end '{pool_end}'." + ) + + if pool_start and pool_end: + for ip in identity_ips: + if pool_start <= ip <= pool_end: + errors.append( + f"{label}: server_identity '{ip}' falls inside the dynamic " + f"pool ({pool_start} - {pool_end})." + ) + + seen_res_ips = {} + seen_res_macs = {} + for r in vlan.get("reservations", []): + rdesc = r.get("description", "?") + rmac = r.get("mac", "").lower().strip() + + if is_dynamic_ip(r): + rip = None + else: + rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip")) + + if rip: + if pool_start and pool_end and pool_start <= rip <= pool_end: + errors.append( + f"{label}: reservation '{rdesc}' ip '{rip}' falls inside " + f"the dynamic pool ({pool_start} - {pool_end})." + ) + rip_str = str(rip) + if rip_str in seen_res_ips: + # Allow same IP for different MACs (multi-interface device) + if rmac and rmac in seen_res_ips[rip_str]: + errors.append( + f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' " + f"duplicates '{seen_res_ips[rip_str][rmac]}'." + ) + else: + seen_res_ips[rip_str][rmac] = rdesc + else: + seen_res_ips[rip_str] = {rmac: rdesc} + if rip in identity_ips: + errors.append( + f"{label}: reservation '{rdesc}' ip '{rip}' conflicts " + f"with a server_identity." + ) + + if rmac: + if rmac in seen_res_macs: + errors.append( + f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates " + f"'{seen_res_macs[rmac]}'." + ) + else: + seen_res_macs[rmac] = rdesc + + for bl_name in vlan.get("use_blocklists", []): + if bl_name not in blocklists_by_name: + errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.") + + # -- NAT / firewall validation --------------------------------------------- + valid_protos = VALID_PROTOCOLS + known_interfaces = set(seen_interfaces.keys()) + + def nat_check_port(label, port): + if int_range(port, 1, 65535) is None: + errors.append(f"{label}: '{port}' is not a valid port number (1-65535).") + + def nat_check_ip(label, ip_str): + if not ipv4(ip_str): + errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.") + return None + return ipaddress.IPv4Address(ip_str) + + def nat_check_ip_in_network(label, ip_str, network): + ip = nat_check_ip(label, ip_str) + if ip and ip not in network: + errors.append(f"{label}: '{ip_str}' is not within subnet {network}.") + + for vlan, iface in zip(data.get("vlans", []), vlan_ifaces): + name = vlan.get("name", "?") + net = vlan_networks.get(iface) + + for r in vlan.get("port_wrangling", []): + desc = r.get("description", "?") + label = f"vlan '{name}' port_wrangling '{desc}'" + if r.get("protocol") not in valid_protos: + errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " + f"Must be tcp, udp, or both.") + nat_check_port(f"{label} dest_port", r.get("dest_port")) + if net: + nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net) + + # -- port_forwarding validation (top-level) -------------------------------- + for idx, r in enumerate(data.get("port_forwarding", [])): + desc = r.get("description", "?") + label = f"port_forwarding[{idx}] '{desc}'" + if r.get("protocol") not in valid_protos: + errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " + f"Must be tcp, udp, or both.") + nat_check_port(f"{label} dest_port", r.get("dest_port")) + nat_check_port(f"{label} nat_port", r.get("nat_port")) + nat_check_ip(f"{label} nat_ip", r.get("nat_ip", "")) + + for r in data.get("inter_vlan_exceptions", []): + desc = r.get("description", "?") + label = f"inter_vlan_exceptions '{desc}'" + if r.get("protocol") not in valid_protos: + errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " + f"Must be tcp, udp, or both.") + if "src_ip_or_subnet" not in r: + errors.append(f"{label}: missing field 'src_ip_or_subnet'.") + else: + val = r["src_ip_or_subnet"] + if not ipv4_or_cidr(val): + errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid " + f"IPv4 address or network.") + dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "") + if not dst: + errors.append(f"{label}: missing field 'dst_ip_or_subnet'.") + else: + if not ipv4_or_cidr(dst): + errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid " + f"IPv4 address or network.") + if r.get("dst_port") is not None: + nat_check_port(f"{label} dst_port", r.get("dst_port")) + + # -- radius_default uniqueness check --------------------------------------- + defaults = [v["name"] for v in data.get("vlans", []) if v.get("radius_default") is True] + if len(defaults) > 1: + errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). " + f"Only one VLAN may be the RADIUS default.") + + # -- host_overrides validation --------------------------------------------- + all_vlan_nets = list(vlan_networks.values()) + for idx, entry in enumerate(data.get("host_overrides", [])): + lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'" + if not entry.get("host"): + errors.append(f"{lbl}: missing 'host' field.") + ip_str = entry.get("ip", "") + if not ip_str: + errors.append(f"{lbl}: missing 'ip' field.") + elif not ipv4(ip_str): + errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.") + else: + ip_addr = ipaddress.IPv4Address(ip_str) + if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets): + errors.append(f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet.") + + # -- banned_ips validation ------------------------------------------------- + for idx, entry in enumerate(data.get("banned_ips", [])): + ip_val = entry.get("ip", "") + lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'" + if not ip_val: + errors.append(f"{lbl}: missing 'ip' field.") + continue + if not banned_ip(ip_val): + errors.append(f"{lbl}: '{ip_val}' is not a valid IP, CIDR, or wildcard pattern.") + + return errors