from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_core, save_core, verify_core_hash, queued_msg import sanitize import validation as validate bp = Blueprint('action_apply_vlans', __name__) VIEW = '/view/view_vlans' def _row_index(): try: return int(request.form.get('row_index', '')) except (ValueError, TypeError): return None def _hash_ok(): if not verify_core_hash(request.form.get('config_hash', '')): flash('Configuration was modified by another session. Please refresh and try again.', 'error') return False return True @bp.route('/action/add_vlan', methods=['POST']) @require_level('administrator') def add_vlan(): name = sanitize.name(request.form.get('name', '')) is_vpn = 'is_vpn' in request.form subnet = sanitize.ip(request.form.get('subnet', '')) subnet_mask = sanitize.subnet_mask(request.form.get('subnet_mask', '')) radius_default = 'radius_default' in request.form mdns_reflection = 'mdns_reflection' in request.form use_blocklists = sanitize.filterlist(request.form.getlist('use_blocklists'), {b.get('name') for b in load_core().get('blocklists', [])}) if not name: flash('Name is required.', 'error') return redirect(VIEW) if not subnet: flash('Subnet IP is required.', 'error') return redirect(VIEW) if subnet_mask is None: flash('Invalid subnet prefix (must be 1-30).', 'error') return redirect(VIEW) vlan_id = validate.derive_vlan_id(subnet, subnet_mask) if vlan_id is None: flash('Cannot derive a valid VLAN ID (1-4094) from this subnet/prefix combination.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) core = load_core() vlans = core.setdefault('vlans', []) if any(validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) == vlan_id for v in vlans): flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') return redirect(VIEW) if radius_default and any(v.get('radius_default') for v in vlans): flash('Only one VLAN can be the RADIUS default.', 'error') return redirect(VIEW) entry = { 'name': name, 'is_vpn': is_vpn, 'subnet': subnet, 'subnet_mask': subnet_mask, 'use_blocklists': use_blocklists, 'radius_default': radius_default, 'mdns_reflection': mdns_reflection, } if is_vpn: entry['peers'] = [] else: entry['reservations'] = [] vlans.append(entry) errors = validate.validate_config(core) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(VIEW) @bp.route('/action/edit_vlan', methods=['POST']) @require_level('administrator') def edit_vlan(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) name = sanitize.name(request.form.get('name', '')) subnet = sanitize.ip(request.form.get('subnet', '')) radius_default = 'radius_default' in request.form mdns_reflection = 'mdns_reflection' in request.form use_blocklists = sanitize.filterlist(request.form.getlist('use_blocklists'), {b.get('name') for b in load_core().get('blocklists', [])}) # subnet_mask is only present when the column is visible (not all edit paths send it). # Validate if submitted; fall back to the stored value otherwise. subnet_mask_raw = request.form.get('subnet_mask') if subnet_mask_raw is not None: subnet_mask = sanitize.subnet_mask(subnet_mask_raw) if subnet_mask is None: flash('Invalid subnet prefix (must be 1-30).', 'error') return redirect(VIEW) else: subnet_mask = None # resolved below after loading core if not name: flash('Name is required.', 'error') return redirect(VIEW) if not subnet: flash('Subnet IP is required.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) core = load_core() vlans = core.get('vlans', []) if idx < 0 or idx >= len(vlans): flash('VLAN not found.', 'error') return redirect(VIEW) existing = vlans[idx] # is_vpn is never changed via edit -- toggling it would invalidate peers/reservations. is_vpn = existing.get('is_vpn', False) # Use submitted subnet_mask, or fall back to whatever is already stored. final_mask = subnet_mask if subnet_mask is not None else existing.get('subnet_mask', 24) vlan_id = validate.derive_vlan_id(subnet, final_mask) if vlan_id is None: flash('Cannot derive a valid VLAN ID (1-4094) from this subnet/prefix combination.', 'error') return redirect(VIEW) current_id = validate.derive_vlan_id(existing.get('subnet', ''), existing.get('subnet_mask', 24)) if current_id == 1 and vlan_id != 1: flash('VLAN 1 is the physical interface; change its subnet so the derived ID remains 1.', 'error') return redirect(VIEW) if vlan_id != current_id and any( validate.derive_vlan_id(v.get('subnet', ''), v.get('subnet_mask', 24)) == vlan_id for i, v in enumerate(vlans) if i != idx ): flash(f'VLAN {vlan_id} (derived from subnet) already exists.', 'error') return redirect(VIEW) if radius_default and any(i != idx and v.get('radius_default') for i, v in enumerate(vlans)): flash('Only one VLAN can be the RADIUS default.', 'error') return redirect(VIEW) existing.update({ 'name': name, 'is_vpn': is_vpn, 'subnet': subnet, 'subnet_mask': final_mask, 'radius_default': radius_default, 'mdns_reflection': mdns_reflection, 'use_blocklists': use_blocklists, }) errors = validate.validate_config(core) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(VIEW) @bp.route('/action/delete_vlan', methods=['POST']) @require_level('administrator') def delete_vlan(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) core = load_core() vlans = core.get('vlans', []) if idx < 0 or idx >= len(vlans): flash('VLAN not found.', 'error') return redirect(VIEW) removed = vlans.pop(idx) errors = validate.validate_config(core) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) save_core(core) flash(queued_msg('core apply'), 'success') return redirect(VIEW)