import copy import ipaddress from flask import Blueprint, request, redirect, flash from auth import require_level from config_utils import load_config, save_config_with_snapshot, verify_config_hash import sanitize import validation as validate bp = Blueprint('action_apply_dhcp_reservations', __name__) VIEW = '/view/view_dhcp' def _row_index(): try: return int(request.form.get('row_index', '')) except (ValueError, TypeError): return None def _hash_ok(): if not verify_config_hash(request.form.get('config_hash', '')): flash('Configuration was modified by another session. Please refresh and try again.', 'error') return False return True def _flat_index_to_vlan_res(vlans, flat_idx): pos = 0 for vi, vlan in enumerate(vlans): for ri in range(len(vlan.get('reservations', []))): if pos == flat_idx: return vi, ri pos += 1 return None, None def _parse_ip(): raw = request.form.get('ip', '').strip() if not raw: flash('The configuration has not been saved because an IP address is required.', 'error') return None ip = validate.ip(raw) if not ip: flash(f'The configuration has not been saved because "{raw}" is not a valid IP address.', 'error') return None return ip def _check_ip_conflicts(ip, vlan): dhcp = vlan.get('dhcp_information', {}) pool_start = dhcp.get('dynamic_pool_start') pool_end = dhcp.get('dynamic_pool_end') if pool_start and pool_end: try: if (ipaddress.IPv4Address(pool_start) <= ipaddress.IPv4Address(ip) <= ipaddress.IPv4Address(pool_end)): return f'{ip} falls within the dynamic pool range ({pool_start}-{pool_end}).' except Exception: pass identity_ips = {s['ip'] for s in vlan.get('server_identities', []) if s.get('ip')} if ip in identity_ips: return f'{ip} is already assigned as a server identity IP.' return None @bp.route('/action/add_dhcp_reservation', methods=['POST']) @require_level('administrator') def add_dhcp_reservation(): vlan_name = sanitize.name(request.form.get('vlan_name', '')) description = sanitize.text(request.form.get('description', '')) hostname = validate.domainname(request.form.get('hostname', '')) mac = sanitize.mac(request.form.get('mac', '')) ip = _parse_ip() radius_client = 'radius_client' in request.form if ip is None: return redirect(VIEW) if not vlan_name: flash('The configuration has not been saved because a VLAN is required.', 'error') return redirect(VIEW) if not mac: flash('The configuration has not been saved because a MAC address is required.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) cfg = load_config() vlans = cfg.get('vlans', []) vlan = next((v for v in vlans if v.get('name') == vlan_name), None) if vlan is None: flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error') return redirect(VIEW) conflict = _check_ip_conflicts(ip, vlan) if conflict: flash(f'The configuration has not been saved because {conflict}', 'error') return redirect(VIEW) entry = { 'description': description, 'hostname': hostname, 'mac': mac, 'ip': ip, 'radius_client': radius_client, 'enabled': True, } vlan.setdefault('reservations', []).append(entry) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) flash(save_config_with_snapshot( cfg, path=f'vlans.{vlan_name}.reservations', key=mac, operation='add', before=None, after=entry, description=f'Added DHCP reservation: {hostname or mac} ({ip})', ), 'success') return redirect(VIEW) @bp.route('/action/toggle_dhcp_reservation', methods=['POST']) @require_level('administrator') def toggle_dhcp_reservation(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) cfg = load_config() vlans = cfg.get('vlans', []) vi, ri = _flat_index_to_vlan_res(vlans, idx) if vi is None: flash('Entry not found.', 'error') return redirect(VIEW) res = vlans[vi]['reservations'][ri] old_enabled = res.get('enabled', True) res['enabled'] = not old_enabled errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) vlan_name = vlans[vi]['name'] action = 'Enabled' if not old_enabled else 'Disabled' flash(save_config_with_snapshot( cfg, path=f'vlans.{vlan_name}.reservations', key=res['mac'], operation='toggle', before={'enabled': old_enabled}, after={'enabled': not old_enabled}, description=f'{action} DHCP reservation: {res.get("hostname") or res["mac"]}', ), 'success') return redirect(VIEW) @bp.route('/action/edit_dhcp_reservation', methods=['POST']) @require_level('administrator') def edit_dhcp_reservation(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) description = sanitize.text(request.form.get('description', '')) hostname = validate.domainname(request.form.get('hostname', '')) mac = sanitize.mac(request.form.get('mac', '')) ip = _parse_ip() radius_client = 'radius_client' in request.form if ip is None: return redirect(VIEW) if not mac: flash('The configuration has not been saved because a MAC address is required.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) cfg = load_config() vlans = cfg.get('vlans', []) vi, ri = _flat_index_to_vlan_res(vlans, idx) if vi is None: flash('Entry not found.', 'error') return redirect(VIEW) conflict = _check_ip_conflicts(ip, vlans[vi]) if conflict: flash(f'The configuration has not been saved because {conflict}', 'error') return redirect(VIEW) res = vlans[vi]['reservations'][ri] before = copy.deepcopy(res) res.update({ 'description': description, 'hostname': hostname, 'mac': mac, 'ip': ip, 'radius_client': radius_client, 'enabled': 'enabled' in request.form, }) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) vlan_name = vlans[vi]['name'] flash(save_config_with_snapshot( cfg, path=f'vlans.{vlan_name}.reservations', key=mac, operation='edit', before=before, after=copy.deepcopy(res), description=f'Edited DHCP reservation: {hostname or mac} ({ip})', ), 'success') return redirect(VIEW) @bp.route('/action/delete_dhcp_reservation', methods=['POST']) @require_level('administrator') def delete_dhcp_reservation(): idx = _row_index() if idx is None: flash('Invalid request.', 'error') return redirect(VIEW) if not _hash_ok(): return redirect(VIEW) cfg = load_config() vlans = cfg.get('vlans', []) vi, ri = _flat_index_to_vlan_res(vlans, idx) if vi is None: flash('Entry not found.', 'error') return redirect(VIEW) vlan_name = vlans[vi]['name'] removed = vlans[vi]['reservations'].pop(ri) errors = validate.validate_config(cfg) if errors: for msg in errors: flash(msg, 'error') return redirect(VIEW) flash(save_config_with_snapshot( cfg, path=f'vlans.{vlan_name}.reservations', key=removed['mac'], operation='delete', before=removed, after=None, description=f'Deleted DHCP reservation: {removed.get("hostname") or removed["mac"]}', ), 'success') return redirect(VIEW)