Development
This commit is contained in:
parent
6c3abca58c
commit
96f6e32c8f
9 changed files with 294 additions and 166 deletions
|
|
@ -253,6 +253,79 @@ def _check_banned_ipv6(ip_str):
|
|||
raise ValueError(f"IPv6 wildcard must have 1-7 prefix groups: {ip_str!r}")
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# Cross-VLAN uniqueness checks (callable independently by action.py)
|
||||
# ===================================================================
|
||||
|
||||
def check_vlan_name_unique(vlans, name, exclude_idx=None):
|
||||
"""Return error string if name is already used by another VLAN, else None."""
|
||||
for i, v in enumerate(vlans):
|
||||
if exclude_idx is not None and i == exclude_idx:
|
||||
continue
|
||||
if v.get('name') == name:
|
||||
return f'VLAN name "{name}" is already in use (VLAN ID {v.get("vlan_id")}).'
|
||||
return None
|
||||
|
||||
|
||||
def check_vlan_id_unique(vlans, vlan_id, exclude_idx=None):
|
||||
"""Return error string if vlan_id is already used by another VLAN, else None."""
|
||||
for i, v in enumerate(vlans):
|
||||
if exclude_idx is not None and i == exclude_idx:
|
||||
continue
|
||||
if v.get('vlan_id') == vlan_id:
|
||||
return f'VLAN ID {vlan_id} is already in use (VLAN "{v.get("name")}").'
|
||||
return None
|
||||
|
||||
|
||||
def check_radius_default_unique(vlans, exclude_idx=None):
|
||||
"""Return error string if another VLAN (not exclude_idx) is already the RADIUS default, else None."""
|
||||
for i, v in enumerate(vlans):
|
||||
if exclude_idx is not None and i == exclude_idx:
|
||||
continue
|
||||
if v.get('radius_default'):
|
||||
return f'VLAN "{v.get("name", "?")}" is already the RADIUS default. Only one VLAN may be the default.'
|
||||
return None
|
||||
|
||||
|
||||
def check_mdns_vpn(is_vpn, mdns_reflection):
|
||||
"""Return error string if mDNS reflection is enabled on a VPN VLAN, else None."""
|
||||
if is_vpn and mdns_reflection:
|
||||
return 'mDNS reflection is not supported on VPN VLANs.'
|
||||
return None
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# Blocklist checks (callable independently by action.py)
|
||||
# ===================================================================
|
||||
|
||||
def check_blocklist_name_unique(blocklists, name, exclude_idx=None):
|
||||
"""Return error string if name is already used by another blocklist entry, else None."""
|
||||
for i, b in enumerate(blocklists):
|
||||
if exclude_idx is not None and i == exclude_idx:
|
||||
continue
|
||||
if b.get('name') == name:
|
||||
return f'A blocklist named "{name}" already exists.'
|
||||
return None
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# Physical interface checks (callable independently by action.py)
|
||||
# ===================================================================
|
||||
|
||||
def check_wan_lan_unique(wan, lan):
|
||||
"""Return error string if WAN and LAN resolve to the same interface, else None."""
|
||||
if wan and lan and wan == lan:
|
||||
return 'WAN and LAN interfaces must be different.'
|
||||
return None
|
||||
|
||||
|
||||
def check_interface_exists(iface, available):
|
||||
"""Return error string if iface is absent from the available set (when non-empty), else None."""
|
||||
if available and iface not in available:
|
||||
return f"Interface '{iface}' does not exist on this system."
|
||||
return None
|
||||
|
||||
|
||||
# ===================================================================
|
||||
# VLAN / interface helpers (shared with core.py apply logic)
|
||||
# ===================================================================
|
||||
|
|
@ -300,9 +373,7 @@ def derive_interface(vlan, data):
|
|||
def validate_config(data):
|
||||
"""Validate config.json structure and content. Returns list of error strings."""
|
||||
errors = []
|
||||
seen_vlan_ids = {}
|
||||
seen_interfaces = {}
|
||||
seen_names = {}
|
||||
seen_listen_ports = {}
|
||||
|
||||
# Pre-compute per-VLAN vlan_ids and interface names without mutating data
|
||||
|
|
@ -340,13 +411,15 @@ def validate_config(data):
|
|||
available_interfaces = set(os.listdir('/sys/class/net'))
|
||||
except Exception:
|
||||
pass
|
||||
if available_interfaces:
|
||||
if wan not in available_interfaces:
|
||||
errors.append(f"network_interfaces.wan_interface: '{wan}' does not exist on this system.")
|
||||
if lan not in available_interfaces:
|
||||
errors.append(f"network_interfaces.lan_interface: '{lan}' does not exist on this system.")
|
||||
if wan == lan:
|
||||
errors.append(f"network_interfaces.wan_interface and network_interfaces.lan_interface must be different (both set to '{wan}').")
|
||||
err = check_interface_exists(wan, available_interfaces)
|
||||
if err:
|
||||
errors.append(f"network_interfaces.wan_interface: {err}")
|
||||
err = check_interface_exists(lan, available_interfaces)
|
||||
if err:
|
||||
errors.append(f"network_interfaces.lan_interface: {err}")
|
||||
err = check_wan_lan_unique(wan, lan)
|
||||
if err:
|
||||
errors.append(f"network_interfaces: {err}")
|
||||
|
||||
# Blocklist library =============================================
|
||||
blocklists_by_name = {}
|
||||
|
|
@ -359,8 +432,11 @@ def validate_config(data):
|
|||
if bl.get("format") and bl["format"] not in VALID_BLOCKLIST_FORMATS:
|
||||
errors.append(f"{label}: format must be one of: {', '.join(sorted(VALID_BLOCKLIST_FORMATS))}.")
|
||||
if name:
|
||||
if name in blocklists_by_name:
|
||||
errors.append(f"{label}: duplicate blocklist name '{name}'.")
|
||||
err = check_blocklist_name_unique(
|
||||
data.get("dns_blocking", {}).get("blocklists", []), name, exclude_idx=idx
|
||||
)
|
||||
if err:
|
||||
errors.append(f"{label}: {err}")
|
||||
else:
|
||||
blocklists_by_name[name] = bl
|
||||
|
||||
|
|
@ -375,17 +451,13 @@ def validate_config(data):
|
|||
if vlan_id is None or not isinstance(vlan_id, int) or not (1 <= vlan_id <= 4094):
|
||||
errors.append(f"vlan '{name}': vlan_id must be an integer 1-4094 (got {vlan_id!r}).")
|
||||
|
||||
if name in seen_names:
|
||||
errors.append(f"{label}: duplicate vlan name '{name}' "
|
||||
f"(also used by id={seen_names[name]}).")
|
||||
else:
|
||||
seen_names[name] = vlan_id
|
||||
err = check_vlan_name_unique(_all_vlans, name, exclude_idx=i)
|
||||
if err:
|
||||
errors.append(f"{label}: {err}")
|
||||
|
||||
if vlan_id in seen_vlan_ids:
|
||||
errors.append(f"{label}: duplicate vlan_id {vlan_id} "
|
||||
f"(also used by '{seen_vlan_ids[vlan_id]}').")
|
||||
else:
|
||||
seen_vlan_ids[vlan_id] = name
|
||||
err = check_vlan_id_unique(_all_vlans, vlan_id, exclude_idx=i)
|
||||
if err:
|
||||
errors.append(f"{label}: {err}")
|
||||
|
||||
if iface in seen_interfaces:
|
||||
errors.append(f"{label}: duplicate interface '{iface}' "
|
||||
|
|
@ -393,8 +465,9 @@ def validate_config(data):
|
|||
else:
|
||||
seen_interfaces[iface] = name
|
||||
|
||||
if vlan.get("mdns_reflection") is True and is_wg(vlan):
|
||||
errors.append(f"{label}: mdns_reflection must be false for WireGuard interfaces.")
|
||||
err = check_mdns_vpn(is_wg(vlan), vlan.get('mdns_reflection'))
|
||||
if err:
|
||||
errors.append(f"{label}: {err}")
|
||||
|
||||
if is_wg(vlan):
|
||||
# vpn_information =======================================
|
||||
|
|
@ -702,14 +775,28 @@ def validate_config(data):
|
|||
if not ipv4_or_cidr(dst):
|
||||
errors.append(f"{label}: dst_ip_or_subnet '{dst}' is not a valid "
|
||||
f"IPv4 address or network.")
|
||||
if r.get("dst_port") is not None:
|
||||
nat_check_port(f"{label} dst_port", r.get("dst_port"))
|
||||
if r.get("dst_port_min"):
|
||||
nat_check_port(f"{label} dst_port_min", r.get("dst_port_min"))
|
||||
if r.get("dst_port_max"):
|
||||
nat_check_port(f"{label} dst_port_max", r.get("dst_port_max"))
|
||||
min_p, max_p = r.get("dst_port_min", ""), r.get("dst_port_max", "")
|
||||
if min_p and max_p:
|
||||
try:
|
||||
if int(min_p) > int(max_p):
|
||||
errors.append(f"{label}: dst_port_min {min_p} is greater than dst_port_max {max_p}.")
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# radius_default uniqueness check ===============================
|
||||
defaults = [v["name"] for v in data.get("vlans", []) if v.get("radius_default") is True]
|
||||
if len(defaults) > 1:
|
||||
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
|
||||
f"Only one VLAN may be the RADIUS default.")
|
||||
_rd_idx = next(
|
||||
(i for i, v in enumerate(data.get('vlans', [])) if v.get('radius_default')), None
|
||||
)
|
||||
if _rd_idx is not None:
|
||||
err = check_radius_default_unique(data.get('vlans', []), exclude_idx=_rd_idx)
|
||||
if err:
|
||||
defaults = [v.get('name', '?') for v in data.get('vlans', []) if v.get('radius_default')]
|
||||
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
|
||||
f"Only one VLAN may be the RADIUS default.")
|
||||
|
||||
# RADIUS requires multiple VLANs ================================
|
||||
non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue