"""
validation.py -- Shared structural validators for core.json fields.

Lives alongside core.py in ~/routlin/ and is volume-mounted into the
routlin-dash container at /app/validation.py. Importable by both core.py
(router host) and the Flask app directly.

Convention: primitive validators accept a raw string and return the
normalised valid value, or '' / None if the input is invalid.
"""

import ipaddress
import os
import re

VALID_PROTOCOLS = {'tcp', 'udp', 'both'}
VALID_BLOCKLIST_FORMATS = {'dnsmasq', 'hosts'}
VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns')

# Explicit ASCII classes: in str patterns ``\d`` also matches non-ASCII digits.
_TIME_24H_RE = re.compile(r'([01][0-9]|2[0-3]):[0-5][0-9]')
_DOMAIN_LABEL_RE = re.compile(r'[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?')
_IPV6_GROUP_RE = re.compile(r'[0-9a-fA-F]{1,4}')


# ===================================================================
# IP / CIDR
# ===================================================================

def ip(value):
    """Return value if it is a valid IPv4 or IPv6 address, else ''.

    The returned string is the input converted with str() and stripped of
    surrounding whitespace.
    """
    if not value:
        return ''
    v = str(value).strip()
    try:
        ipaddress.ip_address(v)
        return v
    except ValueError:
        return ''


def ip_or_cidr(value):
    """Return value if it is a valid IPv4/IPv6 address or CIDR network, else ''.

    Networks are parsed with strict=False, so host bits may be set.
    """
    if not value:
        return ''
    v = str(value).strip()
    try:
        ipaddress.ip_address(v)
        return v
    except ValueError:
        pass
    try:
        ipaddress.ip_network(v, strict=False)
        return v
    except ValueError:
        return ''


def ipv4(value):
    """Return value if it is a valid IPv4 address, else ''.

    The returned string is stripped of surrounding whitespace.
    """
    if not value:
        return ''
    v = str(value).strip()
    try:
        ipaddress.IPv4Address(v)
        return v
    except ValueError:
        return ''


def ipv4_or_cidr(value):
    """Return value if it is a valid IPv4 address or IPv4 CIDR network, else ''.

    Networks are parsed with strict=False, so host bits may be set.
    """
    if not value:
        return ''
    v = str(value).strip()
    try:
        ipaddress.IPv4Address(v)
        return v
    except ValueError:
        pass
    try:
        ipaddress.IPv4Network(v, strict=False)
        return v
    except ValueError:
        return ''


def _parse_ipv4(value):
    """Return an IPv4Address built from the normalised value, or None if invalid."""
    normalised = ipv4(value)
    return ipaddress.IPv4Address(normalised) if normalised else None


# ===================================================================
# Port
# ===================================================================

def port(value):
    """Return port as string if valid 1-65535, else ''.

    Every non-digit character is removed before parsing, so '80/tcp' yields
    '80' and '-80' also yields '80'.
    NOTE(review): this looks like deliberate lenient normalisation of form
    input; confirm callers do not rely on it to reject malformed values.
    """
    try:
        p = int(re.sub(r'[^0-9]', '', str(value)))
        if 1 <= p <= 65535:
            return str(p)
    except (ValueError, TypeError):
        pass
    return ''


# ===================================================================
# Time
# ===================================================================

def time_24h(value):
    """Return value if it is a valid 24-hour HH:MM time string, else ''.

    Only ASCII digits are accepted; the result is stripped of whitespace.
    """
    if not value:
        return ''
    v = str(value).strip()
    if _TIME_24H_RE.fullmatch(v):
        return v
    return ''


# ===================================================================
# Integer range
# ===================================================================

def int_range(value, lo, hi):
    """Return value as int if it is an integer within [lo, hi], else None.

    lo and hi may be None to indicate no lower or upper bound respectively.
    """
    try:
        v = int(str(value).strip())
        if (lo is None or v >= lo) and (hi is None or v <= hi):
            return v
    except (ValueError, TypeError):
        pass
    return None


# ===================================================================
# Domain name
# ===================================================================

def domainname(value):
    """Return value (lower-cased, stripped) if it is a valid domain name, else ''.

    Rules: labels separated by dots; each label contains only letters,
    digits, and hyphens; no label may start or end with a hyphen; no
    consecutive dots; total length <= 253.
    """
    if not value:
        return ''
    v = str(value).strip().lower()
    if len(v) > 253:
        return ''
    if '..' in v or v.startswith('.') or v.endswith('.'):
        return ''
    for label in v.split('.'):
        # fullmatch, not match + '$': '$' also matches before a trailing
        # newline, which would let a label such as 'a\n' through.
        if not label or not _DOMAIN_LABEL_RE.fullmatch(label):
            return ''
    return v


# ===================================================================
# Banned-IP pattern
# ===================================================================

def banned_ip(value):
    """
    Return value if it is a valid banned_ip pattern, else ''.

    Accepted formats (mirrors core.py expand_banned_ip):
      IPv4:
        Single address      192.0.2.1
        CIDR                192.0.2.0/24
        Wildcard octet      192.0.2.*
        Octet range         192.0.2.10-20
        (combinations that expand to <=1024 entries are accepted)
      IPv6:
        Single address      2001:db8::1
        CIDR                2001:db8::/32
        Trailing wildcard   2001:db8:c17:*
    """
    if not value:
        return ''
    v = str(value).strip()
    try:
        _check_banned_ip(v)
        return v
    except (ValueError, TypeError):
        return ''


def _check_banned_ip(ip_str):
    """Raise ValueError if ip_str is not a valid pattern; a ':' selects IPv6 rules."""
    if ':' in ip_str:
        _check_banned_ipv6(ip_str)
    else:
        _check_banned_ipv4(ip_str)


def _check_banned_ipv4(ip_str):
    """Raise ValueError if ip_str is not a valid IPv4 address, CIDR or octet pattern."""
    if '/' in ip_str:
        ipaddress.IPv4Network(ip_str, strict=False)
        return
    parts = ip_str.split('.')
    if len(parts) != 4:
        raise ValueError(f"Expected 4 octets: {ip_str!r}")

    def parse_octet(s):
        """Return the inclusive (lo, hi) range described by one octet token."""
        if s == '*':
            return (0, 255)
        if '-' in s:
            a, b = s.split('-', 1)
            lo, hi = int(a), int(b)
            if not (0 <= lo <= hi <= 255):
                raise ValueError(f"Invalid octet range {s!r}")
            return (lo, hi)
        v = int(s)
        if not 0 <= v <= 255:
            raise ValueError(f"Octet {v} out of 0-255")
        return (v, v)

    ranges = [parse_octet(p) for p in parts]

    # Trailing full-range octets are left out of the expansion count below.
    trailing = 0
    for lo, hi in reversed(ranges):
        if lo == 0 and hi == 255:
            trailing += 1
        else:
            break

    total = 1
    for lo, hi in ranges[:4 - trailing]:
        total *= (hi - lo + 1)
    if total > 1024:
        raise ValueError(f"Pattern expands to {total} entries (limit 1024); use CIDR")


def _check_banned_ipv6(ip_str):
    """Raise ValueError if ip_str is not a valid IPv6 address, CIDR or 'prefix:*'."""
    if '/' in ip_str:
        ipaddress.IPv6Network(ip_str, strict=False)
        return
    if '*' not in ip_str:
        ipaddress.IPv6Address(ip_str)
        return
    if not ip_str.endswith(':*'):
        raise ValueError(f"Unsupported IPv6 wildcard: {ip_str!r}; use 'prefix:*' or CIDR")

    prefix_part = ip_str[:-2]
    if '::' in prefix_part:
        left, right = prefix_part.split('::', 1)
        if '::' in right:
            raise ValueError(f"Only one '::' is allowed in {ip_str!r}")
        lg = [g for g in left.split(':') if g] if left else []
        rg = [g for g in right.split(':') if g] if right else []
        # One of the eight groups is taken by the trailing wildcard.
        zeros = 8 - len(lg) - len(rg) - 1
        if zeros < 0:
            raise ValueError(f"Too many groups in {ip_str!r}")
        groups = lg + ['0000'] * zeros + rg
    else:
        groups = [g for g in prefix_part.split(':') if g]

    if not (1 <= len(groups) <= 7):
        raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}")
    for group in groups:
        if not _IPV6_GROUP_RE.fullmatch(group):
            raise ValueError(f"Invalid IPv6 group {group!r} in {ip_str!r}")


# ===================================================================
# VLAN / interface helpers (shared with core.py apply logic)
# ===================================================================

def is_wg(vlan):
    """Return the VLAN's 'is_vpn' flag (False when the key is absent)."""
    return vlan.get("is_vpn", False)


def is_dynamic_ip(r):
    """Return True if a reservation has no pinned IP (DHCP assigns from pool)."""
    pinned = r.get("ip", "dynamic")
    return pinned in ("", "dynamic") or pinned is None


def derive_vlan_id(subnet, prefix):
    """Return VLAN ID derived from the active octet of the network address, or None.

    The active octet is the one containing the last prefix bit. The accepted
    range is 1-4094, but a single octet cannot exceed 255. Any parsing
    failure yields None.
    """
    try:
        network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
        octets = list(network.network_address.packed)
        byte_idx = (int(prefix) - 1) // 8
        vlan_id = octets[byte_idx]
        if 1 <= vlan_id <= 4094:
            return vlan_id
        return None
    except Exception:
        # Deliberately broad: malformed subnet/prefix of any kind means "no id".
        return None


def derive_interface(vlan, data):
    """Derive the interface name for a VLAN without mutating data.

    WireGuard VLANs get 'wgN', where N is the VLAN's position among all
    WireGuard VLANs ordered by derived VLAN ID (underivable IDs last).
    Other VLANs get the LAN interface itself for VLAN ID 1, else
    '<lan>.<vlan_id>'.
    """
    lan = data.get('network_interfaces', {}).get('lan_interface', 'eth0')

    def vid_of(v):
        return derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24))

    if is_wg(vlan):
        def sort_key(v):
            vid = vid_of(v)
            return (vid is None, vid or 0)

        wg_sorted = sorted((v for v in data.get('vlans', []) if is_wg(v)), key=sort_key)
        # Identity comparison: vlan must be the same object held in data['vlans'].
        idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
        return f'wg{idx}'

    vid = vid_of(vlan)
    return lan if vid == 1 else f'{lan}.{vid}'


# ===================================================================
# Full config validation (shared with core.py --apply)
# ===================================================================

def validate_config(data):
    """Validate core.json structure and content. Returns list of error strings.

    data is the parsed core.json dict; it is not mutated. An empty list means
    no problems were found. Interface existence is checked against
    /sys/class/net when that directory is readable, and skipped otherwise.
    """
    errors = []
    seen_vlan_ids = {}
    seen_interfaces = {}
    seen_names = {}
    seen_listen_ports = {}

    # Pre-compute per-VLAN vlan_ids and interface names without mutating data
    _lan = data.get("network_interfaces", {}).get("lan_interface", "eth0")
    _all_vlans = data.get("vlans", [])
    _derived_ids = [
        derive_vlan_id(_v.get("subnet", ""), _v.get("subnet_mask", 24))
        for _v in _all_vlans
    ]
    _wg_sorted = sorted(
        [(i, _derived_ids[i]) for i, _v in enumerate(_all_vlans) if is_wg(_v)],
        key=lambda x: (x[1] is None, x[1] or 0)
    )
    _wg_order = {orig_i: wg_idx for wg_idx, (orig_i, _) in enumerate(_wg_sorted)}
    vlan_ifaces = []
    for i, _vlan in enumerate(_all_vlans):
        if is_wg(_vlan):
            vlan_ifaces.append(f"wg{_wg_order[i]}")
        else:
            _vid = _derived_ids[i]
            vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}")

    def check_ip(vlan_label, network, field_label, ip_str):
        """Validate an in-subnet IPv4 field; record errors; return the address or None.

        An address outside the subnet is reported but still returned.
        """
        if ip_str is None:
            errors.append(f"{vlan_label}: {field_label} is null/missing.")
            return None
        addr = _parse_ipv4(ip_str)
        if addr is None:
            errors.append(f"{vlan_label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
            return None
        if addr not in network:
            errors.append(f"{vlan_label}: {field_label} '{ip_str}' is not within subnet {network}.")
        return addr

    # -- upstream_dns block ----------------------------------------------------
    if not data.get("upstream_dns", {}).get("upstream_servers"):
        errors.append("upstream_dns.upstream_servers is missing or empty.")

    # -- WAN / LAN interfaces --------------------------------------------------
    gen = data.get("network_interfaces", {})
    wan = gen.get("wan_interface", "")
    lan = gen.get("lan_interface", "")
    if not wan:
        errors.append("network_interfaces.wan_interface is missing or empty.")
    if not lan:
        errors.append("network_interfaces.lan_interface is missing or empty.")
    if wan and lan:
        try:
            available_interfaces = set(os.listdir('/sys/class/net'))
        except OSError:
            # Best effort: skip the existence check when sysfs is unreadable.
            available_interfaces = set()
        if available_interfaces:
            if wan not in available_interfaces:
                errors.append(f"network_interfaces.wan_interface: '{wan}' does not exist on this system.")
            if lan not in available_interfaces:
                errors.append(f"network_interfaces.lan_interface: '{lan}' does not exist on this system.")
        if wan == lan:
            errors.append(f"network_interfaces.wan_interface and network_interfaces.lan_interface must be different (both set to '{wan}').")

    # -- Blocklist library -----------------------------------------------------
    blocklists_by_name = {}
    for idx, bl in enumerate(data.get("dns_blocking", {}).get("blocklists", [])):
        name = bl.get("name", "")
        label = f"dns_blocking.blocklists[{idx}] '{name}'"
        for field in ("name", "description", "save_as", "url", "format"):
            if not bl.get(field):
                errors.append(f"{label}: missing or empty field '{field}'.")
        if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS:
            errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.")
        if name:
            if name in blocklists_by_name:
                errors.append(f"{label}: duplicate blocklist name '{name}'.")
            else:
                blocklists_by_name[name] = bl

    # -- Per-VLAN validation ---------------------------------------------------
    vlan_networks = {}  # iface -> IPv4Network (used for NAT section)

    for i, (vlan, iface) in enumerate(zip(_all_vlans, vlan_ifaces)):
        vlan_id = _derived_ids[i]
        name = vlan.get("name", "?")
        label = f"vlan '{name}' (id={vlan_id})"

        if name in seen_names:
            errors.append(f"{label}: duplicate vlan name '{name}' "
                          f"(also used by id={seen_names[name]}).")
        else:
            seen_names[name] = vlan_id

        if vlan_id in seen_vlan_ids:
            errors.append(f"{label}: duplicate vlan_id {vlan_id} "
                          f"(also used by '{seen_vlan_ids[vlan_id]}').")
        else:
            seen_vlan_ids[vlan_id] = name

        if iface in seen_interfaces:
            errors.append(f"{label}: duplicate interface '{iface}' "
                          f"(also used by '{seen_interfaces[iface]}').")
        else:
            seen_interfaces[iface] = name

        if vlan.get("mdns_reflection") is True and is_wg(vlan):
            errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.")

        if is_wg(vlan):
            # -- vpn_information -----------------------------------------------
            vpi = vlan.get("vpn_information")
            if not isinstance(vpi, dict):
                errors.append(f"{label}: vpn_information must be a plain object.")
                vpi = {}
            else:
                lp = vpi.get("listen_port")
                # Key on the normalised int so 51820 and "51820" collide.
                lp_num = int_range(lp, 1, 65535)
                if lp_num is None:
                    errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.")
                elif lp_num in seen_listen_ports:
                    errors.append(f"{label}: vpn_information.listen_port {lp} is already used by "
                                  f"'{seen_listen_ports[lp_num]}'.")
                else:
                    seen_listen_ports[lp_num] = name

            # -- subnet/subnet_mask --------------------------------------------
            for field in ("subnet", "subnet_mask"):
                if not vlan.get(field):
                    errors.append(f"{label}: missing required field '{field}'.")

            wg_net = None
            if vlan.get("subnet") and vlan.get("subnet_mask"):
                try:
                    wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
                    vlan_networks[iface] = wg_net
                except ValueError as e:
                    errors.append(f"{label}: invalid subnet/subnet_mask: {e}")

            # -- server_identities ---------------------------------------------
            if not vlan.get("server_identities"):
                errors.append(f"{label}: server_identities is empty or missing.")

            identity_ips = []
            # 'or []' so an explicit null is reported above rather than crashing here.
            for idx, ident in enumerate(vlan.get("server_identities") or []):
                ip_str = ident.get("ip", "")
                ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'"
                if not ip_str:
                    errors.append(f"{ilabel}: missing 'ip' field.")
                    continue
                ip_addr = _parse_ipv4(ip_str)
                if ip_addr is None:
                    errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.")
                    continue
                if wg_net is not None and ip_addr not in wg_net:
                    errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.")
                else:
                    identity_ips.append(ip_addr)

            # -- vpn_information.explicit_overrides ----------------------------
            eo = vpi.get("explicit_overrides", {})
            if not isinstance(eo, dict):
                errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
            else:
                gw = eo.get("gateway", "")
                if gw:
                    gw_ip = _parse_ipv4(gw)
                    if gw_ip is None:
                        errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.")
                    elif identity_ips and gw_ip not in identity_ips:
                        errors.append(
                            f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match "
                            f"any server_identity IP. Must be one of: "
                            f"{[str(addr) for addr in identity_ips]}."
                        )
                dns = eo.get("dns_server", "")
                if dns and not ipv4(dns):
                    errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
                mtu = eo.get("mtu", "")
                if mtu and int_range(mtu, 576, 9000) is None:
                    errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' must be an integer in range 576-9000.")

            domain_val = vpi.get("domain", "")
            if domain_val and not domainname(domain_val):
                errors.append(f"{label}: vpn_information.domain '{domain_val}' is not a valid domain name.")

            # -- peers ---------------------------------------------------------
            seen_peer_names = {}
            seen_peer_ips = {}
            for pidx, peer in enumerate(vlan.get("peers", [])):
                pname = peer.get("name", "")
                plabel = f"{label} peer[{pidx}] '{pname}'"
                if not pname:
                    errors.append(f"{plabel}: missing 'name' field.")
                elif pname in seen_peer_names:
                    errors.append(f"{plabel}: duplicate peer name '{pname}'.")
                else:
                    seen_peer_names[pname] = pidx

                if not peer.get("public_key"):
                    errors.append(f"{plabel}: missing 'public_key' field.")

                pip_str = peer.get("ip", "")
                if not pip_str:
                    errors.append(f"{plabel}: missing 'ip' field.")
                    continue
                pip = _parse_ipv4(pip_str)
                if pip is None:
                    errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.")
                    continue
                if wg_net is not None and pip not in wg_net:
                    errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.")
                if pip in identity_ips:
                    errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.")
                pip_key = str(pip)  # normalised, so padded duplicates are caught
                if pip_key in seen_peer_ips:
                    errors.append(
                        f"{plabel}: duplicate peer ip '{pip_str}' "
                        f"(also used by peer '{seen_peer_ips[pip_key]}')."
                    )
                else:
                    seen_peer_ips[pip_key] = pname

            # WireGuard VLANs skip the DHCP checks below.
            continue

        if not vlan.get("server_identities"):
            errors.append(f"{label}: server_identities is empty or missing.")
            continue

        for field in ("subnet", "subnet_mask"):
            if not vlan.get(field):
                errors.append(f"{label}: missing required top-level field '{field}'.")

        if not vlan.get("subnet") or not vlan.get("subnet_mask"):
            continue

        try:
            network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
            vlan_networks[iface] = network
        except ValueError as e:
            errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
            continue

        d = vlan.get("dhcp_information", {})
        if not isinstance(d, dict):
            # null / non-object: report every required field as missing.
            d = {}
        required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"}
        missing = required_dhcp - set(d.keys())
        if missing:
            errors.append(f"{label}: missing dhcp_information fields: {missing}")
            continue

        identity_ips = []
        for idx, ident in enumerate(vlan["server_identities"]):
            ident_ip = check_ip(
                label, network,
                f"server_identities[{idx}] '{ident.get('description', '?')}'",
                ident.get("ip")
            )
            if ident_ip is not None:
                identity_ips.append(ident_ip)

        # -- Validate explicit_overrides ---------------------------------------
        eo = d.get("explicit_overrides", {})
        if not isinstance(eo, dict):
            errors.append(f"{label}: explicit_overrides must be a plain object.")
        else:
            gw = eo.get("gateway", "")
            if gw:
                gw_ip = check_ip(label, network, "explicit_overrides.gateway", gw)
                if gw_ip is not None and gw_ip not in identity_ips:
                    errors.append(
                        f"{label}: explicit_overrides.gateway '{gw}' does not match "
                        f"any server_identity IP. Must be one of: "
                        f"{[str(addr) for addr in identity_ips]}."
                    )
            dns = eo.get("dns_server", "")
            if dns:
                check_ip(label, network, "explicit_overrides.dns_server", dns)
            ntp = eo.get("ntp_server", "")
            if ntp:
                check_ip(label, network, "explicit_overrides.ntp_server", ntp)

        pool_start = check_ip(label, network, "dynamic_pool_start", d["dynamic_pool_start"])
        pool_end = check_ip(label, network, "dynamic_pool_end", d["dynamic_pool_end"])
        have_pool = pool_start is not None and pool_end is not None

        if have_pool and pool_start > pool_end:
            errors.append(
                f"{label}: dynamic_pool_start '{pool_start}' is greater than "
                f"dynamic_pool_end '{pool_end}'."
            )

        if have_pool:
            for ident_ip in identity_ips:
                if pool_start <= ident_ip <= pool_end:
                    errors.append(
                        f"{label}: server_identity '{ident_ip}' falls inside the dynamic "
                        f"pool ({pool_start} - {pool_end})."
                    )

        seen_res_ips = {}
        seen_res_macs = {}
        for r in vlan.get("reservations", []):
            rdesc = r.get("description", "?")
            # 'or ""' tolerates an explicit null mac.
            rmac = str(r.get("mac") or "").lower().strip()

            if is_dynamic_ip(r):
                rip = None
            else:
                rip = check_ip(label, network, f"reservation '{rdesc}' ip", r.get("ip"))

            if rip is not None:
                if have_pool and pool_start <= rip <= pool_end:
                    errors.append(
                        f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
                        f"the dynamic pool ({pool_start} - {pool_end})."
                    )
                rip_str = str(rip)
                if rip_str in seen_res_ips:
                    # Allow same IP for different MACs (multi-interface device)
                    if rmac and rmac in seen_res_ips[rip_str]:
                        errors.append(
                            f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
                            f"duplicates '{seen_res_ips[rip_str][rmac]}'."
                        )
                    else:
                        seen_res_ips[rip_str][rmac] = rdesc
                else:
                    seen_res_ips[rip_str] = {rmac: rdesc}
                if rip in identity_ips:
                    errors.append(
                        f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
                        f"with a server_identity."
                    )

            if rmac:
                if rmac in seen_res_macs:
                    errors.append(
                        f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
                        f"'{seen_res_macs[rmac]}'."
                    )
                else:
                    seen_res_macs[rmac] = rdesc

        for bl_name in vlan.get("use_blocklists", []):
            if bl_name not in blocklists_by_name:
                errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.")

    # -- NAT / firewall validation ---------------------------------------------
    def nat_check_protocol(lbl, rule):
        """Record an error unless the rule's protocol is in VALID_PROTOCOLS."""
        if rule.get("protocol") not in VALID_PROTOCOLS:
            errors.append(f"{lbl}: invalid protocol '{rule.get('protocol')}'. "
                          f"Must be tcp, udp, or both.")

    def nat_check_port(lbl, value):
        """Record an error unless value is an integer port 1-65535."""
        if int_range(value, 1, 65535) is None:
            errors.append(f"{lbl}: '{value}' is not a valid port number (1-65535).")

    def nat_check_ip(lbl, ip_str):
        """Record an error and return None unless ip_str is IPv4; else the address."""
        addr = _parse_ipv4(ip_str)
        if addr is None:
            errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
        return addr

    def nat_check_ip_in_network(lbl, ip_str, network):
        """Like nat_check_ip, additionally requiring the address to lie in network."""
        addr = nat_check_ip(lbl, ip_str)
        if addr is not None and addr not in network:
            errors.append(f"{lbl}: '{ip_str}' is not within subnet {network}.")

    for vlan, iface in zip(_all_vlans, vlan_ifaces):
        name = vlan.get("name", "?")
        net = vlan_networks.get(iface)
        for r in vlan.get("port_wrangling", []):
            desc = r.get("description", "?")
            label = f"vlan '{name}' port_wrangling '{desc}'"
            nat_check_protocol(label, r)
            nat_check_port(f"{label} dest_port", r.get("dest_port"))
            if net is not None:
                nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net)

    # -- port_forwarding validation (top-level) --------------------------------
    for idx, r in enumerate(data.get("port_forwarding", [])):
        desc = r.get("description", "?")
        label = f"port_forwarding[{idx}] '{desc}'"
        nat_check_protocol(label, r)
        nat_check_port(f"{label} dest_port", r.get("dest_port"))
        nat_check_port(f"{label} nat_port", r.get("nat_port"))
        nat_check_ip(f"{label} nat_ip", r.get("nat_ip", ""))

    for r in data.get("inter_vlan_exceptions", []):
        desc = r.get("description", "?")
        label = f"inter_vlan_exceptions '{desc}'"
        nat_check_protocol(label, r)
        if "src_ip_or_subnet" not in r:
            errors.append(f"{label}: missing field 'src_ip_or_subnet'.")
        else:
            val = r["src_ip_or_subnet"]
            if not ipv4_or_cidr(val):
                errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid "
                              f"IPv4 address or network.")
        # The 'dst_ip' key is accepted as a fallback for 'dst_ip_or_subnet'.
        dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
        if not dst:
            errors.append(f"{label}: missing field 'dst_ip_or_subnet'.")
        elif not ipv4_or_cidr(dst):
            errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid "
                          f"IPv4 address or network.")
        if r.get("dst_port") is not None:
            nat_check_port(f"{label} dst_port", r.get("dst_port"))

    # -- radius_default uniqueness check ---------------------------------------
    # .get(): a VLAN without a name must not crash the validator.
    defaults = [str(v.get("name", "?")) for v in _all_vlans if v.get("radius_default") is True]
    if len(defaults) > 1:
        errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
                      f"Only one VLAN may be the RADIUS default.")

    # -- RADIUS requires multiple VLANs ----------------------------------------
    non_wg_vlans = [v for v in _all_vlans if not is_wg(v)]
    has_radius_clients = any(
        r.get("radius_client")
        for v in non_wg_vlans
        for r in v.get("reservations", [])
    )
    if has_radius_clients and len(non_wg_vlans) < 2:
        errors.append(
            "RADIUS clients are configured but only one non-VPN VLAN exists. "
            "Dynamic VLAN assignment requires at least two VLANs."
        )

    # -- host_overrides validation ---------------------------------------------
    all_vlan_nets = list(vlan_networks.values())
    for idx, entry in enumerate(data.get("host_overrides", [])):
        lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'"
        if not entry.get("host"):
            errors.append(f"{lbl}: missing 'host' field.")
        ip_str = entry.get("ip", "")
        if not ip_str:
            errors.append(f"{lbl}: missing 'ip' field.")
            continue
        ip_addr = _parse_ipv4(ip_str)
        if ip_addr is None:
            errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
        elif all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets):
            errors.append(f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet.")

    # -- banned_ips validation -------------------------------------------------
    for idx, entry in enumerate(data.get("banned_ips", [])):
        ip_val = entry.get("ip", "")
        lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'"
        if not ip_val:
            errors.append(f"{lbl}: missing 'ip' field.")
            continue
        if not banned_ip(ip_val):
            errors.append(f"{lbl}: '{ip_val}' is not a valid IP, CIDR, or wildcard pattern.")

    return errors