linuxrouter/routlin/mod_validation.py
2026-06-05 22:54:12 -04:00

1000 lines
39 KiB
Python

"""
validation.py -- Shared structural validators for config.json fields.
Lives alongside core.py in ~/routlin/ and is volume-mounted into the
routlin-dash container at /app/validation.py. Importable by both
core.py (router host) and the Flask app directly.
Convention: primitive validators accept a raw string and return the
normalised valid value, or '' / None if the input is invalid.
"""
import ipaddress
import os
import re
VALID_PROTOCOLS = {'tcp', 'udp', 'both'}
VALID_BLOCKLIST_FORMATS = {'dnsmasq', 'hosts'}
VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns')
# ===================================================================
# IP / CIDR
# ===================================================================
def ip(value):
"""Return value if it is a valid IPv4 or IPv6 address, else ''."""
if not value:
return ''
v = str(value).strip()
try:
ipaddress.ip_address(v)
return v
except ValueError:
return ''
def ip_or_cidr(value):
"""Return value if it is a valid IPv4/IPv6 address or CIDR network, else ''."""
if not value:
return ''
v = str(value).strip()
try:
ipaddress.ip_address(v)
return v
except ValueError:
pass
try:
ipaddress.ip_network(v, strict=False)
return v
except ValueError:
return ''
def ipv4(value):
"""Return value if it is a valid IPv4 address, else ''."""
if not value:
return ''
v = str(value).strip()
try:
ipaddress.IPv4Address(v)
return v
except ValueError:
return ''
def ipv4_or_cidr(value):
"""Return value if it is a valid IPv4 address or IPv4 CIDR network, else ''."""
if not value:
return ''
v = str(value).strip()
try:
ipaddress.IPv4Address(v)
return v
except ValueError:
pass
try:
ipaddress.IPv4Network(v, strict=False)
return v
except ValueError:
return ''
# ===================================================================
# Port
# ===================================================================
def port(value):
"""Return port as string if valid 1-65535, else ''."""
try:
p = int(re.sub(r'[^0-9]', '', str(value)))
if 1 <= p <= 65535:
return str(p)
except (ValueError, TypeError):
pass
return ''
# ===================================================================
# Time
# ===================================================================
def time_24h(value):
"""Return value if it is a valid 24-hour HH:MM time string, else ''."""
if not value:
return ''
v = str(value).strip()
if re.fullmatch(r'([01]\d|2[0-3]):[0-5]\d', v):
return v
return ''
# ===================================================================
# Integer range
# ===================================================================
def int_range(value, lo, hi):
"""Return value as int if it is an integer within [lo, hi], else None.
lo and hi may be None to indicate no lower or upper bound respectively.
"""
try:
v = int(str(value).strip())
if (lo is None or v >= lo) and (hi is None or v <= hi):
return v
except (ValueError, TypeError):
pass
return None
# ===================================================================
# Domain name
# ===================================================================
def domainname(value):
"""Return value if it is a valid domain name, else ''.
Rules: labels separated by dots; each label contains only
letters, digits, and hyphens; no label may start or end with a
hyphen; no consecutive dots; total length <= 253.
"""
if not value:
return ''
v = str(value).strip().lower()
if len(v) > 253:
return ''
if '..' in v or v.startswith('.') or v.endswith('.'):
return ''
labels = v.split('.')
for label in labels:
if not label:
return ''
if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?$|^[a-zA-Z0-9]$', label):
return ''
return v
# ===================================================================
# Banned-IP pattern
# ===================================================================
def banned_ip(value):
"""
Return value if it is a valid banned_ip pattern, else ''.
Accepted formats (mirrors core.py expand_banned_ip):
IPv4:
Single address 192.0.2.1
CIDR 192.0.2.0/24
Wildcard octet 192.0.2.*
Octet range 192.0.2.10-20
(combinations that expand to <=1024 entries are accepted)
IPv6:
Single address 2001:db8::1
CIDR 2001:db8::/32
Trailing wildcard 2001:db8:c17:*
"""
if not value:
return ''
v = str(value).strip()
try:
_check_banned_ip(v)
return v
except (ValueError, TypeError):
return ''
def _check_banned_ip(ip_str):
if ':' in ip_str:
_check_banned_ipv6(ip_str)
else:
_check_banned_ipv4(ip_str)
def _check_banned_ipv4(ip_str):
if '/' in ip_str:
ipaddress.IPv4Network(ip_str, strict=False)
return
parts = ip_str.split('.')
if len(parts) != 4:
raise ValueError(f"Expected 4 octets: {ip_str!r}")
def parse_octet(s):
if s == '*':
return (0, 255)
if '-' in s:
a, b = s.split('-', 1)
lo, hi = int(a), int(b)
if not (0 <= lo <= hi <= 255):
raise ValueError(f"Invalid octet range {s!r}")
return (lo, hi)
v = int(s)
if not 0 <= v <= 255:
raise ValueError(f"Octet {v} out of 0-255")
return (v, v)
ranges = [parse_octet(p) for p in parts]
trailing = 0
for lo, hi in reversed(ranges):
if lo == 0 and hi == 255:
trailing += 1
else:
break
total = 1
for lo, hi in ranges[:4 - trailing]:
total *= (hi - lo + 1)
if total > 1024:
raise ValueError(f"Pattern expands to {total} entries (limit 1024); use CIDR")
def _check_banned_ipv6(ip_str):
if '/' in ip_str:
ipaddress.IPv6Network(ip_str, strict=False)
return
if '*' not in ip_str:
ipaddress.IPv6Address(ip_str)
return
if not ip_str.endswith(':*'):
raise ValueError(f"Unsupported IPv6 wildcard: {ip_str!r}; use 'prefix:*' or CIDR")
prefix_part = ip_str[:-2]
if '::' in prefix_part:
left, right = prefix_part.split('::', 1)
lg = [g for g in left.split(':') if g] if left else []
rg = [g for g in right.split(':') if g] if right else []
zeros = 8 - len(lg) - len(rg) - 1
if zeros < 0:
raise ValueError(f"Too many groups in {ip_str!r}")
groups = lg + ['0000'] * zeros + rg
else:
groups = [g for g in prefix_part.split(':') if g]
if not (1 <= len(groups) <= 7):
raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}")
# ===================================================================
# Cross-VLAN uniqueness checks (callable independently by action.py)
# ===================================================================
def check_vlan_name_unique(vlans, name, exclude_idx=None):
"""Return error string if name is already used by another VLAN, else None."""
for i, v in enumerate(vlans):
if exclude_idx is not None and i == exclude_idx:
continue
if v.get('name') == name:
return f'VLAN name "{name}" is already in use (VLAN ID {v.get("vlan_id")}).'
return None
def check_vlan_id_unique(vlans, vlan_id, exclude_idx=None):
"""Return error string if vlan_id is already used by another VLAN, else None."""
for i, v in enumerate(vlans):
if exclude_idx is not None and i == exclude_idx:
continue
if v.get('vlan_id') == vlan_id:
return f'VLAN ID {vlan_id} is already in use (VLAN "{v.get("name")}").'
return None
def check_radius_default_unique(vlans, exclude_idx=None):
"""Return error string if another VLAN (not exclude_idx) is already the RADIUS default, else None."""
for i, v in enumerate(vlans):
if exclude_idx is not None and i == exclude_idx:
continue
if v.get('radius_default'):
return f'VLAN "{v.get("name", "?")}" is already the RADIUS default. Only one VLAN may be the default.'
return None
def check_mdns_vpn(is_vpn, mdns_reflection):
"""Return error string if mDNS reflection is enabled on a VPN VLAN, else None."""
if is_vpn and mdns_reflection:
return 'mDNS reflection is not supported on VPN VLANs.'
return None
# ===================================================================
# Blocklist checks (callable independently by action.py)
# ===================================================================
def check_blocklist_name_unique(blocklists, name, exclude_idx=None):
"""Return error string if name is already used by another blocklist entry, else None."""
for i, b in enumerate(blocklists):
if exclude_idx is not None and i == exclude_idx:
continue
if b.get('name') == name:
return f'A blocklist named "{name}" already exists.'
return None
# ===================================================================
# DHCP reservation checks (callable independently by action.py)
# ===================================================================
def check_reservation_ip_conflicts(ip, vlan):
"""Return error string if ip conflicts with the VLAN's pool range or server identities, else None."""
if not ip or ip == 'dynamic':
return None
dhcp = vlan.get('dhcp_information', {})
pool_start = dhcp.get('dynamic_pool_start')
pool_end = dhcp.get('dynamic_pool_end')
if pool_start and pool_end:
try:
if (ipaddress.IPv4Address(pool_start) <= ipaddress.IPv4Address(ip)
<= ipaddress.IPv4Address(pool_end)):
return f'{ip} falls within the dynamic pool range ({pool_start}-{pool_end}).'
except Exception:
pass
identity_ips = {s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')}
if ip in identity_ips:
return f'{ip} is already assigned as a server identity IP.'
return None
# ===================================================================
# Host override checks (callable independently by action.py)
# ===================================================================
def check_host_override_ip_in_vlans(ip_str, cfg):
"""Return error string if ip_str is not within any configured VLAN subnet, else None."""
try:
addr = ipaddress.IPv4Address(ip_str)
except ValueError:
return f"'{ip_str}' is not a valid IPv4 address."
nets = []
for v in cfg.get('vlans', []):
subnet = v.get('subnet', '')
mask = v.get('subnet_mask', '')
if subnet and mask:
try:
nets.append(ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False))
except ValueError:
pass
if not nets:
return None
if not any(addr in net for net in nets):
return 'IP address does not fall within any configured VLAN subnet.'
return None
# ===================================================================
# VPN uniqueness checks (callable independently by action.py)
# ===================================================================
def check_vpn_listen_port_unique(vlans, listen_port, exclude_vlan_name=None):
"""Return error string if listen_port is already used by another VPN VLAN, else None."""
for v in vlans:
if not v.get('is_vpn'):
continue
if exclude_vlan_name is not None and v.get('name') == exclude_vlan_name:
continue
if v.get('vpn_information', {}).get('listen_port') == listen_port:
return f'Listen port {listen_port} is already used by another VPN VLAN.'
return None
def check_peer_name_unique(peers, name, exclude_idx=None):
"""Return error string if name is already used by another peer, else None."""
for i, p in enumerate(peers):
if exclude_idx is not None and i == exclude_idx:
continue
if p.get('name') == name:
return f'A peer named "{name}" already exists.'
return None
# ===================================================================
# Physical interface checks (callable independently by action.py)
# ===================================================================
def check_wan_lan_unique(wan, lan):
"""Return error string if WAN and LAN resolve to the same interface, else None."""
if wan and lan and wan == lan:
return 'WAN and LAN interfaces must be different.'
return None
def check_interface_exists(iface, available):
"""Return error string if iface is absent from the available set (when non-empty), else None."""
if available and iface not in available:
return f"Interface '{iface}' does not exist on this system."
return None
# ===================================================================
# VLAN / interface helpers (shared with core.py apply logic)
# ===================================================================
def is_wg(vlan):
return vlan.get("is_vpn", False)
def is_dynamic_ip(r):
"""Return True if a reservation has no pinned IP (DHCP assigns from pool)."""
ip = r.get("ip", "dynamic")
return ip in ("", "dynamic") or ip is None
def derive_vlan_id(subnet, prefix):
"""Return VLAN ID (1-4094) derived from the active octet of the network address, or None."""
try:
network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
octets = list(network.network_address.packed)
byte_idx = (int(prefix) - 1) // 8
vlan_id = octets[byte_idx]
if 1 <= vlan_id <= 4094:
return vlan_id
return None
except Exception:
return None
def derive_interface(vlan, data):
"""Derive the interface name for a VLAN without mutating data."""
lan = data.get('network_interfaces', {}).get('lan_interface', 'eth0')
if is_wg(vlan):
wg_vlans = [v for v in data.get('vlans', []) if is_wg(v)]
wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0))
idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
return f'wg{idx}'
vid = vlan.get('vlan_id')
return lan if vid == 1 else f'{lan}.{vid}'
# ===================================================================
# Full config validation (shared with core.py --apply)
# ===================================================================
def validate_config(data):
"""Validate config.json structure and content. Returns list of error strings."""
errors = []
seen_interfaces = {}
seen_listen_ports = {}
# Pre-compute per-VLAN vlan_ids and interface names without mutating data
_lan = data.get("network_interfaces", {}).get("lan_interface", "eth0")
_all_vlans = data.get("vlans", [])
_stored_ids = [_v.get("vlan_id") for _v in _all_vlans]
_wg_sorted = sorted(
[(i, _stored_ids[i]) for i, _v in enumerate(_all_vlans) if is_wg(_v)],
key=lambda x: (x[1] is None, x[1] or 0)
)
_wg_order = {orig_i: wg_idx for wg_idx, (orig_i, _) in enumerate(_wg_sorted)}
vlan_ifaces = []
for i, _vlan in enumerate(_all_vlans):
if is_wg(_vlan):
vlan_ifaces.append(f"wg{_wg_order[i]}")
else:
_vid = _stored_ids[i]
vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}")
# upstream_dns block ============================================
if not data.get("upstream_dns", {}).get("upstream_servers"):
errors.append("upstream_dns.upstream_servers is missing or empty.")
# WAN / LAN interfaces ==========================================
gen = data.get("network_interfaces", {})
wan = gen.get("wan_interface", "")
lan = gen.get("lan_interface", "")
if not wan:
errors.append("network_interfaces.wan_interface is missing or empty.")
if not lan:
errors.append("network_interfaces.lan_interface is missing or empty.")
if wan and lan:
available_interfaces = set()
try:
available_interfaces = set(os.listdir('/sys/class/net'))
except Exception:
pass
err = check_interface_exists(wan, available_interfaces)
if err:
errors.append(f"network_interfaces.wan_interface: {err}")
err = check_interface_exists(lan, available_interfaces)
if err:
errors.append(f"network_interfaces.lan_interface: {err}")
err = check_wan_lan_unique(wan, lan)
if err:
errors.append(f"network_interfaces: {err}")
# Blocklist library =============================================
blocklists_by_name = {}
for idx, bl in enumerate(data.get("dns_blocking", {}).get("blocklists", [])):
name = bl.get("name", "")
label = f"dns_blocking.blocklists[{idx}] '{name}'"
for field in ("name", "description", "save_as", "url", "format"):
if not bl.get(field):
errors.append(f"{label}: missing or empty field '{field}'.")
if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS:
errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.")
if name:
err = check_blocklist_name_unique(
data.get("dns_blocking", {}).get("blocklists", []), name, exclude_idx=idx
)
if err:
errors.append(f"{label}: {err}")
else:
blocklists_by_name[name] = bl
# Per-VLAN validation ===========================================
vlan_networks = {} # iface -> IPv4Network (used for NAT section)
for i, (vlan, iface) in enumerate(zip(_all_vlans, vlan_ifaces)):
vlan_id = _stored_ids[i]
name = vlan.get("name", "?")
label = f"vlan '{name}' (id={vlan_id})"
if vlan_id is None or not isinstance(vlan_id, int) or not (1 <= vlan_id <= 4094):
errors.append(f"vlan '{name}': vlan_id must be an integer 1-4094 (got {vlan_id!r}).")
err = check_vlan_name_unique(_all_vlans, name, exclude_idx=i)
if err:
errors.append(f"{label}: {err}")
err = check_vlan_id_unique(_all_vlans, vlan_id, exclude_idx=i)
if err:
errors.append(f"{label}: {err}")
if iface in seen_interfaces:
errors.append(f"{label}: duplicate interface '{iface}' "
f"(also used by '{seen_interfaces[iface]}').")
else:
seen_interfaces[iface] = name
err = check_mdns_vpn(is_wg(vlan), vlan.get('mdns_reflection'))
if err:
errors.append(f"{label}: {err}")
if is_wg(vlan):
# vpn_information =======================================
vpi = vlan.get("vpn_information")
if not isinstance(vpi, dict):
errors.append(f"{label}: vpn_information must be a plain object.")
vpi = {}
else:
lp = vpi.get("listen_port")
if int_range(lp, 1, 65535) is None:
errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.")
elif lp in seen_listen_ports:
errors.append(f"{label}: vpn_information.listen_port {lp} is already used by "
f"'{seen_listen_ports[lp]}'.")
else:
seen_listen_ports[lp] = name
# subnet/subnet_mask ====================================
for field in ("subnet", "subnet_mask"):
if not vlan.get(field):
errors.append(f"{label}: missing required field '{field}'.")
wg_net = None
if vlan.get("subnet") and vlan.get("subnet_mask"):
try:
wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
vlan_networks[iface] = wg_net
except ValueError as e:
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
# server_identities =====================================
if not vlan.get("server_identities"):
errors.append(f"{label}: server_identities is empty or missing.")
identity_ips = []
for idx, ident in enumerate(vlan.get("server_identities", [])):
ip_str = ident.get("ip", "")
ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'"
if not ip_str:
errors.append(f"{ilabel}: missing 'ip' field.")
continue
if not ipv4(ip_str):
errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.")
continue
ip_addr = ipaddress.IPv4Address(ip_str)
if wg_net and ip_addr not in wg_net:
errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.")
else:
identity_ips.append(ip_addr)
# vpn_information.explicit_overrides ====================
eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {}
if not isinstance(eo, dict):
errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
else:
gw = eo.get("gateway", "")
if gw:
if not ipv4(gw):
errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.")
else:
gw_ip = ipaddress.IPv4Address(gw)
if identity_ips and gw_ip not in identity_ips:
errors.append(
f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match "
f"any server_identity IP. Must be one of: "
f"{[str(ip) for ip in identity_ips]}."
)
dns = eo.get("dns_servers", "")
if dns and not ipv4(dns):
errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
mtu = eo.get("mtu", "")
if mtu and int_range(mtu, 576, 9000) is None:
errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' must be an integer in range 576-9000.")
domain_val = vpi.get("domain", "") if isinstance(vpi, dict) else ""
if domain_val and not domainname(domain_val):
errors.append(f"{label}: vpn_information.domain '{domain_val}' is not a valid domain name.")
# peers =================================================
seen_peer_names = {}
seen_peer_ips = {}
for pidx, peer in enumerate(vlan.get("peers", [])):
pname = peer.get("name", "")
plabel = f"{label} peer[{pidx}] '{pname}'"
if not pname:
errors.append(f"{plabel}: missing 'name' field.")
elif pname in seen_peer_names:
errors.append(f"{plabel}: duplicate peer name '{pname}'.")
else:
seen_peer_names[pname] = pidx
if not peer.get("public_key"):
errors.append(f"{plabel}: missing 'public_key' field.")
pip_str = peer.get("ip", "")
if not pip_str:
errors.append(f"{plabel}: missing 'ip' field.")
elif not ipv4(pip_str):
errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.")
else:
pip = ipaddress.IPv4Address(pip_str)
if wg_net and pip not in wg_net:
errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.")
if pip in identity_ips:
errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.")
if pip_str in seen_peer_ips:
errors.append(
f"{plabel}: duplicate peer ip '{pip_str}' "
f"(also used by peer '{seen_peer_ips[pip_str]}')."
)
else:
seen_peer_ips[pip_str] = pname
continue
if not vlan.get("server_identities"):
errors.append(f"{label}: server_identities is empty or missing.")
continue
for field in ("subnet", "subnet_mask"):
if not vlan.get(field):
errors.append(f"{label}: missing required top-level field '{field}'.")
if not vlan.get("subnet") or not vlan.get("subnet_mask"):
continue
try:
network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
vlan_networks[iface] = network
except ValueError as e:
errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
continue
d = vlan.get("dhcp_information", {})
required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"}
missing = required_dhcp - set(d.keys())
if missing:
errors.append(f"{label}: missing dhcp_information fields: {missing}")
continue
def check_ip(field_label, ip_str, allow_none=False):
if ip_str is None:
if not allow_none:
errors.append(f"{label}: {field_label} is null/missing.")
return None
if not ipv4(ip_str):
errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
return None
addr = ipaddress.IPv4Address(ip_str)
if addr not in network:
errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.")
return addr
identity_ips = []
for idx, ident in enumerate(vlan["server_identities"]):
ip = check_ip(
f"server_identities[{idx}] '{ident.get('description', '?')}'",
ident.get("ip")
)
if ip:
identity_ips.append(ip)
# Validate explicit_overrides ===============================
eo = d.get("explicit_overrides", {})
if not isinstance(eo, dict):
errors.append(f"{label}: explicit_overrides must be a plain object.")
else:
gw = eo.get("gateway", "")
if gw:
gw_ip = check_ip("explicit_overrides.gateway", gw)
if gw_ip and gw_ip not in identity_ips:
errors.append(
f"{label}: explicit_overrides.gateway '{gw}' does not match "
f"any server_identity IP. Must be one of: "
f"{[str(ip) for ip in identity_ips]}."
)
dns = eo.get("dns_servers", "")
if dns:
for _ip in (dns if isinstance(dns, list) else [dns]):
check_ip("explicit_overrides.dns_servers", _ip)
ntp = eo.get("ntp_servers", "")
if ntp:
for _ip in (ntp if isinstance(ntp, list) else [ntp]):
check_ip("explicit_overrides.ntp_servers", _ip)
pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"])
pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"])
if pool_start and pool_end and pool_start > pool_end:
errors.append(
f"{label}: dynamic_pool_start '{pool_start}' is greater than "
f"dynamic_pool_end '{pool_end}'."
)
if pool_start and pool_end:
for ip in identity_ips:
if pool_start <= ip <= pool_end:
errors.append(
f"{label}: server_identity '{ip}' falls inside the dynamic "
f"pool ({pool_start} - {pool_end})."
)
seen_res_ips = {}
seen_res_macs = {}
seen_res_hostnames = {}
vlan_name_key = vlan.get("name", "")
for r in [r for r in data.get("dhcp_reservations", []) if r.get("vlan") == vlan_name_key]:
rdesc = r.get("description", "?")
rmac = r.get("mac", "").lower().strip()
if is_dynamic_ip(r):
rip = None
else:
rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip"))
if rip:
if pool_start and pool_end and pool_start <= rip <= pool_end:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
f"the dynamic pool ({pool_start} - {pool_end})."
)
rip_str = str(rip)
if rip_str in seen_res_ips:
# Allow same IP for different MACs (multi-interface device)
if rmac and rmac in seen_res_ips[rip_str]:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
f"duplicates '{seen_res_ips[rip_str][rmac]}'."
)
else:
seen_res_ips[rip_str][rmac] = rdesc
else:
seen_res_ips[rip_str] = {rmac: rdesc}
if rip in identity_ips:
errors.append(
f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
f"with a server_identity."
)
if rmac:
if rmac in seen_res_macs:
errors.append(
f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
f"'{seen_res_macs[rmac]}'."
)
else:
seen_res_macs[rmac] = rdesc
rhost = r.get("hostname", "").strip().lower()
if rhost:
if rhost in seen_res_hostnames:
errors.append(
f"{label}: reservation '{rdesc}' hostname '{rhost}' duplicates "
f"'{seen_res_hostnames[rhost]}'."
)
else:
seen_res_hostnames[rhost] = rdesc
for bl_name in vlan.get("use_blocklists", []):
if bl_name not in blocklists_by_name:
errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.")
# NAT / firewall validation =====================================
valid_protos = VALID_PROTOCOLS
known_interfaces = set(seen_interfaces.keys())
def nat_check_port(label, port):
if int_range(port, 1, 65535) is None:
errors.append(f"{label}: '{port}' is not a valid port number (1-65535).")
def nat_check_ip(label, ip_str):
if not ipv4(ip_str):
errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.")
return None
return ipaddress.IPv4Address(ip_str)
def nat_check_ip_in_network(label, ip_str, network):
ip = nat_check_ip(label, ip_str)
if ip and ip not in network:
errors.append(f"{label}: '{ip_str}' is not within subnet {network}.")
# port_wrangling validation (top-level) =========================
_vlan_name_to_net = {
v.get("name", ""): vlan_networks.get(iface)
for v, iface in zip(data.get("vlans", []), vlan_ifaces)
}
for idx, r in enumerate(data.get("port_wrangling", [])):
desc = r.get("description", "?")
vlan_name = r.get("vlan", "?")
label = f"port_wrangling[{idx}] (vlan '{vlan_name}') '{desc}'"
if r.get("protocol") not in valid_protos:
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
f"Must be tcp, udp, or both.")
nat_check_port(f"{label} dest_port", r.get("dest_port"))
net = _vlan_name_to_net.get(vlan_name)
if net:
nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net)
# port_forwarding validation (top-level) ========================
for idx, r in enumerate(data.get("port_forwarding", [])):
desc = r.get("description", "?")
label = f"port_forwarding[{idx}] '{desc}'"
if r.get("protocol") not in valid_protos:
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
f"Must be tcp, udp, or both.")
nat_check_port(f"{label} dest_port", r.get("dest_port"))
nat_check_port(f"{label} nat_port", r.get("nat_port"))
nat_check_ip(f"{label} nat_ip", r.get("nat_ip", ""))
if r.get("enabled"):
nat_ip_str = r.get("nat_ip", "")
try:
nat_addr = ipaddress.IPv4Address(nat_ip_str)
for v in _all_vlans:
if not v.get("restricted_vlan"):
continue
try:
vnet = ipaddress.IPv4Network(f"{v['subnet']}/{v['subnet_mask']}", strict=False)
except Exception:
continue
if nat_addr in vnet:
errors.append(
f"Port forwarding rule '{desc}' is enabled but its destination "
f"({nat_ip_str}) is on restricted VLAN '{v['name']}'. "
f"Disable the rule or remove the restricted_vlan flag."
)
break
except Exception:
pass
for r in data.get("inter_vlan_exceptions", []):
desc = r.get("description", "?")
label = f"inter_vlan_exceptions '{desc}'"
if r.get("protocol") not in valid_protos:
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
f"Must be tcp, udp, or both.")
if "src_ip_or_subnet" not in r:
errors.append(f"{label}: missing field 'src_ip_or_subnet'.")
else:
val = r["src_ip_or_subnet"]
if not ipv4_or_cidr(val):
errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid "
f"IPv4 address or network.")
dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
if not dst:
errors.append(f"{label}: missing field 'dst_ip_or_subnet'.")
else:
if not ipv4_or_cidr(dst):
errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid "
f"IPv4 address or network.")
if r.get("dest_port_start"):
nat_check_port(f"{label} dest_port_start", r.get("dest_port_start"))
if r.get("dest_port_end"):
nat_check_port(f"{label} dest_port_end", r.get("dest_port_end"))
min_p, max_p = r.get("dest_port_start", ""), r.get("dest_port_end", "")
if min_p and max_p:
try:
if int(min_p) > int(max_p):
errors.append(f"{label}: dest_port_start {min_p} is greater than dest_port_end {max_p}.")
except (ValueError, TypeError):
pass
# radius_default uniqueness check ===============================
_rd_idx = next(
(i for i, v in enumerate(data.get('vlans', [])) if v.get('radius_default')), None
)
if _rd_idx is not None:
err = check_radius_default_unique(data.get('vlans', []), exclude_idx=_rd_idx)
if err:
defaults = [v.get('name', '?') for v in data.get('vlans', []) if v.get('radius_default')]
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
f"Only one VLAN may be the RADIUS default.")
# RADIUS requires multiple VLANs ================================
non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)]
non_wg_names = {v.get("name") for v in non_wg_vlans}
has_radius_clients = any(
r.get("radius_client")
for r in data.get("dhcp_reservations", [])
if r.get("vlan") in non_wg_names
)
if has_radius_clients and len(non_wg_vlans) < 2:
errors.append(
"RADIUS clients are configured but only one non-VPN VLAN exists. "
"Dynamic VLAN assignment requires at least two VLANs."
)
# host_overrides validation =====================================
all_vlan_nets = list(vlan_networks.values())
for idx, entry in enumerate(data.get("host_overrides", [])):
lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'"
if not entry.get("host"):
errors.append(f"{lbl}: missing 'host' field.")
ip_str = entry.get("ip", "")
if not ip_str:
errors.append(f"{lbl}: missing 'ip' field.")
elif not ipv4(ip_str):
errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
else:
ip_addr = ipaddress.IPv4Address(ip_str)
if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets):
errors.append(f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet.")
# banned_ips validation =========================================
for idx, entry in enumerate(data.get("banned_ips", [])):
ip_val = entry.get("ip", "")
lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'"
if not ip_val:
errors.append(f"{lbl}: missing 'ip' field.")
continue
if not banned_ip(ip_val):
errors.append(f"{lbl}: '{ip_val}' is not a valid IP, CIDR, or wildcard pattern.")
return errors
def check_portfwd_restricted_vlan(nat_ip, vlans):
"""Return an error string if nat_ip falls within a restricted VLAN's subnet, else None."""
try:
addr = ipaddress.IPv4Address(nat_ip)
except Exception:
return None
for v in vlans:
if not v.get('restricted_vlan'):
continue
try:
net = ipaddress.IPv4Network(f"{v['subnet']}/{v['subnet_mask']}", strict=False)
except Exception:
continue
if addr in net:
return (f"NAT IP '{nat_ip}' is on restricted VLAN '{v['name']}'. "
f"Port forwarding to restricted VLANs is not permitted.")
return None
def disable_portfwd_on_restricted_vlans(data):
"""Auto-disable enabled port forwarding rules whose nat_ip falls within a restricted VLAN's subnet.
Mutates data in place. Returns list of descriptions of rules that were disabled."""
restricted_nets = []
for v in data.get('vlans', []):
if v.get('restricted_vlan'):
try:
restricted_nets.append(ipaddress.IPv4Network(f"{v['subnet']}/{v['subnet_mask']}", strict=False))
except Exception:
pass
if not restricted_nets:
return []
disabled = []
for rule in data.get('port_forwarding', []):
if not rule.get('enabled'):
continue
try:
addr = ipaddress.IPv4Address(rule.get('nat_ip', ''))
except Exception:
continue
if any(addr in net for net in restricted_nets):
rule['enabled'] = False
disabled.append(rule.get('description') or rule.get('nat_ip', '?'))
return disabled