""" validation.py -- Shared structural validators for config.json fields. Lives alongside core.py in ~/routlin/ and is volume-mounted into the routlin-dash container at /app/validation.py. Importable by both core.py (router host) and the Flask app directly. Convention: primitive validators accept a raw string and return the normalised valid value, or '' / None if the input is invalid. """ import ipaddress import os import re VALID_PROTOCOLS = {'tcp', 'udp', 'both'} VALID_BLOCKLIST_FORMATS = {'dnsmasq', 'hosts'} VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns') # =================================================================== # IP / CIDR # =================================================================== def ip(value): """Return value if it is a valid IPv4 or IPv6 address, else ''.""" if not value: return '' v = str(value).strip() try: ipaddress.ip_address(v) return v except ValueError: return '' def ip_or_cidr(value): """Return value if it is a valid IPv4/IPv6 address or CIDR network, else ''.""" if not value: return '' v = str(value).strip() try: ipaddress.ip_address(v) return v except ValueError: pass try: ipaddress.ip_network(v, strict=False) return v except ValueError: return '' def ipv4(value): """Return value if it is a valid IPv4 address, else ''.""" if not value: return '' v = str(value).strip() try: ipaddress.IPv4Address(v) return v except ValueError: return '' def ipv4_or_cidr(value): """Return value if it is a valid IPv4 address or IPv4 CIDR network, else ''.""" if not value: return '' v = str(value).strip() try: ipaddress.IPv4Address(v) return v except ValueError: pass try: ipaddress.IPv4Network(v, strict=False) return v except ValueError: return '' # =================================================================== # Port # =================================================================== def port(value): """Return port as string if valid 1-65535, else ''.""" try: p = int(re.sub(r'[^0-9]', '', str(value))) if 1 <= p <= 65535: return str(p) except (ValueError, TypeError): pass return '' # =================================================================== # Time # =================================================================== def time_24h(value): """Return value if it is a valid 24-hour HH:MM time string, else ''.""" if not value: return '' v = str(value).strip() if re.fullmatch(r'([01]\d|2[0-3]):[0-5]\d', v): return v return '' # =================================================================== # Integer range # =================================================================== def int_range(value, lo, hi): """Return value as int if it is an integer within [lo, hi], else None. lo and hi may be None to indicate no lower or upper bound respectively. """ try: v = int(str(value).strip()) if (lo is None or v >= lo) and (hi is None or v <= hi): return v except (ValueError, TypeError): pass return None # =================================================================== # Domain name # =================================================================== def domainname(value): """Return value if it is a valid domain name, else ''. Rules: labels separated by dots; each label contains only letters, digits, and hyphens; no label may start or end with a hyphen; no consecutive dots; total length <= 253. """ if not value: return '' v = str(value).strip().lower() if len(v) > 253: return '' if '..' in v or v.startswith('.') or v.endswith('.'): return '' labels = v.split('.') for label in labels: if not label: return '' if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?$|^[a-zA-Z0-9]$', label): return '' return v # =================================================================== # Banned-IP pattern # =================================================================== def banned_ip(value): """ Return value if it is a valid banned_ip pattern, else ''. Accepted formats (mirrors core.py expand_banned_ip): IPv4: Single address 192.0.2.1 CIDR 192.0.2.0/24 Wildcard octet 192.0.2.* Octet range 192.0.2.10-20 (combinations that expand to <=1024 entries are accepted) IPv6: Single address 2001:db8::1 CIDR 2001:db8::/32 Trailing wildcard 2001:db8:c17:* """ if not value: return '' v = str(value).strip() try: _check_banned_ip(v) return v except (ValueError, TypeError): return '' def _check_banned_ip(ip_str): if ':' in ip_str: _check_banned_ipv6(ip_str) else: _check_banned_ipv4(ip_str) def _check_banned_ipv4(ip_str): if '/' in ip_str: ipaddress.IPv4Network(ip_str, strict=False) return parts = ip_str.split('.') if len(parts) != 4: raise ValueError(f"Expected 4 octets: {ip_str!r}") def parse_octet(s): if s == '*': return (0, 255) if '-' in s: a, b = s.split('-', 1) lo, hi = int(a), int(b) if not (0 <= lo <= hi <= 255): raise ValueError(f"Invalid octet range {s!r}") return (lo, hi) v = int(s) if not 0 <= v <= 255: raise ValueError(f"Octet {v} out of 0-255") return (v, v) ranges = [parse_octet(p) for p in parts] trailing = 0 for lo, hi in reversed(ranges): if lo == 0 and hi == 255: trailing += 1 else: break total = 1 for lo, hi in ranges[:4 - trailing]: total *= (hi - lo + 1) if total > 1024: raise ValueError(f"Pattern expands to {total} entries (limit 1024); use CIDR") def _check_banned_ipv6(ip_str): if '/' in ip_str: ipaddress.IPv6Network(ip_str, strict=False) return if '*' not in ip_str: ipaddress.IPv6Address(ip_str) return if not ip_str.endswith(':*'): raise ValueError(f"Unsupported IPv6 wildcard: {ip_str!r}; use 'prefix:*' or CIDR") prefix_part = ip_str[:-2] if '::' in prefix_part: left, right = prefix_part.split('::', 1) lg = [g for g in left.split(':') if g] if left else [] rg = [g for g in right.split(':') if g] if right else [] zeros = 8 - len(lg) - len(rg) - 1 if zeros < 0: raise ValueError(f"Too many groups in {ip_str!r}") groups = lg + ['0000'] * zeros + rg else: groups = [g for g in prefix_part.split(':') if g] if not (1 <= len(groups) <= 7): raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}") # =================================================================== # Cross-VLAN uniqueness checks (callable independently by action.py) # =================================================================== def check_vlan_name_unique(vlans, name, exclude_idx=None): """Return error string if name is already used by another VLAN, else None.""" for i, v in enumerate(vlans): if exclude_idx is not None and i == exclude_idx: continue if v.get('name') == name: return f'VLAN name "{name}" is already in use (VLAN ID {v.get("vlan_id")}).' return None def check_vlan_id_unique(vlans, vlan_id, exclude_idx=None): """Return error string if vlan_id is already used by another VLAN, else None.""" for i, v in enumerate(vlans): if exclude_idx is not None and i == exclude_idx: continue if v.get('vlan_id') == vlan_id: return f'VLAN ID {vlan_id} is already in use (VLAN "{v.get("name")}").' return None def check_radius_default_unique(vlans, exclude_idx=None): """Return error string if another VLAN (not exclude_idx) is already the RADIUS default, else None.""" for i, v in enumerate(vlans): if exclude_idx is not None and i == exclude_idx: continue if v.get('radius_default'): return f'VLAN "{v.get("name", "?")}" is already the RADIUS default. Only one VLAN may be the default.' return None def check_mdns_vpn(is_vpn, mdns_reflection): """Return error string if mDNS reflection is enabled on a VPN VLAN, else None.""" if is_vpn and mdns_reflection: return 'mDNS reflection is not supported on VPN VLANs.' return None # =================================================================== # Blocklist checks (callable independently by action.py) # =================================================================== def check_blocklist_name_unique(blocklists, name, exclude_idx=None): """Return error string if name is already used by another blocklist entry, else None.""" for i, b in enumerate(blocklists): if exclude_idx is not None and i == exclude_idx: continue if b.get('name') == name: return f'A blocklist named "{name}" already exists.' return None # =================================================================== # DHCP reservation checks (callable independently by action.py) # =================================================================== def check_reservation_ip_conflicts(ip, vlan): """Return error string if ip conflicts with the VLAN's pool range or server identities, else None.""" if not ip or ip == 'dynamic': return None dhcp = vlan.get('dhcp_information', {}) pool_start = dhcp.get('dynamic_pool_start') pool_end = dhcp.get('dynamic_pool_end') if pool_start and pool_end: try: if (ipaddress.IPv4Address(pool_start) <= ipaddress.IPv4Address(ip) <= ipaddress.IPv4Address(pool_end)): return f'{ip} falls within the dynamic pool range ({pool_start}-{pool_end}).' except Exception: pass identity_ips = {s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')} if ip in identity_ips: return f'{ip} is already assigned as a server identity IP.' return None # =================================================================== # Host override checks (callable independently by action.py) # =================================================================== def check_host_override_ip_in_vlans(ip_str, cfg): """Return error string if ip_str is not within any configured VLAN subnet, else None.""" try: addr = ipaddress.IPv4Address(ip_str) except ValueError: return f"'{ip_str}' is not a valid IPv4 address." nets = [] for v in cfg.get('vlans', []): subnet = v.get('subnet', '') mask = v.get('subnet_mask', '') if subnet and mask: try: nets.append(ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False)) except ValueError: pass if not nets: return None if not any(addr in net for net in nets): return 'IP address does not fall within any configured VLAN subnet.' return None # =================================================================== # VPN uniqueness checks (callable independently by action.py) # =================================================================== def check_vpn_listen_port_unique(vlans, listen_port, exclude_vlan_name=None): """Return error string if listen_port is already used by another VPN VLAN, else None.""" for v in vlans: if not v.get('is_vpn'): continue if exclude_vlan_name is not None and v.get('name') == exclude_vlan_name: continue if v.get('vpn_information', {}).get('listen_port') == listen_port: return f'Listen port {listen_port} is already used by another VPN VLAN.' return None def check_peer_name_unique(peers, name, exclude_idx=None): """Return error string if name is already used by another peer, else None.""" for i, p in enumerate(peers): if exclude_idx is not None and i == exclude_idx: continue if p.get('name') == name: return f'A peer named "{name}" already exists.' return None # =================================================================== # Physical interface checks (callable independently by action.py) # =================================================================== def check_wan_lan_unique(wan, lan): """Return error string if WAN and LAN resolve to the same interface, else None.""" if wan and lan and wan == lan: return 'WAN and LAN interfaces must be different.' return None def check_interface_exists(iface, available): """Return error string if iface is absent from the available set (when non-empty), else None.""" if available and iface not in available: return f"Interface '{iface}' does not exist on this system." return None # =================================================================== # VLAN / interface helpers (shared with core.py apply logic) # =================================================================== def is_wg(vlan): return vlan.get("is_vpn", False) def is_dynamic_ip(r): """Return True if a reservation has no pinned IP (DHCP assigns from pool).""" ip = r.get("ip", "dynamic") return ip in ("", "dynamic") or ip is None def derive_vlan_id(subnet, prefix): """Return VLAN ID (1-4094) derived from the active octet of the network address, or None.""" try: network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False) octets = list(network.network_address.packed) byte_idx = (int(prefix) - 1) // 8 vlan_id = octets[byte_idx] if 1 <= vlan_id <= 4094: return vlan_id return None except Exception: return None def derive_interface(vlan, data): """Derive the interface name for a VLAN without mutating data.""" lan = data.get('network_interfaces', {}).get('lan_interface', 'eth0') if is_wg(vlan): wg_vlans = [v for v in data.get('vlans', []) if is_wg(v)] wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)) idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0) return f'wg{idx}' vid = vlan.get('vlan_id') return lan if vid == 1 else f'{lan}.{vid}' # =================================================================== # Full config validation (shared with core.py --apply) # =================================================================== def validate_config(data): """Validate config.json structure and content. Returns list of error strings.""" errors = [] seen_interfaces = {} seen_listen_ports = {} # Pre-compute per-VLAN vlan_ids and interface names without mutating data _lan = data.get("network_interfaces", {}).get("lan_interface", "eth0") _all_vlans = data.get("vlans", []) _stored_ids = [_v.get("vlan_id") for _v in _all_vlans] _wg_sorted = sorted( [(i, _stored_ids[i]) for i, _v in enumerate(_all_vlans) if is_wg(_v)], key=lambda x: (x[1] is None, x[1] or 0) ) _wg_order = {orig_i: wg_idx for wg_idx, (orig_i, _) in enumerate(_wg_sorted)} vlan_ifaces = [] for i, _vlan in enumerate(_all_vlans): if is_wg(_vlan): vlan_ifaces.append(f"wg{_wg_order[i]}") else: _vid = _stored_ids[i] vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}") # upstream_dns block ============================================ if not data.get("upstream_dns", {}).get("upstream_servers"): errors.append("upstream_dns.upstream_servers is missing or empty.") # WAN / LAN interfaces ========================================== gen = data.get("network_interfaces", {}) wan = gen.get("wan_interface", "") lan = gen.get("lan_interface", "") if not wan: errors.append("network_interfaces.wan_interface is missing or empty.") if not lan: errors.append("network_interfaces.lan_interface is missing or empty.") if wan and lan: available_interfaces = set() try: available_interfaces = set(os.listdir('/sys/class/net')) except Exception: pass err = check_interface_exists(wan, available_interfaces) if err: errors.append(f"network_interfaces.wan_interface: {err}") err = check_interface_exists(lan, available_interfaces) if err: errors.append(f"network_interfaces.lan_interface: {err}") err = check_wan_lan_unique(wan, lan) if err: errors.append(f"network_interfaces: {err}") # Blocklist library ============================================= blocklists_by_name = {} for idx, bl in enumerate(data.get("dns_blocking", {}).get("blocklists", [])): name = bl.get("name", "") label = f"dns_blocking.blocklists[{idx}] '{name}'" for field in ("name", "description", "save_as", "url", "format"): if not bl.get(field): errors.append(f"{label}: missing or empty field '{field}'.") if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS: errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.") if name: err = check_blocklist_name_unique( data.get("dns_blocking", {}).get("blocklists", []), name, exclude_idx=idx ) if err: errors.append(f"{label}: {err}") else: blocklists_by_name[name] = bl # Per-VLAN validation =========================================== vlan_networks = {} # iface -> IPv4Network (used for NAT section) for i, (vlan, iface) in enumerate(zip(_all_vlans, vlan_ifaces)): vlan_id = _stored_ids[i] name = vlan.get("name", "?") label = f"vlan '{name}' (id={vlan_id})" if vlan_id is None or not isinstance(vlan_id, int) or not (1 <= vlan_id <= 4094): errors.append(f"vlan '{name}': vlan_id must be an integer 1-4094 (got {vlan_id!r}).") err = check_vlan_name_unique(_all_vlans, name, exclude_idx=i) if err: errors.append(f"{label}: {err}") err = check_vlan_id_unique(_all_vlans, vlan_id, exclude_idx=i) if err: errors.append(f"{label}: {err}") if iface in seen_interfaces: errors.append(f"{label}: duplicate interface '{iface}' " f"(also used by '{seen_interfaces[iface]}').") else: seen_interfaces[iface] = name err = check_mdns_vpn(is_wg(vlan), vlan.get('mdns_reflection')) if err: errors.append(f"{label}: {err}") if is_wg(vlan): # vpn_information ======================================= vpi = vlan.get("vpn_information") if not isinstance(vpi, dict): errors.append(f"{label}: vpn_information must be a plain object.") vpi = {} else: lp = vpi.get("listen_port") if int_range(lp, 1, 65535) is None: errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.") elif lp in seen_listen_ports: errors.append(f"{label}: vpn_information.listen_port {lp} is already used by " f"'{seen_listen_ports[lp]}'.") else: seen_listen_ports[lp] = name # subnet/subnet_mask ==================================== for field in ("subnet", "subnet_mask"): if not vlan.get(field): errors.append(f"{label}: missing required field '{field}'.") wg_net = None if vlan.get("subnet") and vlan.get("subnet_mask"): try: wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) vlan_networks[iface] = wg_net except ValueError as e: errors.append(f"{label}: invalid subnet/subnet_mask: {e}") # server_identities ===================================== if not vlan.get("server_identities"): errors.append(f"{label}: server_identities is empty or missing.") identity_ips = [] for idx, ident in enumerate(vlan.get("server_identities", [])): ip_str = ident.get("ip", "") ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'" if not ip_str: errors.append(f"{ilabel}: missing 'ip' field.") continue if not ipv4(ip_str): errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.") continue ip_addr = ipaddress.IPv4Address(ip_str) if wg_net and ip_addr not in wg_net: errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.") else: identity_ips.append(ip_addr) # vpn_information.explicit_overrides ==================== eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {} if not isinstance(eo, dict): errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.") else: gw = eo.get("gateway", "") if gw: if not ipv4(gw): errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.") else: gw_ip = ipaddress.IPv4Address(gw) if identity_ips and gw_ip not in identity_ips: errors.append( f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match " f"any server_identity IP. Must be one of: " f"{[str(ip) for ip in identity_ips]}." ) dns = eo.get("dns_servers", "") if dns and not ipv4(dns): errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.") mtu = eo.get("mtu", "") if mtu and int_range(mtu, 576, 9000) is None: errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' must be an integer in range 576-9000.") domain_val = vpi.get("domain", "") if isinstance(vpi, dict) else "" if domain_val and not domainname(domain_val): errors.append(f"{label}: vpn_information.domain '{domain_val}' is not a valid domain name.") # peers ================================================= seen_peer_names = {} seen_peer_ips = {} for pidx, peer in enumerate(vlan.get("peers", [])): pname = peer.get("name", "") plabel = f"{label} peer[{pidx}] '{pname}'" if not pname: errors.append(f"{plabel}: missing 'name' field.") elif pname in seen_peer_names: errors.append(f"{plabel}: duplicate peer name '{pname}'.") else: seen_peer_names[pname] = pidx if not peer.get("public_key"): errors.append(f"{plabel}: missing 'public_key' field.") pip_str = peer.get("ip", "") if not pip_str: errors.append(f"{plabel}: missing 'ip' field.") elif not ipv4(pip_str): errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.") else: pip = ipaddress.IPv4Address(pip_str) if wg_net and pip not in wg_net: errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.") if pip in identity_ips: errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.") if pip_str in seen_peer_ips: errors.append( f"{plabel}: duplicate peer ip '{pip_str}' " f"(also used by peer '{seen_peer_ips[pip_str]}')." ) else: seen_peer_ips[pip_str] = pname continue if not vlan.get("server_identities"): errors.append(f"{label}: server_identities is empty or missing.") continue for field in ("subnet", "subnet_mask"): if not vlan.get(field): errors.append(f"{label}: missing required top-level field '{field}'.") if not vlan.get("subnet") or not vlan.get("subnet_mask"): continue try: network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) vlan_networks[iface] = network except ValueError as e: errors.append(f"{label}: invalid subnet/subnet_mask: {e}") continue d = vlan.get("dhcp_information", {}) required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"} missing = required_dhcp - set(d.keys()) if missing: errors.append(f"{label}: missing dhcp_information fields: {missing}") continue def check_ip(field_label, ip_str, allow_none=False): if ip_str is None: if not allow_none: errors.append(f"{label}: {field_label} is null/missing.") return None if not ipv4(ip_str): errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.") return None addr = ipaddress.IPv4Address(ip_str) if addr not in network: errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.") return addr identity_ips = [] for idx, ident in enumerate(vlan["server_identities"]): ip = check_ip( f"server_identities[{idx}] '{ident.get('description', '?')}'", ident.get("ip") ) if ip: identity_ips.append(ip) # Validate explicit_overrides =============================== eo = d.get("explicit_overrides", {}) if not isinstance(eo, dict): errors.append(f"{label}: explicit_overrides must be a plain object.") else: gw = eo.get("gateway", "") if gw: gw_ip = check_ip("explicit_overrides.gateway", gw) if gw_ip and gw_ip not in identity_ips: errors.append( f"{label}: explicit_overrides.gateway '{gw}' does not match " f"any server_identity IP. Must be one of: " f"{[str(ip) for ip in identity_ips]}." ) dns = eo.get("dns_servers", "") if dns: for _ip in (dns if isinstance(dns, list) else [dns]): check_ip("explicit_overrides.dns_servers", _ip) ntp = eo.get("ntp_servers", "") if ntp: for _ip in (ntp if isinstance(ntp, list) else [ntp]): check_ip("explicit_overrides.ntp_servers", _ip) pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"]) pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"]) if pool_start and pool_end and pool_start > pool_end: errors.append( f"{label}: dynamic_pool_start '{pool_start}' is greater than " f"dynamic_pool_end '{pool_end}'." ) if pool_start and pool_end: for ip in identity_ips: if pool_start <= ip <= pool_end: errors.append( f"{label}: server_identity '{ip}' falls inside the dynamic " f"pool ({pool_start} - {pool_end})." ) seen_res_ips = {} seen_res_macs = {} seen_res_hostnames = {} vlan_name_key = vlan.get("name", "") for r in [r for r in data.get("dhcp_reservations", []) if r.get("vlan") == vlan_name_key]: rdesc = r.get("description", "?") rmac = r.get("mac", "").lower().strip() if is_dynamic_ip(r): rip = None else: rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip")) if rip: if pool_start and pool_end and pool_start <= rip <= pool_end: errors.append( f"{label}: reservation '{rdesc}' ip '{rip}' falls inside " f"the dynamic pool ({pool_start} - {pool_end})." ) rip_str = str(rip) if rip_str in seen_res_ips: # Allow same IP for different MACs (multi-interface device) if rmac and rmac in seen_res_ips[rip_str]: errors.append( f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' " f"duplicates '{seen_res_ips[rip_str][rmac]}'." ) else: seen_res_ips[rip_str][rmac] = rdesc else: seen_res_ips[rip_str] = {rmac: rdesc} if rip in identity_ips: errors.append( f"{label}: reservation '{rdesc}' ip '{rip}' conflicts " f"with a server_identity." ) if rmac: if rmac in seen_res_macs: errors.append( f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates " f"'{seen_res_macs[rmac]}'." ) else: seen_res_macs[rmac] = rdesc rhost = r.get("hostname", "").strip().lower() if rhost: if rhost in seen_res_hostnames: errors.append( f"{label}: reservation '{rdesc}' hostname '{rhost}' duplicates " f"'{seen_res_hostnames[rhost]}'." ) else: seen_res_hostnames[rhost] = rdesc for bl_name in vlan.get("use_blocklists", []): if bl_name not in blocklists_by_name: errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.") # NAT / firewall validation ===================================== valid_protos = VALID_PROTOCOLS known_interfaces = set(seen_interfaces.keys()) def nat_check_port(label, port): if int_range(port, 1, 65535) is None: errors.append(f"{label}: '{port}' is not a valid port number (1-65535).") def nat_check_ip(label, ip_str): if not ipv4(ip_str): errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.") return None return ipaddress.IPv4Address(ip_str) def nat_check_ip_in_network(label, ip_str, network): ip = nat_check_ip(label, ip_str) if ip and ip not in network: errors.append(f"{label}: '{ip_str}' is not within subnet {network}.") # port_wrangling validation (top-level) ========================= _vlan_name_to_net = { v.get("name", ""): vlan_networks.get(iface) for v, iface in zip(data.get("vlans", []), vlan_ifaces) } for idx, r in enumerate(data.get("port_wrangling", [])): desc = r.get("description", "?") vlan_name = r.get("vlan", "?") label = f"port_wrangling[{idx}] (vlan '{vlan_name}') '{desc}'" if r.get("protocol") not in valid_protos: errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " f"Must be tcp, udp, or both.") nat_check_port(f"{label} dest_port", r.get("dest_port")) net = _vlan_name_to_net.get(vlan_name) if net: nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net) # port_forwarding validation (top-level) ======================== for idx, r in enumerate(data.get("port_forwarding", [])): desc = r.get("description", "?") label = f"port_forwarding[{idx}] '{desc}'" if r.get("protocol") not in valid_protos: errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " f"Must be tcp, udp, or both.") nat_check_port(f"{label} dest_port", r.get("dest_port")) nat_check_port(f"{label} nat_port", r.get("nat_port")) nat_check_ip(f"{label} nat_ip", r.get("nat_ip", "")) for r in data.get("inter_vlan_exceptions", []): desc = r.get("description", "?") label = f"inter_vlan_exceptions '{desc}'" if r.get("protocol") not in valid_protos: errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " f"Must be tcp, udp, or both.") if "src_ip_or_subnet" not in r: errors.append(f"{label}: missing field 'src_ip_or_subnet'.") else: val = r["src_ip_or_subnet"] if not ipv4_or_cidr(val): errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid " f"IPv4 address or network.") dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "") if not dst: errors.append(f"{label}: missing field 'dst_ip_or_subnet'.") else: if not ipv4_or_cidr(dst): errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid " f"IPv4 address or network.") if r.get("dest_port_start"): nat_check_port(f"{label} dest_port_start", r.get("dest_port_start")) if r.get("dest_port_end"): nat_check_port(f"{label} dest_port_end", r.get("dest_port_end")) min_p, max_p = r.get("dest_port_start", ""), r.get("dest_port_end", "") if min_p and max_p: try: if int(min_p) > int(max_p): errors.append(f"{label}: dest_port_start {min_p} is greater than dest_port_end {max_p}.") except (ValueError, TypeError): pass # radius_default uniqueness check =============================== _rd_idx = next( (i for i, v in enumerate(data.get('vlans', [])) if v.get('radius_default')), None ) if _rd_idx is not None: err = check_radius_default_unique(data.get('vlans', []), exclude_idx=_rd_idx) if err: defaults = [v.get('name', '?') for v in data.get('vlans', []) if v.get('radius_default')] errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). " f"Only one VLAN may be the RADIUS default.") # RADIUS requires multiple VLANs ================================ non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)] non_wg_names = {v.get("name") for v in non_wg_vlans} has_radius_clients = any( r.get("radius_client") for r in data.get("dhcp_reservations", []) if r.get("vlan") in non_wg_names ) if has_radius_clients and len(non_wg_vlans) < 2: errors.append( "RADIUS clients are configured but only one non-VPN VLAN exists. " "Dynamic VLAN assignment requires at least two VLANs." ) # host_overrides validation ===================================== all_vlan_nets = list(vlan_networks.values()) for idx, entry in enumerate(data.get("host_overrides", [])): lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'" if not entry.get("host"): errors.append(f"{lbl}: missing 'host' field.") ip_str = entry.get("ip", "") if not ip_str: errors.append(f"{lbl}: missing 'ip' field.") elif not ipv4(ip_str): errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.") else: ip_addr = ipaddress.IPv4Address(ip_str) if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets): errors.append(f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet.") # banned_ips validation ========================================= for idx, entry in enumerate(data.get("banned_ips", [])): ip_val = entry.get("ip", "") lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'" if not ip_val: errors.append(f"{lbl}: missing 'ip' field.") continue if not banned_ip(ip_val): errors.append(f"{lbl}: '{ip_val}' is not a valid IP, CIDR, or wildcard pattern.") return errors