From 5575b06b643d29f84e011791e4214acd2e24574f Mon Sep 17 00:00:00 2001 From: Matthew Grotke Date: Sun, 31 May 2026 22:29:05 -0400 Subject: [PATCH] Development --- docker/routlin-dash/app/factory.py | 12 ++- docker/routlin-dash/app/pages/dhcp/action.py | 24 +----- .../routlin-dash/app/pages/dhcp/content.json | 13 +++- .../app/pages/hostoverrides/action.py | 33 ++------ docker/routlin-dash/app/pages/vpn/action.py | 19 +++-- docker/routlin-dash/app/view_page.py | 12 +++ routlin/validation.py | 76 +++++++++++++++++++ 7 files changed, 126 insertions(+), 63 deletions(-) diff --git a/docker/routlin-dash/app/factory.py b/docker/routlin-dash/app/factory.py index 0842725..c09146e 100644 --- a/docker/routlin-dash/app/factory.py +++ b/docker/routlin-dash/app/factory.py @@ -362,7 +362,10 @@ def build_form_script(field_specs, submit_sel): checkbox_only.append(vn) else: validate_items.append((vn, nm)) - gate_vars.append(f'{vn} && {vn}._valid') + if spec.get('optional'): + gate_vars.append(f'(!{vn} || !{vn}.value.trim() || {vn}._valid)') + else: + gate_vars.append(f'{vn} && {vn}._valid') lines.append('') @@ -785,21 +788,22 @@ def build_field(item, tokens): _vmask = parse_validation(validate_raw) if validate_raw else 0 validate_attr = f' data-validate="{_vmask}"' if _vmask else '' depends_attr = f' data-depends="{e(",".join(depends))}"' if depends else '' - extra_attrs = ''.join(f' {e(ak)}="{e(str(av))}"' for ak, av in item.get('attrs', {}).items()) + extra_attrs = ''.join(f' {e(ak)}="{e(apply_tokens(str(av), tokens))}"' for ak, av in item.get('attrs', {}).items()) + optional_attr = ' data-optional="1"' if item.get('optional') else '' existing_ids = apply_tokens(item.get('existing_ids', ''), tokens) existing_attr = f' data-existing-ids="{e(existing_ids)}"' if existing_ids else '' if _vmask: return ( f'
' f'
' + f' placeholder="{placeholder}" class="form-input"{readonly}{validate_attr}{depends_attr}{extra_attrs}{optional_attr}{existing_attr}/>' f'
' f'{hint_html}
' ) return ( f'
' f'' + f' placeholder="{placeholder}" class="form-input"{readonly}{validate_attr}{depends_attr}{extra_attrs}{optional_attr}{existing_attr}/>' f'{hint_html}
' ) diff --git a/docker/routlin-dash/app/pages/dhcp/action.py b/docker/routlin-dash/app/pages/dhcp/action.py index 2348c2f..8c2f1aa 100644 --- a/docker/routlin-dash/app/pages/dhcp/action.py +++ b/docker/routlin-dash/app/pages/dhcp/action.py @@ -1,6 +1,5 @@ from pathlib import Path import copy -import ipaddress from flask import Blueprint, request, redirect, flash from auth import require_level @@ -47,25 +46,6 @@ def _parse_ip(): return ip -def _check_ip_conflicts(ip, vlan): - if ip == 'dynamic': - return None - dhcp = vlan.get('dhcp_information', {}) - pool_start = dhcp.get('dynamic_pool_start') - pool_end = dhcp.get('dynamic_pool_end') - if pool_start and pool_end: - try: - if (ipaddress.IPv4Address(pool_start) <= ipaddress.IPv4Address(ip) - <= ipaddress.IPv4Address(pool_end)): - return f'{ip} falls within the dynamic pool range ({pool_start}-{pool_end}).' - except Exception: - pass - identity_ips = {s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')} - if ip in identity_ips: - return f'{ip} is already assigned as a server identity IP.' - return None - - @bp.route('/action/dhcp/addreservation_add', methods=['POST']) @require_level('administrator') def addreservation_add(): @@ -94,7 +74,7 @@ def addreservation_add(): flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error') return redirect(f'/{_PAGE}') - conflict = _check_ip_conflicts(ip, vlan) + conflict = validate.check_reservation_ip_conflicts(ip, vlan) if conflict: flash(f'The configuration has not been saved because {conflict}', 'error') return redirect(f'/{_PAGE}') @@ -181,7 +161,7 @@ def reservations_edit(): flash('Entry not found.', 'error') return redirect(f'/{_PAGE}') - conflict = _check_ip_conflicts(ip, vlans[vi]) + conflict = validate.check_reservation_ip_conflicts(ip, vlans[vi]) if conflict: flash(f'The configuration has not been saved because {conflict}', 'error') return redirect(f'/{_PAGE}') diff --git a/docker/routlin-dash/app/pages/dhcp/content.json b/docker/routlin-dash/app/pages/dhcp/content.json index 9ecf296..8a35f17 100644 --- a/docker/routlin-dash/app/pages/dhcp/content.json +++ b/docker/routlin-dash/app/pages/dhcp/content.json @@ -172,7 +172,11 @@ "name": "hostname", "input_type": "text", "validate": "VALIDATION_NETWORK_NAME", - "placeholder": "e.g. nas" + "optional": true, + "placeholder": "e.g. nas", + "attrs": { + "data-res-hosts-by-vlan": "%RESERVATION_HOSTNAMES_BY_VLAN_JSON%" + } }, { "type": "field", @@ -187,8 +191,13 @@ "label": "IP Address", "name": "ip", "input_type": "text", + "validate": "VALIDATION_IPV4_FORMAT", + "optional": true, "placeholder": "e.g. 192.168.10.50", - "hint": "Leave blank to authorize device on this VLAN dynamically." + "hint": "Leave blank to authorize device on this VLAN dynamically.", + "attrs": { + "data-res-ips-by-vlan": "%RESERVATION_IPS_BY_VLAN_JSON%" + } }, { "type": "field", diff --git a/docker/routlin-dash/app/pages/hostoverrides/action.py b/docker/routlin-dash/app/pages/hostoverrides/action.py index 557b2aa..8561657 100644 --- a/docker/routlin-dash/app/pages/hostoverrides/action.py +++ b/docker/routlin-dash/app/pages/hostoverrides/action.py @@ -1,6 +1,5 @@ from pathlib import Path import copy -import ipaddress from flask import Blueprint, request, redirect, flash from auth import require_level @@ -12,28 +11,6 @@ _PAGE = Path(__file__).parent.name bp = Blueprint(_PAGE, __name__) -def _vlan_networks(cfg): - nets = [] - for v in cfg.get('vlans', []): - subnet = v.get('subnet', '') - mask = v.get('subnet_mask', '') - if subnet and mask: - try: - nets.append(ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False)) - except ValueError: - pass - return nets - - -def _ip_in_vlan(ip_str, cfg): - try: - addr = ipaddress.IPv4Address(ip_str) - except ValueError: - return False - nets = _vlan_networks(cfg) - return not nets or any(addr in net for net in nets) - - def _row_index(): try: return int(request.form.get('row_index', '')) @@ -62,8 +39,9 @@ def addoverride_add(): return redirect(f'/{_PAGE}') cfg = load_config() - if not _ip_in_vlan(ip, cfg): - flash('IP address does not fall within any configured VLAN subnet.', 'error') + err = validate.check_host_override_ip_in_vlans(ip, cfg) + if err: + flash(err, 'error') return redirect(f'/{_PAGE}') entry = {'description': description, 'host': host, 'ip': ip, 'enabled': True} @@ -130,8 +108,9 @@ def table_edit(): return redirect(f'/{_PAGE}') cfg = load_config() - if not _ip_in_vlan(ip, cfg): - flash('IP address does not fall within any configured VLAN subnet.', 'error') + err = validate.check_host_override_ip_in_vlans(ip, cfg) + if err: + flash(err, 'error') return redirect(f'/{_PAGE}') items = cfg.get('host_overrides', []) diff --git a/docker/routlin-dash/app/pages/vpn/action.py b/docker/routlin-dash/app/pages/vpn/action.py index 10f56bd..9dd9a7a 100644 --- a/docker/routlin-dash/app/pages/vpn/action.py +++ b/docker/routlin-dash/app/pages/vpn/action.py @@ -172,10 +172,11 @@ def wireguard_apply(): flash('No WireGuard VLAN found in configuration.', 'error') return redirect(f'/{_PAGE}') - for v in cfg.get('vlans', []): - if v.get('is_vpn') and v is not vpn_vlan and v.get('vpn_information', {}).get('listen_port') == listen_port: - flash(f'Listen port {listen_port} is already used by another VPN VLAN.', 'error') - return redirect(f'/{_PAGE}') + err = validate.check_vpn_listen_port_unique(cfg.get('vlans', []), listen_port, + exclude_vlan_name=vpn_vlan.get('name')) + if err: + flash(err, 'error') + return redirect(f'/{_PAGE}') before_info = copy.deepcopy(vpn_vlan.get('vpn_information', {})) info = vpn_vlan.setdefault('vpn_information', {}) @@ -242,8 +243,9 @@ def addpeer_add(): pass peers = vpn_vlan.setdefault('peers', []) - if any(p.get('name') == peer_name for p in peers): - flash(f'A peer named "{peer_name}" already exists.', 'error') + err = validate.check_peer_name_unique(peers, peer_name) + if err: + flash(err, 'error') return redirect(f'/{_PAGE}') for v in cfg.get('vlans', []): if not v.get('is_vpn'): @@ -297,8 +299,9 @@ def peers_edit(): return redirect(f'/{_PAGE}') peers = vlan.get('peers', []) - if any(j != peer_idx and p.get('name') == peer_name for j, p in enumerate(peers)): - flash(f'A peer named "{peer_name}" already exists.', 'error') + err = validate.check_peer_name_unique(peers, peer_name, exclude_idx=peer_idx) + if err: + flash(err, 'error') return redirect(f'/{_PAGE}') before = copy.deepcopy({k: peers[peer_idx].get(k) for k in ('name', 'split_tunnel', 'enabled')}) diff --git a/docker/routlin-dash/app/view_page.py b/docker/routlin-dash/app/view_page.py index 749fc44..d985282 100644 --- a/docker/routlin-dash/app/view_page.py +++ b/docker/routlin-dash/app/view_page.py @@ -789,6 +789,18 @@ def collect_tokens(): tokens['VLAN_FILTER_OPTIONS'] = filter_opts tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names]) tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn'))) + _res_ips_by_vlan = {} + _res_hosts_by_vlan = {} + for _v in vlans: + _vn = _v.get('name', '') + if not _vn: + continue + _res_ips_by_vlan[_vn] = [r['ip'] for r in _v.get('reservations', []) + if r.get('ip') and r['ip'] != 'dynamic'] + _res_hosts_by_vlan[_vn] = [r['hostname'] for r in _v.get('reservations', []) + if r.get('hostname')] + tokens['RESERVATION_IPS_BY_VLAN_JSON'] = json.dumps(_res_ips_by_vlan) + tokens['RESERVATION_HOSTNAMES_BY_VLAN_JSON'] = json.dumps(_res_hosts_by_vlan) tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans]) tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name') for v in vlans]) _dv = next((v for v in vlans if v.get('radius_default')), None) diff --git a/routlin/validation.py b/routlin/validation.py index 10b6db6..e0e44f8 100644 --- a/routlin/validation.py +++ b/routlin/validation.py @@ -308,6 +308,82 @@ def check_blocklist_name_unique(blocklists, name, exclude_idx=None): return None +# =================================================================== +# DHCP reservation checks (callable independently by action.py) +# =================================================================== + +def check_reservation_ip_conflicts(ip, vlan): + """Return error string if ip conflicts with the VLAN's pool range or server identities, else None.""" + if not ip or ip == 'dynamic': + return None + dhcp = vlan.get('dhcp_information', {}) + pool_start = dhcp.get('dynamic_pool_start') + pool_end = dhcp.get('dynamic_pool_end') + if pool_start and pool_end: + try: + if (ipaddress.IPv4Address(pool_start) <= ipaddress.IPv4Address(ip) + <= ipaddress.IPv4Address(pool_end)): + return f'{ip} falls within the dynamic pool range ({pool_start}-{pool_end}).' + except Exception: + pass + identity_ips = {s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')} + if ip in identity_ips: + return f'{ip} is already assigned as a server identity IP.' + return None + + +# =================================================================== +# Host override checks (callable independently by action.py) +# =================================================================== + +def check_host_override_ip_in_vlans(ip_str, cfg): + """Return error string if ip_str is not within any configured VLAN subnet, else None.""" + try: + addr = ipaddress.IPv4Address(ip_str) + except ValueError: + return f"'{ip_str}' is not a valid IPv4 address." + nets = [] + for v in cfg.get('vlans', []): + subnet = v.get('subnet', '') + mask = v.get('subnet_mask', '') + if subnet and mask: + try: + nets.append(ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False)) + except ValueError: + pass + if not nets: + return None + if not any(addr in net for net in nets): + return 'IP address does not fall within any configured VLAN subnet.' + return None + + +# =================================================================== +# VPN uniqueness checks (callable independently by action.py) +# =================================================================== + +def check_vpn_listen_port_unique(vlans, listen_port, exclude_vlan_name=None): + """Return error string if listen_port is already used by another VPN VLAN, else None.""" + for v in vlans: + if not v.get('is_vpn'): + continue + if exclude_vlan_name is not None and v.get('name') == exclude_vlan_name: + continue + if v.get('vpn_information', {}).get('listen_port') == listen_port: + return f'Listen port {listen_port} is already used by another VPN VLAN.' + return None + + +def check_peer_name_unique(peers, name, exclude_idx=None): + """Return error string if name is already used by another peer, else None.""" + for i, p in enumerate(peers): + if exclude_idx is not None and i == exclude_idx: + continue + if p.get('name') == name: + return f'A peer named "{name}" already exists.' + return None + + # =================================================================== # Physical interface checks (callable independently by action.py) # ===================================================================