Development

This commit is contained in:
Matthew Grotke 2026-05-20 17:49:00 -04:00
parent 2bfa5ff29a
commit 8766c6c9a2
4 changed files with 88 additions and 56 deletions

View file

@ -2,7 +2,6 @@ from flask import Blueprint, request, redirect, flash
from auth import require_level from auth import require_level
from config_utils import load_core, save_core, verify_core_hash, queued_msg from config_utils import load_core, save_core, verify_core_hash, queued_msg
import sanitize import sanitize
import ipaddress as _ipaddress
import validation as validate import validation as validate
bp = Blueprint('action_apply_vlans', __name__) bp = Blueprint('action_apply_vlans', __name__)
@ -24,21 +23,6 @@ def _hash_ok():
return True return True
def _derive_vlan_id(subnet, prefix):
"""Return VLAN ID (1-4094) derived from the active octet of the network address,
or None if not derivable. byte_index = (prefix-1) // 8."""
try:
network = _ipaddress.ip_network(f'{subnet}/{prefix}', strict=False)
octets = list(network.network_address.packed)
byte_idx = (prefix - 1) // 8
vlan_id = octets[byte_idx]
if 1 <= vlan_id <= 4094:
return vlan_id
return None
except Exception:
return None
@bp.route('/action/add_vlan', methods=['POST']) @bp.route('/action/add_vlan', methods=['POST'])
@require_level('administrator') @require_level('administrator')
def add_vlan(): def add_vlan():
@ -63,7 +47,7 @@ def add_vlan():
flash('Invalid subnet prefix (must be 1-30).', 'error') flash('Invalid subnet prefix (must be 1-30).', 'error')
return redirect(VIEW) return redirect(VIEW)
vlan_id = _derive_vlan_id(subnet, subnet_mask) vlan_id = validate.derive_vlan_id(subnet, subnet_mask)
if vlan_id is None: if vlan_id is None:
flash('Cannot derive a valid VLAN ID (1-4094) from this subnet/prefix combination.', 'error') flash('Cannot derive a valid VLAN ID (1-4094) from this subnet/prefix combination.', 'error')
return redirect(VIEW) return redirect(VIEW)
@ -74,7 +58,7 @@ def add_vlan():
core = load_core() core = load_core()
vlans = core.setdefault('vlans', []) vlans = core.setdefault('vlans', [])
if any(v.get('vlan_id') == vlan_id for v in vlans): if any(validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) == vlan_id for v in vlans):
flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error')
return redirect(VIEW) return redirect(VIEW)
@ -83,7 +67,6 @@ def add_vlan():
return redirect(VIEW) return redirect(VIEW)
entry = { entry = {
'vlan_id': vlan_id,
'name': name, 'name': name,
'is_vpn': is_vpn, 'is_vpn': is_vpn,
'subnet': subnet, 'subnet': subnet,
@ -157,17 +140,20 @@ def edit_vlan():
# Use submitted subnet_mask, or fall back to whatever is already stored. # Use submitted subnet_mask, or fall back to whatever is already stored.
final_mask = subnet_mask if subnet_mask is not None else existing.get('subnet_mask', 24) final_mask = subnet_mask if subnet_mask is not None else existing.get('subnet_mask', 24)
vlan_id = _derive_vlan_id(subnet, final_mask) vlan_id = validate.derive_vlan_id(subnet, final_mask)
if vlan_id is None: if vlan_id is None:
flash('Cannot derive a valid VLAN ID (1-4094) from this subnet/prefix combination.', 'error') flash('Cannot derive a valid VLAN ID (1-4094) from this subnet/prefix combination.', 'error')
return redirect(VIEW) return redirect(VIEW)
current_id = existing.get('vlan_id') current_id = validate.derive_vlan_id(existing.get('subnet', ''), existing.get('subnet_mask', 24))
if current_id == 1 and vlan_id != 1: if current_id == 1 and vlan_id != 1:
flash('VLAN 1 is the physical interface; change its subnet so the derived ID remains 1.', 'error') flash('VLAN 1 is the physical interface; change its subnet so the derived ID remains 1.', 'error')
return redirect(VIEW) return redirect(VIEW)
if vlan_id != current_id and any(i != idx and v.get('vlan_id') == vlan_id for i, v in enumerate(vlans)): if vlan_id != current_id and any(
validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) == vlan_id
for i, v in enumerate(vlans) if i != idx
):
flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error')
return redirect(VIEW) return redirect(VIEW)
@ -176,7 +162,6 @@ def edit_vlan():
return redirect(VIEW) return redirect(VIEW)
existing.update({ existing.update({
'vlan_id': vlan_id,
'name': name, 'name': name,
'is_vpn': is_vpn, 'is_vpn': is_vpn,
'subnet': subnet, 'subnet': subnet,

View file

@ -152,13 +152,17 @@ def _iface_status(iface):
def _resolve_iface(vlan, core): def _resolve_iface(vlan, core):
"""Compute interface name from is_vpn + vlan_id + general.lan_interface.""" """Compute interface name from is_vpn + derived vlan_id + general.lan_interface."""
if vlan.get('is_vpn'): if vlan.get('is_vpn'):
wg_vlans = [v for v in core.get('vlans', []) if v.get('is_vpn')] wg_vlans = [v for v in core.get('vlans', []) if v.get('is_vpn')]
idx = next((i for i, v in enumerate(wg_vlans) if v is vlan), 0) wg_sorted = sorted(wg_vlans, key=lambda v: (
validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) is None,
validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) or 0,
))
idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
return f'wg{idx}' return f'wg{idx}'
lan = core.get('general', {}).get('lan_interface', 'eth0') lan = core.get('general', {}).get('lan_interface', 'eth0')
vid = vlan.get('vlan_id', 1) vid = validate.derive_vlan_id(vlan.get('subnet', ''), vlan.get('subnet_mask', 24)) or 1
return lan if vid == 1 else f'{lan}.{vid}' return lan if vid == 1 else f'{lan}.{vid}'
@ -268,8 +272,9 @@ def _config_datasource(name):
if name == 'vlans': if name == 'vlans':
bl_desc = {b['name']: b.get('description', b['name']) for b in core.get('blocklists', []) if 'name' in b} bl_desc = {b['name']: b.get('description', b['name']) for b in core.get('blocklists', []) if 'name' in b}
rows = [] rows = []
for v in sorted(vlans, key=lambda x: x.get('vlan_id', 0)): for v in sorted(vlans, key=lambda x: validate.derive_vlan_id(x.get('subnet', ''), x.get('subnet_mask', 24)) or 0):
row = {k: v.get(k) for k in ('vlan_id', 'name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn')} row = {k: v.get(k) for k in ('name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn')}
row['vlan_id'] = validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24))
row['interface'] = _resolve_iface(v, core) row['interface'] = _resolve_iface(v, core)
row['use_blocklists'] = json.dumps([ row['use_blocklists'] = json.dumps([
{'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', []) {'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', [])
@ -319,9 +324,14 @@ def _config_datasource(name):
if name == 'vpn_peers': if name == 'vpn_peers':
rows = [] rows = []
for i, vlan in enumerate(v for v in vlans if v.get('is_vpn')): _wg_sorted = sorted(
[v for v in vlans if v.get('is_vpn')],
key=lambda v: (validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) is None,
validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) or 0)
)
for i, vlan in enumerate(_wg_sorted):
iface = f'wg{i}' iface = f'wg{i}'
vlan_display = f'{iface} (VLAN {vlan.get("vlan_id", "?")})' vlan_display = f'{iface} (VLAN {validate.derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24)) or "?"})'
for peer in vlan.get('peers', []): for peer in vlan.get('peers', []):
row = dict(peer) row = dict(peer)
row['vlan_display'] = vlan_display row['vlan_display'] = vlan_display
@ -539,7 +549,7 @@ def collect_tokens():
tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names]) tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn'))) tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn')))
tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans]) tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) for v in vlans])
tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name', '') for v in vlans]) tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name', '') for v in vlans])
tokens['EXISTING_VLAN_INTERFACES_JSON'] = json.dumps([_resolve_iface(v, core) for v in vlans]) tokens['EXISTING_VLAN_INTERFACES_JSON'] = json.dumps([_resolve_iface(v, core) for v in vlans])
tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in core.get('banned_ips', []) if b.get('enabled', True))) tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in core.get('banned_ips', []) if b.get('enabled', True)))
@ -555,9 +565,13 @@ def collect_tokens():
for p in validate.VALID_DDNS_PROVIDERS for p in validate.VALID_DDNS_PROVIDERS
]) ])
wg_vlans_list = [v for v in vlans if v.get('is_vpn')] wg_vlans_list = sorted(
[v for v in vlans if v.get('is_vpn')],
key=lambda v: (validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) is None,
validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) or 0)
)
tokens['VPN_VLAN_OPTIONS'] = json.dumps([ tokens['VPN_VLAN_OPTIONS'] = json.dumps([
{'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id", "?")})'} {'value': v.get('name', ''), 'label': f'wg{i} (VLAN {validate.derive_vlan_id(v.get("subnet", ""), v.get("subnet_mask", 24)) or "?"})'}
for i, v in enumerate(wg_vlans_list) for i, v in enumerate(wg_vlans_list)
]) ])
wg_vlan = wg_vlans_list[0] if wg_vlans_list else {} wg_vlan = wg_vlans_list[0] if wg_vlans_list else {}

View file

@ -103,8 +103,8 @@ from pathlib import Path
from validation import ( from validation import (
VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS, VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS,
int_range, domainname, int_range, domainname,
inject_interfaces, is_wg, is_dynamic_ip, is_wg, is_dynamic_ip,
validate_config, resolve_vlan_derived_fields, validate_config,
) )
SCRIPT_DIR = Path(__file__).parent SCRIPT_DIR = Path(__file__).parent
@ -3079,7 +3079,7 @@ def cmd_apply(data, dry_run=False):
dnsmasq confs, start/restart all services whose interface is up, nftables, dnsmasq confs, start/restart all services whose interface is up, nftables,
timer, and boot service. Safe to run repeatedly. timer, and boot service. Safe to run repeatedly.
""" """
inject_interfaces(data) data = resolve_vlan_derived_fields(data)
if dry_run: if dry_run:
print("[DRY RUN] --apply would perform the following actions:") print("[DRY RUN] --apply would perform the following actions:")
print() print()

View file

@ -253,22 +253,47 @@ def is_dynamic_ip(r):
return ip in ("", "dynamic") or ip is None return ip in ("", "dynamic") or ip is None
def inject_interfaces(data): def derive_vlan_id(subnet, prefix):
"""Mutate data in-place: add derived 'interface' field to every VLAN. """Return VLAN ID (1-4094) derived from the active octet of the network address, or None."""
try:
network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
octets = list(network.network_address.packed)
byte_idx = (int(prefix) - 1) // 8
vlan_id = octets[byte_idx]
if 1 <= vlan_id <= 4094:
return vlan_id
return None
except Exception:
return None
Called by core.py cmd_apply before apply logic. NOT called by
validate_config validate_config computes iface locally to stay pure. def resolve_vlan_derived_fields(data):
"""Return a deep copy of data with vlan_id and interface computed for every VLAN.
WireGuard VLANs are assigned wg0/wg1/... in ascending vlan_id order for
deterministic interface naming regardless of JSON list order.
Does not mutate the input dict.
""" """
lan = data.get("general", {}).get("lan_interface", "eth0") import copy
wg_idx = 0 result = copy.deepcopy(data)
for vlan in data.get("vlans", []): lan = result.get("general", {}).get("lan_interface", "eth0")
if vlan.get("is_vpn"): vlans = result.get("vlans", [])
vlan["interface"] = f"wg{wg_idx}"
wg_idx += 1 for vlan in vlans:
else: vlan["vlan_id"] = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24))
wg_entries = [(i, v) for i, v in enumerate(vlans) if is_wg(v)]
wg_sorted = sorted(wg_entries, key=lambda x: (x[1].get("vlan_id") is None, x[1].get("vlan_id") or 0))
for wg_idx, (_, vlan) in enumerate(wg_sorted):
vlan["interface"] = f"wg{wg_idx}"
for vlan in vlans:
if not is_wg(vlan):
vid = vlan.get("vlan_id", 1) vid = vlan.get("vlan_id", 1)
vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}" vlan["interface"] = lan if vid == 1 else f"{lan}.{vid}"
return result
# =================================================================== # ===================================================================
# Full config validation (shared with core.py --apply) # Full config validation (shared with core.py --apply)
@ -282,16 +307,24 @@ def validate_config(data):
seen_names = {} seen_names = {}
seen_listen_ports = {} seen_listen_ports = {}
# Pre-compute per-VLAN interface names without mutating data # Pre-compute per-VLAN vlan_ids and interface names without mutating data
_lan = data.get("general", {}).get("lan_interface", "eth0") _lan = data.get("general", {}).get("lan_interface", "eth0")
_all_vlans = data.get("vlans", [])
_derived_ids = [
derive_vlan_id(_v.get("subnet", ""), _v.get("subnet_mask", 24))
for _v in _all_vlans
]
_wg_sorted = sorted(
[(i, _derived_ids[i]) for i, _v in enumerate(_all_vlans) if is_wg(_v)],
key=lambda x: (x[1] is None, x[1] or 0)
)
_wg_order = {orig_i: wg_idx for wg_idx, (orig_i, _) in enumerate(_wg_sorted)}
vlan_ifaces = [] vlan_ifaces = []
_wg_idx = 0 for i, _vlan in enumerate(_all_vlans):
for _vlan in data.get("vlans", []):
if is_wg(_vlan): if is_wg(_vlan):
vlan_ifaces.append(f"wg{_wg_idx}") vlan_ifaces.append(f"wg{_wg_order[i]}")
_wg_idx += 1
else: else:
_vid = _vlan.get("vlan_id", 1) _vid = _derived_ids[i]
vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}") vlan_ifaces.append(_lan if _vid == 1 else f"{_lan}.{_vid}")
# -- upstream_dns block ---------------------------------------------------- # -- upstream_dns block ----------------------------------------------------
@ -339,8 +372,8 @@ def validate_config(data):
# -- Per-VLAN validation --------------------------------------------------- # -- Per-VLAN validation ---------------------------------------------------
vlan_networks = {} # iface -> IPv4Network (used for NAT section) vlan_networks = {} # iface -> IPv4Network (used for NAT section)
for vlan, iface in zip(data.get("vlans", []), vlan_ifaces): for i, (vlan, iface) in enumerate(zip(_all_vlans, vlan_ifaces)):
vlan_id = vlan.get("vlan_id") vlan_id = _derived_ids[i]
name = vlan.get("name", "?") name = vlan.get("name", "?")
label = f"vlan '{name}' (id={vlan_id})" label = f"vlan '{name}' (id={vlan_id})"