916 lines
36 KiB
Python
916 lines
36 KiB
Python
"""
validation.py -- Shared structural validators for config.json fields.

Lives alongside core.py in ~/routlin/ and is volume-mounted into the
routlin-dash container at /app/validation.py. Importable by both
core.py (router host) and the Flask app directly.

Convention: primitive validators accept a raw string and return the
normalised valid value, or '' / None if the input is invalid.
"""
|
|
import ipaddress
|
|
import os
|
|
import re
|
|
|
|
# Protocol names accepted by validate_config() for port_wrangling,
# port_forwarding and inter_vlan_exceptions rules.
VALID_PROTOCOLS = {'tcp', 'udp', 'both'}
# Values accepted by validate_config() for a blocklist entry's 'format' field.
VALID_BLOCKLIST_FORMATS = {'dnsmasq', 'hosts'}
# NOTE(review): not referenced in this part of the file; presumably the
# supported dynamic-DNS provider identifiers -- confirm against callers.
VALID_DDNS_PROVIDERS = ('noip', 'cloudflare', 'duckdns')
|
|
|
|
|
|
# ===================================================================
|
|
# IP / CIDR
|
|
# ===================================================================
|
|
|
|
def ip(value):
    """Validate an IPv4 or IPv6 address.

    Falsy input yields ''. Otherwise the input is converted to ``str`` and
    stripped; the stripped text is returned when ``ipaddress.ip_address``
    accepts it, and '' is returned when it does not.
    """
    text = str(value).strip() if value else ''
    if not text:
        return ''
    try:
        ipaddress.ip_address(text)
    except ValueError:
        return ''
    return text
|
|
|
|
|
|
def ip_or_cidr(value):
    """Validate an IPv4/IPv6 address or CIDR network.

    Returns the stripped input when it parses as a single address or as a
    network (host bits are allowed, since parsing is non-strict); returns ''
    for falsy or unparseable input.
    """
    if not value:
        return ''
    text = str(value).strip()
    # Try the plain-address parser first, then the non-strict network parser.
    parsers = (
        (ipaddress.ip_address, {}),
        (ipaddress.ip_network, {'strict': False}),
    )
    for parse, options in parsers:
        try:
            parse(text, **options)
        except ValueError:
            continue
        return text
    return ''
|
|
|
|
|
|
def ipv4(value):
    """Validate an IPv4 address.

    Returns the input converted to ``str`` and stripped when
    ``ipaddress.IPv4Address`` accepts it; returns '' for falsy input or
    anything that fails to parse (including IPv6 addresses).
    """
    if not value:
        return ''
    text = str(value).strip()
    try:
        ipaddress.IPv4Address(text)
    except ValueError:
        text = ''
    return text
|
|
|
|
|
|
def ipv4_or_cidr(value):
    """Validate an IPv4 address or IPv4 CIDR network.

    Returns the stripped input when it parses as an IPv4 address or as an
    IPv4 network (non-strict, so host bits may be set); returns '' for falsy
    or unparseable input.
    """
    if not value:
        return ''
    text = str(value).strip()
    # Address parser first, then the non-strict network parser.
    parsers = (
        (ipaddress.IPv4Address, {}),
        (ipaddress.IPv4Network, {'strict': False}),
    )
    for parse, options in parsers:
        try:
            parse(text, **options)
        except ValueError:
            continue
        return text
    return ''
|
|
|
|
|
|
# ===================================================================
|
|
# Port
|
|
# ===================================================================
|
|
|
|
def port(value):
    """Normalise a port number to its decimal string form.

    Every character other than 0-9 is discarded from ``str(value)`` before
    parsing, so '8a0' and '-80' both normalise to '80'. Returns the parsed
    number as a string when it lies in 1-65535, otherwise ''.
    """
    try:
        number = int(re.sub(r'[^0-9]', '', str(value)))
    except (ValueError, TypeError):
        # Nothing left after stripping (or an unparseable value).
        return ''
    if number < 1 or number > 65535:
        return ''
    return str(number)
|
|
|
|
|
|
# ===================================================================
|
|
# Time
|
|
# ===================================================================
|
|
|
|
def time_24h(value):
    """Validate a 24-hour HH:MM time string.

    Returns the stripped input when it is exactly two hour digits (00-23),
    a colon and two minute digits (00-59); returns '' otherwise.
    """
    if not value:
        return ''
    text = str(value).strip()
    matched = re.fullmatch(r'([01]\d|2[0-3]):[0-5]\d', text)
    return text if matched else ''
|
|
|
|
|
|
# ===================================================================
|
|
# Integer range
|
|
# ===================================================================
|
|
|
|
def int_range(value, lo, hi):
    """Parse ``value`` as an integer and bounds-check it.

    ``value`` is converted with ``int(str(value).strip())``. Either bound may
    be None to leave that side open; both bounds are inclusive.

    Returns the parsed int when it satisfies the bounds, otherwise None
    (also None when the value cannot be parsed as an integer).
    """
    try:
        number = int(str(value).strip())
        lower_ok = True if lo is None else number >= lo
        upper_ok = True if hi is None else number <= hi
    except (ValueError, TypeError):
        return None
    return number if (lower_ok and upper_ok) else None
|
|
|
|
|
|
# ===================================================================
|
|
# Domain name
|
|
# ===================================================================
|
|
|
|
def domainname(value):
    """Return the normalised (stripped, lower-cased) domain name, else ''.

    Rules: labels separated by dots; each label is 1-63 characters and
    contains only ASCII letters, digits, and hyphens; no label may start or
    end with a hyphen; no empty labels (so no leading, trailing or
    consecutive dots); total length <= 253.

    Each label is checked with ``re.fullmatch`` rather than ``re.match`` with
    a trailing ``$``: ``$`` also matches just before a final newline, which
    would let a label such as ``'a\\n'`` through.
    """
    if not value:
        return ''
    name = str(value).strip().lower()
    if len(name) > 253:
        return ''
    for label in name.split('.'):
        # An empty label (leading/trailing/double dot) fails the match as
        # well, because the pattern requires at least one character.
        # {0,61} between the two end characters caps a label at 63.
        if not re.fullmatch(r'[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?', label):
            return ''
    return name
|
|
|
|
|
|
# ===================================================================
|
|
# Banned-IP pattern
|
|
# ===================================================================
|
|
|
|
def banned_ip(value):
    """Return value if it is a valid banned_ip pattern, else ''.

    Accepted formats (mirrors core.py expand_banned_ip):
      IPv4:
        Single address    192.0.2.1
        CIDR              192.0.2.0/24
        Wildcard octet    192.0.2.*
        Octet range       192.0.2.10-20
        (combinations that expand to <=1024 entries are accepted)
      IPv6:
        Single address    2001:db8::1
        CIDR              2001:db8::/32
        Trailing wildcard 2001:db8:c17:*

    The returned string is the input with surrounding whitespace stripped.
    """
    if not value:
        return ''
    pattern = str(value).strip()
    try:
        _check_banned_ip(pattern)
    except (ValueError, TypeError):
        return ''
    return pattern


def _check_banned_ip(ip_str):
    """Dispatch to the IPv6 checker when the pattern contains ':', else IPv4.

    Raises ValueError when the pattern is not acceptable.
    """
    if ':' in ip_str:
        _check_banned_ipv6(ip_str)
    else:
        _check_banned_ipv4(ip_str)


def _check_banned_ipv4(ip_str):
    """Raise ValueError unless ip_str is an acceptable IPv4 ban pattern.

    A pattern containing '/' is parsed as a (non-strict) IPv4 network.
    Otherwise it must have four dot-separated octets, each a number, an
    'a-b' range or '*'. Trailing full-range octets are not counted towards
    the expansion size; the remaining octets may expand to at most 1024
    combinations.
    """
    if '/' in ip_str:
        ipaddress.IPv4Network(ip_str, strict=False)
        return

    parts = ip_str.split('.')
    if len(parts) != 4:
        raise ValueError(f"Expected 4 octets: {ip_str!r}")

    def parse_octet(s):
        # Returns the inclusive (low, high) span the octet expression covers.
        if s == '*':
            return (0, 255)
        if '-' in s:
            a, b = s.split('-', 1)
            lo, hi = int(a), int(b)
            if not (0 <= lo <= hi <= 255):
                raise ValueError(f"Invalid octet range {s!r}")
            return (lo, hi)
        v = int(s)
        if not 0 <= v <= 255:
            raise ValueError(f"Octet {v} out of 0-255")
        return (v, v)

    ranges = [parse_octet(p) for p in parts]

    # Count full-range (0-255) octets at the end of the pattern.
    trailing = 0
    for lo, hi in reversed(ranges):
        if lo == 0 and hi == 255:
            trailing += 1
        else:
            break

    # Only the octets before the trailing wildcards multiply the entry count.
    total = 1
    for lo, hi in ranges[:4 - trailing]:
        total *= (hi - lo + 1)
    if total > 1024:
        raise ValueError(f"Pattern expands to {total} entries (limit 1024); use CIDR")


def _check_banned_ipv6(ip_str):
    """Raise ValueError unless ip_str is an acceptable IPv6 ban pattern.

    Accepts a (non-strict) IPv6 network, a plain IPv6 address, or a
    'prefix:*' wildcard whose prefix resolves to 1-7 groups, each of which
    must be 1-4 hexadecimal digits.
    """
    if '/' in ip_str:
        ipaddress.IPv6Network(ip_str, strict=False)
        return
    if '*' not in ip_str:
        ipaddress.IPv6Address(ip_str)
        return
    if not ip_str.endswith(':*'):
        raise ValueError(f"Unsupported IPv6 wildcard: {ip_str!r}; use 'prefix:*' or CIDR")

    prefix_part = ip_str[:-2]
    if '::' in prefix_part:
        left, right = prefix_part.split('::', 1)
        lg = [g for g in left.split(':') if g] if left else []
        rg = [g for g in right.split(':') if g] if right else []
        # One of the eight groups is taken by the trailing wildcard.
        zeros = 8 - len(lg) - len(rg) - 1
        if zeros < 0:
            raise ValueError(f"Too many groups in {ip_str!r}")
        groups = lg + ['0000'] * zeros + rg
    else:
        groups = [g for g in prefix_part.split(':') if g]

    if not (1 <= len(groups) <= 7):
        raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}")

    # Counting groups alone would accept text such as 'zzzz:*' or an embedded
    # '*'; every prefix group has to be a real hextet.
    for group in groups:
        if not re.fullmatch(r'[0-9A-Fa-f]{1,4}', group):
            raise ValueError(f"Invalid IPv6 group {group!r} in {ip_str!r}")
|
|
|
|
|
|
# ===================================================================
|
|
# Cross-VLAN uniqueness checks (callable independently by action.py)
|
|
# ===================================================================
|
|
|
|
def check_vlan_name_unique(vlans, name, exclude_idx=None):
    """Report a VLAN name clash.

    Scans ``vlans`` in order, ignoring the entry at ``exclude_idx`` (if
    given), and returns an error string naming the first entry whose 'name'
    equals ``name``; returns None when there is no clash.
    """
    clash = next(
        (
            entry
            for pos, entry in enumerate(vlans)
            if pos != exclude_idx and entry.get('name') == name
        ),
        None,
    )
    if clash is None:
        return None
    return f'VLAN name "{name}" is already in use (VLAN ID {clash.get("vlan_id")}).'
|
|
|
|
|
|
def check_vlan_id_unique(vlans, vlan_id, exclude_idx=None):
    """Report a VLAN ID clash.

    Scans ``vlans`` in order, ignoring the entry at ``exclude_idx`` (if
    given), and returns an error string naming the first entry whose
    'vlan_id' equals ``vlan_id``; returns None when there is no clash.
    """
    others = (entry for pos, entry in enumerate(vlans) if pos != exclude_idx)
    for entry in others:
        if entry.get('vlan_id') == vlan_id:
            return f'VLAN ID {vlan_id} is already in use (VLAN "{entry.get("name")}").'
    return None
|
|
|
|
|
|
def check_radius_default_unique(vlans, exclude_idx=None):
    """Report an existing RADIUS-default VLAN.

    Returns an error string naming the first VLAN (other than the one at
    ``exclude_idx``) whose 'radius_default' value is truthy, or None when no
    such VLAN exists.
    """
    holder = next(
        (
            entry
            for pos, entry in enumerate(vlans)
            if pos != exclude_idx and entry.get('radius_default')
        ),
        None,
    )
    if holder is None:
        return None
    return f'VLAN "{holder.get("name", "?")}" is already the RADIUS default. Only one VLAN may be the default.'
|
|
|
|
|
|
def check_mdns_vpn(is_vpn, mdns_reflection):
    """Reject mDNS reflection on a VPN VLAN.

    Returns an error string when both flags are truthy, otherwise None.
    """
    if not (is_vpn and mdns_reflection):
        return None
    return 'mDNS reflection is not supported on VPN VLANs.'
|
|
|
|
|
|
# ===================================================================
|
|
# Blocklist checks (callable independently by action.py)
|
|
# ===================================================================
|
|
|
|
def check_blocklist_name_unique(blocklists, name, exclude_idx=None):
    """Report a blocklist name clash.

    Returns an error string when any entry of ``blocklists`` other than the
    one at ``exclude_idx`` has a 'name' equal to ``name``; otherwise None.
    """
    taken = any(
        entry.get('name') == name
        for pos, entry in enumerate(blocklists)
        if pos != exclude_idx
    )
    return f'A blocklist named "{name}" already exists.' if taken else None
|
|
|
|
|
|
# ===================================================================
|
|
# DHCP reservation checks (callable independently by action.py)
|
|
# ===================================================================
|
|
|
|
def check_reservation_ip_conflicts(ip, vlan):
    """Check a reservation IP against a VLAN's dynamic pool and server identities.

    An empty ``ip`` or the literal 'dynamic' is never a conflict. Otherwise
    returns an error string when ``ip`` lies inside the range
    dhcp_information.dynamic_pool_start .. dynamic_pool_end (inclusive), or
    when it equals the 'ip' of one of the VLAN's server_identities. The pool
    check is best-effort: if any of the three addresses cannot be parsed as
    IPv4 it is skipped. Returns None when no conflict is found.
    """
    if not ip or ip == 'dynamic':
        return None

    dhcp_cfg = vlan.get('dhcp_information', {})
    pool_start = dhcp_cfg.get('dynamic_pool_start')
    pool_end = dhcp_cfg.get('dynamic_pool_end')
    if pool_start and pool_end:
        try:
            first, wanted, last = (
                ipaddress.IPv4Address(text) for text in (pool_start, ip, pool_end)
            )
            inside_pool = first <= wanted <= last
        except Exception:
            # Unparseable address: skip the pool comparison.
            inside_pool = False
        if inside_pool:
            return f'{ip} falls within the dynamic pool range ({pool_start}-{pool_end}).'

    # Server identity IPs are compared as raw strings.
    reserved = set()
    for ident in vlan.get('server_identities', []):
        if ident.get('ip'):
            reserved.add(ident['ip'])
    if ip in reserved:
        return f'{ip} is already assigned as a server identity IP.'
    return None
|
|
|
|
|
|
# ===================================================================
|
|
# Host override checks (callable independently by action.py)
|
|
# ===================================================================
|
|
|
|
def check_host_override_ip_in_vlans(ip_str, cfg):
    """Check that a host-override IP belongs to some configured VLAN subnet.

    Returns an error string when ``ip_str`` is not a valid IPv4 address, or
    when at least one VLAN subnet could be parsed from ``cfg['vlans']`` and
    none of them contains the address. VLANs lacking 'subnet' or
    'subnet_mask', or whose subnet does not parse, are ignored; when no
    subnet is usable at all the result is None.
    """
    try:
        addr = ipaddress.IPv4Address(ip_str)
    except ValueError:
        return f"'{ip_str}' is not a valid IPv4 address."

    networks = []
    for vlan in cfg.get('vlans', []):
        base = vlan.get('subnet', '')
        mask = vlan.get('subnet_mask', '')
        if not (base and mask):
            continue
        try:
            networks.append(ipaddress.IPv4Network(f'{base}/{mask}', strict=False))
        except ValueError:
            continue

    if networks and all(addr not in net for net in networks):
        return 'IP address does not fall within any configured VLAN subnet.'
    return None
|
|
|
|
|
|
# ===================================================================
|
|
# VPN uniqueness checks (callable independently by action.py)
|
|
# ===================================================================
|
|
|
|
def check_vpn_listen_port_unique(vlans, listen_port, exclude_vlan_name=None):
    """Report a WireGuard listen-port clash.

    Only VLANs with a truthy 'is_vpn' are considered, and the VLAN named
    ``exclude_vlan_name`` (if given) is ignored. Returns an error string
    when one of the remaining VLANs has vpn_information.listen_port equal to
    ``listen_port``; otherwise None.
    """
    for vlan in vlans:
        excluded = (
            exclude_vlan_name is not None
            and vlan.get('name') == exclude_vlan_name
        )
        if not vlan.get('is_vpn') or excluded:
            continue
        used_port = vlan.get('vpn_information', {}).get('listen_port')
        if used_port == listen_port:
            return f'Listen port {listen_port} is already used by another VPN VLAN.'
    return None
|
|
|
|
|
|
def check_peer_name_unique(peers, name, exclude_idx=None):
    """Report a peer name clash.

    Returns an error string when any entry of ``peers`` other than the one
    at ``exclude_idx`` has a 'name' equal to ``name``; otherwise None.
    """
    clash = any(
        peer.get('name') == name
        for pos, peer in enumerate(peers)
        if pos != exclude_idx
    )
    if clash:
        return f'A peer named "{name}" already exists.'
    return None
|
|
|
|
|
|
# ===================================================================
|
|
# Physical interface checks (callable independently by action.py)
|
|
# ===================================================================
|
|
|
|
def check_wan_lan_unique(wan, lan):
    """Reject a configuration where WAN and LAN name the same interface.

    Returns an error string when both values are truthy and equal; returns
    None otherwise (including when either value is empty).
    """
    same_interface = bool(wan and lan and wan == lan)
    if not same_interface:
        return None
    return 'WAN and LAN interfaces must be different.'
|
|
|
|
|
|
def check_interface_exists(iface, available):
    """Check that an interface name appears in the available set.

    An empty ``available`` collection disables the check, so None is
    returned. Otherwise returns an error string when ``iface`` is not a
    member, and None when it is.
    """
    if not available:
        return None
    if iface in available:
        return None
    return f"Interface '{iface}' does not exist on this system."
|
|
|
|
|
|
# ===================================================================
|
|
# VLAN / interface helpers (shared with core.py apply logic)
|
|
# ===================================================================
|
|
|
|
def is_wg(vlan):
    """Return the VLAN's 'is_vpn' value, or False when the key is absent.

    The stored value is returned as-is (not coerced to bool).
    """
    vpn_flag = vlan.get("is_vpn", False)
    return vpn_flag
|
|
|
|
|
|
def is_dynamic_ip(r):
    """Tell whether a reservation has no pinned IP (DHCP assigns from pool).

    True when the reservation's 'ip' is missing, None, '' or the literal
    'dynamic'; False for any other value.
    """
    pinned = r.get("ip", "dynamic")
    if pinned is None:
        return True
    return pinned in ("", "dynamic")
|
|
|
|
|
|
def derive_vlan_id(subnet, prefix):
    """Return the VLAN ID derived from the active octet of the network address.

    The "active" octet is the one containing the last bit of the network
    prefix (octet index ``(prefixlen - 1) // 8``) of the network obtained
    from ``subnet``/``prefix`` (host bits are ignored).

    Args:
        subnet: IPv4 address or network address, e.g. '192.168.10.0'.
        prefix: Prefix length (int or digit string) or a dotted netmask such
            as '255.255.255.0' -- anything ``ipaddress.IPv4Network`` accepts
            after the slash.

    Returns:
        The octet value when it is within 1-4094, otherwise None. Because a
        single octet is used the result never exceeds 255. None is also
        returned when subnet/prefix do not form a valid IPv4 network.
    """
    try:
        network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
    except ValueError:
        return None
    octets = network.network_address.packed
    # Use the parsed prefix length rather than int(prefix): int() raises on a
    # dotted netmask that IPv4Network has already accepted.
    # For a /0 network the index is -1 (last octet), whose value is 0 -> None.
    vlan_id = octets[(network.prefixlen - 1) // 8]
    return vlan_id if 1 <= vlan_id <= 4094 else None
|
|
|
|
|
|
def derive_interface(vlan, data):
    """Work out the interface name for a VLAN without mutating ``data``.

    For a VPN VLAN (per ``is_wg``) the name is 'wg<N>', where N is the
    position of this exact VLAN object among all VPN VLANs in
    ``data['vlans']`` sorted by vlan_id (entries without a vlan_id last);
    N falls back to 0 when the object is not found there.

    For any other VLAN the name is the LAN interface from
    ``data['network_interfaces']['lan_interface']`` (default 'eth0') when
    vlan_id is 1, else '<lan>.<vlan_id>'.
    """
    lan = data.get('network_interfaces', {}).get('lan_interface', 'eth0')

    if not is_wg(vlan):
        vid = vlan.get('vlan_id')
        if vid == 1:
            return lan
        return f'{lan}.{vid}'

    def sort_key(entry):
        # VLANs without an id sort after every VLAN that has one.
        entry_id = entry.get('vlan_id')
        return (entry_id is None, entry_id or 0)

    ordered = sorted(
        (candidate for candidate in data.get('vlans', []) if is_wg(candidate)),
        key=sort_key,
    )
    for position, candidate in enumerate(ordered):
        # Identity comparison: the caller passes the very object held in data.
        if candidate is vlan:
            return f'wg{position}'
    return 'wg0'
|
|
|
|
|
|
# ===================================================================
|
|
# Full config validation (shared with core.py --apply)
|
|
# ===================================================================
|
|
|
|
def validate_config(data):
    """Validate config.json structure and content.

    Checks, in order: upstream DNS, WAN/LAN interfaces, the blocklist
    library, every VLAN (VPN VLANs: vpn_information, subnet, server
    identities, overrides, peers; other VLANs: subnet, DHCP information,
    server identities, overrides, pool, reservations, blocklist references),
    port_wrangling, port_forwarding, inter_vlan_exceptions, RADIUS default
    uniqueness, host_overrides and banned_ips.

    Wherever an address has been accepted by ``ipv4()``, the normalised
    (stripped) value it returns is what gets parsed with ``ipaddress``;
    parsing the raw value would raise for input with surrounding whitespace
    instead of producing an error entry.

    Args:
        data: The parsed config.json content (a dict). It is not mutated.

    Returns:
        A list of human-readable error strings; empty when nothing was found.
        The interface-existence check reads /sys/class/net and is skipped
        when that directory cannot be listed.
    """
    errors = []
    seen_interfaces = {}
    seen_listen_ports = {}

    # Pre-compute per-VLAN vlan_ids and interface names without mutating data
    _lan = data.get("network_interfaces", {}).get("lan_interface", "eth0")
    _all_vlans = data.get("vlans", [])
    _stored_ids = [_v.get("vlan_id") for _v in _all_vlans]
    # VPN VLANs are numbered wg0, wg1, ... in vlan_id order (missing ids last).
    _wg_sorted = sorted(
        [(i, _stored_ids[i]) for i, _v in enumerate(_all_vlans) if is_wg(_v)],
        key=lambda x: (x[1] is None, x[1] or 0)
    )
    _wg_order = {orig_i: wg_idx for wg_idx, (orig_i, _) in enumerate(_wg_sorted)}
    vlan_ifaces = []
    for i, _vlan in enumerate(_all_vlans):
        if is_wg(_vlan):
            vlan_ifaces.append(f"wg{_wg_order[i]}")
        else:
            _vid = _stored_ids[i]
            vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}")

    # upstream_dns block ============================================
    if not data.get("upstream_dns", {}).get("upstream_servers"):
        errors.append("upstream_dns.upstream_servers is missing or empty.")

    # WAN / LAN interfaces ==========================================
    gen = data.get("network_interfaces", {})
    wan = gen.get("wan_interface", "")
    lan = gen.get("lan_interface", "")
    if not wan:
        errors.append("network_interfaces.wan_interface is missing or empty.")
    if not lan:
        errors.append("network_interfaces.lan_interface is missing or empty.")
    if wan and lan:
        available_interfaces = set()
        try:
            available_interfaces = set(os.listdir('/sys/class/net'))
        except OSError:
            # Directory not readable here: an empty set makes
            # check_interface_exists() skip the existence check.
            pass
        err = check_interface_exists(wan, available_interfaces)
        if err:
            errors.append(f"network_interfaces.wan_interface: {err}")
        err = check_interface_exists(lan, available_interfaces)
        if err:
            errors.append(f"network_interfaces.lan_interface: {err}")
        err = check_wan_lan_unique(wan, lan)
        if err:
            errors.append(f"network_interfaces: {err}")

    # Blocklist library =============================================
    blocklists_by_name = {}
    for idx, bl in enumerate(data.get("dns_blocking", {}).get("blocklists", [])):
        name = bl.get("name", "")
        label = f"dns_blocking.blocklists[{idx}] '{name}'"
        for field in ("name", "description", "save_as", "url", "format"):
            if not bl.get(field):
                errors.append(f"{label}: missing or empty field '{field}'.")
        if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS:
            errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.")
        if name:
            err = check_blocklist_name_unique(
                data.get("dns_blocking", {}).get("blocklists", []), name, exclude_idx=idx
            )
            if err:
                errors.append(f"{label}: {err}")
            else:
                # Only uniquely named blocklists can be referenced by VLANs.
                blocklists_by_name[name] = bl

    # Per-VLAN validation ===========================================
    vlan_networks = {}   # iface -> IPv4Network (used for NAT section)

    for i, (vlan, iface) in enumerate(zip(_all_vlans, vlan_ifaces)):
        vlan_id = _stored_ids[i]
        name = vlan.get("name", "?")
        label = f"vlan '{name}' (id={vlan_id})"

        if vlan_id is None or not isinstance(vlan_id, int) or not (1 <= vlan_id <= 4094):
            errors.append(f"vlan '{name}': vlan_id must be an integer 1-4094 (got {vlan_id!r}).")

        err = check_vlan_name_unique(_all_vlans, name, exclude_idx=i)
        if err:
            errors.append(f"{label}: {err}")

        err = check_vlan_id_unique(_all_vlans, vlan_id, exclude_idx=i)
        if err:
            errors.append(f"{label}: {err}")

        if iface in seen_interfaces:
            errors.append(f"{label}: duplicate interface '{iface}' "
                          f"(also used by '{seen_interfaces[iface]}').")
        else:
            seen_interfaces[iface] = name

        err = check_mdns_vpn(is_wg(vlan), vlan.get('mdns_reflection'))
        if err:
            errors.append(f"{label}: {err}")

        if is_wg(vlan):
            # vpn_information =======================================
            vpi = vlan.get("vpn_information")
            if not isinstance(vpi, dict):
                errors.append(f"{label}: vpn_information must be a plain object.")
                vpi = {}
            else:
                lp = vpi.get("listen_port")
                # Track ports by their parsed integer value so that "51820"
                # and 51820 are recognised as the same port.
                lp_num = int_range(lp, 1, 65535)
                if lp_num is None:
                    errors.append(f"{label}: vpn_information.listen_port must be an integer 1-65535.")
                elif lp_num in seen_listen_ports:
                    errors.append(f"{label}: vpn_information.listen_port {lp} is already used by "
                                  f"'{seen_listen_ports[lp_num]}'.")
                else:
                    seen_listen_ports[lp_num] = name

            # subnet/subnet_mask ====================================
            for field in ("subnet", "subnet_mask"):
                if not vlan.get(field):
                    errors.append(f"{label}: missing required field '{field}'.")
            wg_net = None
            if vlan.get("subnet") and vlan.get("subnet_mask"):
                try:
                    wg_net = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
                    vlan_networks[iface] = wg_net
                except ValueError as e:
                    errors.append(f"{label}: invalid subnet/subnet_mask: {e}")

            # server_identities =====================================
            if not vlan.get("server_identities"):
                errors.append(f"{label}: server_identities is empty or missing.")
            identity_ips = []
            for idx, ident in enumerate(vlan.get("server_identities", [])):
                ip_str = ident.get("ip", "")
                ilabel = f"{label} server_identities[{idx}] '{ident.get('description', '?')}'"
                if not ip_str:
                    errors.append(f"{ilabel}: missing 'ip' field.")
                    continue
                ip_norm = ipv4(ip_str)
                if not ip_norm:
                    errors.append(f"{ilabel}: ip '{ip_str}' is not a valid IPv4 address.")
                    continue
                ip_addr = ipaddress.IPv4Address(ip_norm)
                if wg_net and ip_addr not in wg_net:
                    errors.append(f"{ilabel}: ip '{ip_str}' is not within subnet {wg_net}.")
                else:
                    identity_ips.append(ip_addr)

            # vpn_information.explicit_overrides ====================
            eo = vpi.get("explicit_overrides", {}) if isinstance(vpi, dict) else {}
            if not isinstance(eo, dict):
                errors.append(f"{label}: vpn_information.explicit_overrides must be a plain object.")
            else:
                gw = eo.get("gateway", "")
                if gw:
                    gw_norm = ipv4(gw)
                    if not gw_norm:
                        errors.append(f"{label}: vpn_information.explicit_overrides.gateway '{gw}' is not a valid IPv4 address.")
                    else:
                        gw_ip = ipaddress.IPv4Address(gw_norm)
                        if identity_ips and gw_ip not in identity_ips:
                            errors.append(
                                f"{label}: vpn_information.explicit_overrides.gateway '{gw}' does not match "
                                f"any server_identity IP. Must be one of: "
                                f"{[str(ip) for ip in identity_ips]}."
                            )
                # NOTE(review): unlike the non-VPN branch below, a list of
                # DNS servers is not accepted here -- confirm this is intended.
                dns = eo.get("dns_servers", "")
                if dns and not ipv4(dns):
                    errors.append(f"{label}: vpn_information.explicit_overrides.dns_server '{dns}' is not a valid IPv4 address.")
                mtu = eo.get("mtu", "")
                if mtu and int_range(mtu, 576, 9000) is None:
                    errors.append(f"{label}: vpn_information.explicit_overrides.mtu '{mtu}' must be an integer in range 576-9000.")
            domain_val = vpi.get("domain", "") if isinstance(vpi, dict) else ""
            if domain_val and not domainname(domain_val):
                errors.append(f"{label}: vpn_information.domain '{domain_val}' is not a valid domain name.")

            # peers =================================================
            seen_peer_names = {}
            seen_peer_ips = {}
            for pidx, peer in enumerate(vlan.get("peers", [])):
                pname = peer.get("name", "")
                plabel = f"{label} peer[{pidx}] '{pname}'"
                if not pname:
                    errors.append(f"{plabel}: missing 'name' field.")
                elif pname in seen_peer_names:
                    errors.append(f"{plabel}: duplicate peer name '{pname}'.")
                else:
                    seen_peer_names[pname] = pidx
                if not peer.get("public_key"):
                    errors.append(f"{plabel}: missing 'public_key' field.")
                pip_str = peer.get("ip", "")
                pip_norm = ipv4(pip_str) if pip_str else ''
                if not pip_str:
                    errors.append(f"{plabel}: missing 'ip' field.")
                elif not pip_norm:
                    errors.append(f"{plabel}: ip '{pip_str}' is not a valid IPv4 address.")
                else:
                    pip = ipaddress.IPv4Address(pip_norm)
                    if wg_net and pip not in wg_net:
                        errors.append(f"{plabel}: ip '{pip_str}' is not within subnet {wg_net}.")
                    if pip in identity_ips:
                        errors.append(f"{plabel}: ip '{pip_str}' conflicts with a server_identity.")
                    # Keyed on the normalised address so whitespace variants
                    # of the same IP are still detected as duplicates.
                    if pip_norm in seen_peer_ips:
                        errors.append(
                            f"{plabel}: duplicate peer ip '{pip_str}' "
                            f"(also used by peer '{seen_peer_ips[pip_norm]}')."
                        )
                    else:
                        seen_peer_ips[pip_norm] = pname
            # VPN VLANs have no DHCP section; skip the checks below.
            continue

        if not vlan.get("server_identities"):
            errors.append(f"{label}: server_identities is empty or missing.")
            continue

        for field in ("subnet", "subnet_mask"):
            if not vlan.get(field):
                errors.append(f"{label}: missing required top-level field '{field}'.")
        if not vlan.get("subnet") or not vlan.get("subnet_mask"):
            continue

        try:
            network = ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
            vlan_networks[iface] = network
        except ValueError as e:
            errors.append(f"{label}: invalid subnet/subnet_mask: {e}")
            continue

        d = vlan.get("dhcp_information", {})
        required_dhcp = {"dynamic_pool_start", "dynamic_pool_end", "lease_time"}
        missing = required_dhcp - set(d.keys())
        if missing:
            errors.append(f"{label}: missing dhcp_information fields: {missing}")
            continue

        def check_ip(field_label, ip_str, allow_none=False):
            # Validates one address of the current VLAN (uses this
            # iteration's label and network). Returns the IPv4Address, or
            # None when the value is missing or invalid. An address outside
            # the subnet is reported but still returned.
            if ip_str is None:
                if not allow_none:
                    errors.append(f"{label}: {field_label} is null/missing.")
                return None
            normalised = ipv4(ip_str)
            if not normalised:
                errors.append(f"{label}: {field_label} '{ip_str}' is not a valid IPv4 address.")
                return None
            addr = ipaddress.IPv4Address(normalised)
            if addr not in network:
                errors.append(f"{label}: {field_label} '{ip_str}' is not within subnet {network}.")
            return addr

        identity_ips = []
        for idx, ident in enumerate(vlan["server_identities"]):
            ip = check_ip(
                f"server_identities[{idx}] '{ident.get('description', '?')}'",
                ident.get("ip")
            )
            if ip:
                identity_ips.append(ip)

        # Validate explicit_overrides ===============================
        eo = d.get("explicit_overrides", {})
        if not isinstance(eo, dict):
            errors.append(f"{label}: explicit_overrides must be a plain object.")
        else:
            gw = eo.get("gateway", "")
            if gw:
                gw_ip = check_ip("explicit_overrides.gateway", gw)
                if gw_ip and gw_ip not in identity_ips:
                    errors.append(
                        f"{label}: explicit_overrides.gateway '{gw}' does not match "
                        f"any server_identity IP. Must be one of: "
                        f"{[str(ip) for ip in identity_ips]}."
                    )
            dns = eo.get("dns_servers", "")
            if dns:
                for _ip in (dns if isinstance(dns, list) else [dns]):
                    check_ip("explicit_overrides.dns_servers", _ip)
            ntp = eo.get("ntp_servers", "")
            if ntp:
                for _ip in (ntp if isinstance(ntp, list) else [ntp]):
                    check_ip("explicit_overrides.ntp_servers", _ip)

        pool_start = check_ip("dynamic_pool_start", d["dynamic_pool_start"])
        pool_end = check_ip("dynamic_pool_end", d["dynamic_pool_end"])

        if pool_start and pool_end and pool_start > pool_end:
            errors.append(
                f"{label}: dynamic_pool_start '{pool_start}' is greater than "
                f"dynamic_pool_end '{pool_end}'."
            )

        if pool_start and pool_end:
            for ip in identity_ips:
                if pool_start <= ip <= pool_end:
                    errors.append(
                        f"{label}: server_identity '{ip}' falls inside the dynamic "
                        f"pool ({pool_start} - {pool_end})."
                    )

        seen_res_ips = {}    # ip string -> {mac: description}
        seen_res_macs = {}   # mac -> description
        for r in vlan.get("reservations", []):
            rdesc = r.get("description", "?")
            rmac = r.get("mac", "").lower().strip()

            if is_dynamic_ip(r):
                rip = None
            else:
                rip = check_ip(f"reservation '{rdesc}' ip", r.get("ip"))

            if rip:
                if pool_start and pool_end and pool_start <= rip <= pool_end:
                    errors.append(
                        f"{label}: reservation '{rdesc}' ip '{rip}' falls inside "
                        f"the dynamic pool ({pool_start} - {pool_end})."
                    )
                rip_str = str(rip)
                if rip_str in seen_res_ips:
                    # Allow same IP for different MACs (multi-interface device)
                    if rmac and rmac in seen_res_ips[rip_str]:
                        errors.append(
                            f"{label}: reservation '{rdesc}' ip '{rip}' and MAC '{rmac}' "
                            f"duplicates '{seen_res_ips[rip_str][rmac]}'."
                        )
                    else:
                        seen_res_ips[rip_str][rmac] = rdesc
                else:
                    seen_res_ips[rip_str] = {rmac: rdesc}
                if rip in identity_ips:
                    errors.append(
                        f"{label}: reservation '{rdesc}' ip '{rip}' conflicts "
                        f"with a server_identity."
                    )

            if rmac:
                if rmac in seen_res_macs:
                    errors.append(
                        f"{label}: reservation '{rdesc}' MAC '{rmac}' duplicates "
                        f"'{seen_res_macs[rmac]}'."
                    )
                else:
                    seen_res_macs[rmac] = rdesc

        for bl_name in vlan.get("use_blocklists", []):
            if bl_name not in blocklists_by_name:
                errors.append(f"{label}: use_blocklists references unknown blocklist '{bl_name}'.")

    # NAT / firewall validation =====================================
    valid_protos = VALID_PROTOCOLS

    def nat_check_port(label, port):
        if int_range(port, 1, 65535) is None:
            errors.append(f"{label}: '{port}' is not a valid port number (1-65535).")

    def nat_check_ip(label, ip_str):
        normalised = ipv4(ip_str)
        if not normalised:
            errors.append(f"{label}: '{ip_str}' is not a valid IPv4 address.")
            return None
        return ipaddress.IPv4Address(normalised)

    def nat_check_ip_in_network(label, ip_str, network):
        ip = nat_check_ip(label, ip_str)
        if ip and ip not in network:
            errors.append(f"{label}: '{ip_str}' is not within subnet {network}.")

    for vlan, iface in zip(data.get("vlans", []), vlan_ifaces):
        name = vlan.get("name", "?")
        # None when the VLAN's subnet was missing or invalid above.
        net = vlan_networks.get(iface)

        for r in vlan.get("port_wrangling", []):
            desc = r.get("description", "?")
            label = f"vlan '{name}' port_wrangling '{desc}'"
            if r.get("protocol") not in valid_protos:
                errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
                               f"Must be tcp, udp, or both.")
            nat_check_port(f"{label} dest_port", r.get("dest_port"))
            if net:
                nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net)

    # port_forwarding validation (top-level) ========================
    for idx, r in enumerate(data.get("port_forwarding", [])):
        desc = r.get("description", "?")
        label = f"port_forwarding[{idx}] '{desc}'"
        if r.get("protocol") not in valid_protos:
            errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
                           f"Must be tcp, udp, or both.")
        nat_check_port(f"{label} dest_port", r.get("dest_port"))
        nat_check_port(f"{label} nat_port", r.get("nat_port"))
        nat_check_ip(f"{label} nat_ip", r.get("nat_ip", ""))

    for r in data.get("inter_vlan_exceptions", []):
        desc = r.get("description", "?")
        label = f"inter_vlan_exceptions '{desc}'"
        if r.get("protocol") not in valid_protos:
            errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
                           f"Must be tcp, udp, or both.")
        if "src_ip_or_subnet" not in r:
            errors.append(f"{label}: missing field 'src_ip_or_subnet'.")
        else:
            val = r["src_ip_or_subnet"]
            if not ipv4_or_cidr(val):
                errors.append(f"{label}: src_ip_or_subnet '{val}' is not a valid "
                               f"IPv4 address or network.")
        # 'dst_ip' is accepted as a fallback key for the destination.
        dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
        if not dst:
            errors.append(f"{label}: missing field 'dst_ip_or_subnet'.")
        else:
            if not ipv4_or_cidr(dst):
                errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid "
                               f"IPv4 address or network.")
        if r.get("dst_port_min"):
            nat_check_port(f"{label} dst_port_min", r.get("dst_port_min"))
        if r.get("dst_port_max"):
            nat_check_port(f"{label} dst_port_max", r.get("dst_port_max"))
        min_p, max_p = r.get("dst_port_min", ""), r.get("dst_port_max", "")
        if min_p and max_p:
            try:
                if int(min_p) > int(max_p):
                    errors.append(f"{label}: dst_port_min {min_p} is greater than dst_port_max {max_p}.")
            except (ValueError, TypeError):
                # Non-numeric ports were already reported by nat_check_port.
                pass

    # radius_default uniqueness check ===============================
    _rd_idx = next(
        (i for i, v in enumerate(data.get('vlans', [])) if v.get('radius_default')), None
    )
    if _rd_idx is not None:
        err = check_radius_default_unique(data.get('vlans', []), exclude_idx=_rd_idx)
        if err:
            defaults = [v.get('name', '?') for v in data.get('vlans', []) if v.get('radius_default')]
            errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
                          f"Only one VLAN may be the RADIUS default.")

    # RADIUS requires multiple VLANs ================================
    non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)]
    has_radius_clients = any(
        r.get("radius_client")
        for v in non_wg_vlans
        for r in v.get("reservations", [])
    )
    if has_radius_clients and len(non_wg_vlans) < 2:
        errors.append(
            "RADIUS clients are configured but only one non-VPN VLAN exists. "
            "Dynamic VLAN assignment requires at least two VLANs."
        )

    # host_overrides validation =====================================
    all_vlan_nets = list(vlan_networks.values())
    for idx, entry in enumerate(data.get("host_overrides", [])):
        lbl = f"host_overrides[{idx}] '{entry.get('host', '?')}'"
        if not entry.get("host"):
            errors.append(f"{lbl}: missing 'host' field.")
        ip_str = entry.get("ip", "")
        ip_norm = ipv4(ip_str) if ip_str else ''
        if not ip_str:
            errors.append(f"{lbl}: missing 'ip' field.")
        elif not ip_norm:
            errors.append(f"{lbl}: '{ip_str}' is not a valid IPv4 address.")
        else:
            ip_addr = ipaddress.IPv4Address(ip_norm)
            if all_vlan_nets and not any(ip_addr in net for net in all_vlan_nets):
                errors.append(f"{lbl}: '{ip_str}' does not fall within any configured VLAN subnet.")

    # banned_ips validation =========================================
    for idx, entry in enumerate(data.get("banned_ips", [])):
        ip_val = entry.get("ip", "")
        lbl = f"banned_ips[{idx}] '{entry.get('description', '')}'"
        if not ip_val:
            errors.append(f"{lbl}: missing 'ip' field.")
            continue
        if not banned_ip(ip_val):
            errors.append(f"{lbl}: '{ip_val}' is not a valid IP, CIDR, or wildcard pattern.")

    return errors
|