Development

This commit is contained in:
Matthew Grotke 2026-06-01 01:44:58 -04:00
parent 9c22b6f2fd
commit 6e610f888e
10 changed files with 526 additions and 102 deletions

View file

@ -14,6 +14,7 @@ from pages.accountlogin.action import bp as accountlogin_bp
from pages.networklayout.action import bp as networklayout_bp from pages.networklayout.action import bp as networklayout_bp
from pages.physicalinterfaces.action import bp as physicalinterfaces_bp from pages.physicalinterfaces.action import bp as physicalinterfaces_bp
from pages.portforwarding.action import bp as portforwarding_bp from pages.portforwarding.action import bp as portforwarding_bp
from pages.portwrangling.action import bp as portwrangling_bp
from pages.preferences.action import bp as preferences_bp from pages.preferences.action import bp as preferences_bp
from pages.accountverifyemail.action import bp as accountverifyemail_bp from pages.accountverifyemail.action import bp as accountverifyemail_bp
from pages.vpn.action import bp as vpn_bp from pages.vpn.action import bp as vpn_bp
@ -39,6 +40,7 @@ app.register_blueprint(accountlogin_bp)
app.register_blueprint(networklayout_bp) app.register_blueprint(networklayout_bp)
app.register_blueprint(physicalinterfaces_bp) app.register_blueprint(physicalinterfaces_bp)
app.register_blueprint(portforwarding_bp) app.register_blueprint(portforwarding_bp)
app.register_blueprint(portwrangling_bp)
app.register_blueprint(preferences_bp) app.register_blueprint(preferences_bp)
app.register_blueprint(accountverifyemail_bp) app.register_blueprint(accountverifyemail_bp)
app.register_blueprint(vpn_bp) app.register_blueprint(vpn_bp)

View file

@ -1,5 +1,6 @@
from pathlib import Path from pathlib import Path
import copy import copy
import ipaddress
from flask import Blueprint, request, redirect, flash from flask import Blueprint, request, redirect, flash
from auth import require_level from auth import require_level
@ -36,6 +37,27 @@ def _parse_ip():
return ip return ip
def _check_ip_in_vlan_subnet(ip, vlan):
if not ip or ip == 'dynamic':
return None
subnet = vlan.get('subnet')
prefix = vlan.get('subnet_mask')
if not subnet or prefix is None:
return None
try:
network = ipaddress.IPv4Network(f'{subnet}/{prefix}', strict=False)
addr = ipaddress.IPv4Address(ip)
if addr == network.network_address:
return f'{ip} is the network address and cannot be assigned.'
if addr == network.broadcast_address:
return f'{ip} is the broadcast address and cannot be assigned.'
if addr not in network:
return f'{ip} is not within the {vlan["name"]} subnet ({subnet}/{prefix}).'
except ValueError:
return f'{ip} is not a valid IP address.'
return None
@bp.route('/action/dhcp/addreservation_add', methods=['POST']) @bp.route('/action/dhcp/addreservation_add', methods=['POST'])
@require_level('administrator') @require_level('administrator')
def addreservation_add(): def addreservation_add():
@ -64,6 +86,11 @@ def addreservation_add():
flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error') flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error')
return redirect(f'/{_PAGE}') return redirect(f'/{_PAGE}')
subnet_err = _check_ip_in_vlan_subnet(ip, vlan)
if subnet_err:
flash(f'The configuration has not been saved because {subnet_err}', 'error')
return redirect(f'/{_PAGE}')
conflict = validate.check_reservation_ip_conflicts(ip, vlan) conflict = validate.check_reservation_ip_conflicts(ip, vlan)
if conflict: if conflict:
flash(f'The configuration has not been saved because {conflict}', 'error') flash(f'The configuration has not been saved because {conflict}', 'error')
@ -153,6 +180,10 @@ def reservations_edit():
vlan_name = res.get('vlan', '') vlan_name = res.get('vlan', '')
vlan = next((v for v in cfg.get('vlans', []) if v.get('name') == vlan_name), None) vlan = next((v for v in cfg.get('vlans', []) if v.get('name') == vlan_name), None)
if vlan: if vlan:
subnet_err = _check_ip_in_vlan_subnet(ip, vlan)
if subnet_err:
flash(f'The configuration has not been saved because {subnet_err}', 'error')
return redirect(f'/{_PAGE}')
conflict = validate.check_reservation_ip_conflicts(ip, vlan) conflict = validate.check_reservation_ip_conflicts(ip, vlan)
if conflict: if conflict:
flash(f'The configuration has not been saved because {conflict}', 'error') flash(f'The configuration has not been saved because {conflict}', 'error')

View file

@ -117,7 +117,8 @@
}, },
{ {
"col": "ip", "col": "ip",
"input_type": "text" "input_type": "text",
"validate": "VALIDATION_ADDRESS"
}, },
{ {
"col": "radius_client", "col": "radius_client",

View file

@ -0,0 +1,189 @@
from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
from auth import require_level
from config_utils import load_config, record_group, diff_fields, verify_config_hash
import sanitize
import validation as validate
_PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
_VALID_PROTOS_STR = ', '.join(sorted(validate.VALID_PROTOCOLS))
def _row_index():
try:
return int(request.form.get('row_index', ''))
except (ValueError, TypeError):
return None
def _hash_ok():
if not verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
def _parse_entry():
description = sanitize.text(request.form.get('description', ''))
protocol = sanitize.filtervalue(request.form.get('protocol', ''), validate.VALID_PROTOCOLS)
dest_port_raw = request.form.get('dest_port', '').strip()
redirect_raw = request.form.get('redirect_to', '').strip()
if not protocol:
flash(f'The configuration has not been saved because the protocol is invalid. '
f'Accepted values: {_VALID_PROTOS_STR}.', 'error')
return None, True
if not dest_port_raw:
flash('The configuration has not been saved because the destination port is required.', 'error')
return None, True
dest_port = validate.port(dest_port_raw)
if not dest_port:
flash(f'The configuration has not been saved because "{dest_port_raw}" is not a valid port number (1-65535).', 'error')
return None, True
if not redirect_raw:
flash('The configuration has not been saved because the redirect IP address is required.', 'error')
return None, True
redirect_to = validate.ip(redirect_raw)
if not redirect_to:
flash(f'The configuration has not been saved because "{redirect_raw}" is not a valid IP address.', 'error')
return None, True
return {
'description': description,
'protocol': protocol,
'dest_port': dest_port,
'redirect_to': redirect_to,
'enabled': True,
}, None
@bp.route('/action/portwrangling/addrule_add', methods=['POST'])
@require_level('administrator')
def addrule_add():
vlan_name = sanitize.name(request.form.get('vlan_name', ''))
entry, err = _parse_entry()
if err:
return redirect(f'/{_PAGE}')
if not vlan_name:
flash('The configuration has not been saved because a VLAN is required.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
vlan = next((v for v in cfg.get('vlans', []) if v.get('name') == vlan_name), None)
if vlan is None:
flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error')
return redirect(f'/{_PAGE}')
entry['vlan'] = vlan_name
cfg.setdefault('port_wrangling', []).append(entry)
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(None, entry)
flash(record_group(cfg, 'port_wrangling', 'dest_port', entry['dest_port'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portwrangling/rules_toggle', methods=['POST'])
@require_level('administrator')
def rules_toggle():
idx = _row_index()
if idx is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
items = cfg.get('port_wrangling', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
return redirect(f'/{_PAGE}')
old_enabled = items[idx].get('enabled', True)
before = copy.deepcopy(items[idx])
items[idx]['enabled'] = not old_enabled
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(before, items[idx])
flash(record_group(cfg, 'port_wrangling', 'dest_port', items[idx].get('dest_port', ''), changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portwrangling/rules_edit', methods=['POST'])
@require_level('administrator')
def rules_edit():
idx = _row_index()
if idx is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
entry, err = _parse_entry()
if err:
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
items = cfg.get('port_wrangling', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
return redirect(f'/{_PAGE}')
before = copy.deepcopy(items[idx])
entry['vlan'] = items[idx].get('vlan', '')
entry['enabled'] = request.form.get('enabled') == 'on'
items[idx] = entry
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(before, items[idx])
flash(record_group(cfg, 'port_wrangling', 'dest_port', items[idx].get('dest_port', ''), changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portwrangling/rules_delete', methods=['POST'])
@require_level('administrator')
def rules_delete():
idx = _row_index()
if idx is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
if not _hash_ok():
return redirect(f'/{_PAGE}')
cfg = load_config()
items = cfg.get('port_wrangling', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
return redirect(f'/{_PAGE}')
removed = items.pop(idx)
errors = validate.validate_config(cfg)
if errors:
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
changes = diff_fields(removed, None)
flash(record_group(cfg, 'port_wrangling', 'dest_port', removed.get('dest_port', ''), changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')

View file

@ -0,0 +1,178 @@
{
"client_requirement": "client_is_viewer+",
"items": [
{
"type": "header_page_title",
"items": [
{
"type": "h1",
"text": "Port Wrangling"
},
{
"type": "p",
"text": "DNAT rules that redirect traffic on a given port to a local host, regardless of the original destination."
}
]
},
{
"type": "table",
"datasource": "config:port_wrangling",
"empty_message": "No port wrangling rules configured.",
"toolbar": [
{
"type": "select",
"name": "vlan_filter",
"value": "all",
"filter_col": "vlan_name",
"options_html": "%VLAN_FILTER_OPTIONS%"
}
],
"columns": [
{
"label": "Description",
"field": "description"
},
{
"label": "VLAN",
"field": "vlan_name"
},
{
"label": "Protocol",
"field": "protocol",
"class": "col-mono col-narrow"
},
{
"label": "Dest Port",
"field": "dest_port",
"class": "col-mono col-narrow"
},
{
"label": "Redirect To",
"field": "redirect_to",
"class": "col-mono"
},
{
"label": "Status",
"field": "enabled",
"render": "badge_enabled_disabled"
}
],
"row_actions": [
{
"client_requirement": "client_is_administrator+",
"action": "/action/portwrangling/rules_edit",
"method": "inline_edit",
"text": "Edit",
"class": "btn-ghost btn-sm",
"fields": [
{
"col": "description",
"input_type": "text"
},
{
"col": "protocol",
"input_type": "select",
"options": "%PROTOCOL_OPTIONS%"
},
{
"col": "dest_port",
"input_type": "number"
},
{
"col": "redirect_to",
"input_type": "text"
},
{
"col": "enabled",
"input_type": "checkbox",
"checkbox_label": "Enabled"
}
]
},
{
"client_requirement": "client_is_administrator+",
"action": "/action/portwrangling/rules_delete",
"method": "post",
"text": "Delete",
"class": "btn-danger btn-sm"
}
]
},
{
"type": "card",
"id": "add-form",
"label": "Add Rule",
"client_requirement": "client_is_administrator+",
"items": [
{
"type": "form",
"action": "/action/portwrangling/addrule_add",
"method": "post",
"items": [
{
"type": "field",
"label": "VLAN",
"name": "vlan_name",
"input_type": "select",
"options": "%VLAN_NAMES_AS_OPTIONS%",
"required": true
},
{
"type": "field",
"label": "Description",
"name": "description",
"input_type": "text",
"placeholder": "e.g. DNS wrangling"
},
{
"type": "field_row",
"cols": 3,
"items": [
{
"type": "field",
"label": "Protocol",
"name": "protocol",
"input_type": "select",
"options": "%PROTOCOL_OPTIONS%"
},
{
"type": "field",
"label": "Dest Port",
"name": "dest_port",
"input_type": "number",
"validate": "VALIDATION_PORT",
"min": 1,
"max": 65535,
"placeholder": "e.g. 53"
},
{
"type": "field",
"label": "Redirect To",
"name": "redirect_to",
"input_type": "text",
"validate": "VALIDATION_IPV4_FORMAT",
"placeholder": "e.g. 192.168.1.1"
}
]
},
{
"type": "button_row",
"items": [
{
"type": "button_primary",
"action": "/action/portwrangling/addrule_add",
"method": "post",
"text": "Add Rule"
},
{
"type": "button_cancel",
"text": "Cancel"
}
]
}
]
}
]
}
]
}

View file

@ -332,6 +332,14 @@ def config_datasource(name):
if name == 'port_forwarding': if name == 'port_forwarding':
return cfg.get('port_forwarding', []) return cfg.get('port_forwarding', [])
if name == 'port_wrangling':
rows = []
for r in cfg.get('port_wrangling', []):
row = dict(r)
row['vlan_name'] = r.get('vlan', '-')
rows.append(row)
return rows
if name == 'dhcp_reservations': if name == 'dhcp_reservations':
rows = [] rows = []
for res in cfg.get('dhcp_reservations', []): for res in cfg.get('dhcp_reservations', []):
@ -799,6 +807,10 @@ def collect_tokens():
_res_hosts_by_vlan[_vn] = [r['hostname'] for r in _vlan_res if r.get('hostname')] _res_hosts_by_vlan[_vn] = [r['hostname'] for r in _vlan_res if r.get('hostname')]
tokens['RESERVATION_IPS_BY_VLAN_JSON'] = json.dumps(_res_ips_by_vlan) tokens['RESERVATION_IPS_BY_VLAN_JSON'] = json.dumps(_res_ips_by_vlan)
tokens['RESERVATION_HOSTNAMES_BY_VLAN_JSON'] = json.dumps(_res_hosts_by_vlan) tokens['RESERVATION_HOSTNAMES_BY_VLAN_JSON'] = json.dumps(_res_hosts_by_vlan)
tokens['VLAN_SUBNET_INFO_JSON'] = json.dumps({
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
})
tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans]) tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name') for v in vlans]) tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name') for v in vlans])
_dv = next((v for v in vlans if v.get('radius_default')), None) _dv = next((v for v in vlans if v.get('radius_default')), None)

View file

@ -316,23 +316,7 @@
"dns_servers": "", "dns_servers": "",
"ntp_servers": "" "ntp_servers": ""
} }
}, }
"port_wrangling": [
{
"description": "DNS wrangling - redirect Trusted DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.1.1"
},
{
"description": "NTP wrangling - redirect Trusted NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.1.1"
}
]
}, },
{ {
"vlan_id": 10, "vlan_id": 10,
@ -363,23 +347,7 @@
"dns_servers": "", "dns_servers": "",
"ntp_servers": "" "ntp_servers": ""
} }
}, }
"port_wrangling": [
{
"description": "DNS wrangling - redirect IoT DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.10.1"
},
{
"description": "NTP wrangling - redirect IoT NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.10.1"
}
]
}, },
{ {
"vlan_id": 20, "vlan_id": 20,
@ -410,23 +378,7 @@
"dns_servers": "", "dns_servers": "",
"ntp_servers": "" "ntp_servers": ""
} }
}, }
"port_wrangling": [
{
"description": "DNS wrangling - redirect Guest DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.20.1"
},
{
"description": "NTP wrangling - redirect Guest NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.20.1"
}
]
}, },
{ {
"vlan_id": 30, "vlan_id": 30,
@ -458,23 +410,7 @@
"dns_servers": "", "dns_servers": "",
"ntp_servers": "" "ntp_servers": ""
} }
}, }
"port_wrangling": [
{
"description": "DNS wrangling - redirect Kids DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.30.1"
},
{
"description": "NTP wrangling - redirect Kids NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.30.1"
}
]
}, },
{ {
"vlan_id": 40, "vlan_id": 40,
@ -505,23 +441,7 @@
"mtu": "" "mtu": ""
} }
}, },
"peers": [], "peers": []
"port_wrangling": [
{
"description": "DNS wrangling - redirect VPN DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.40.1"
},
{
"description": "NTP wrangling - redirect VPN NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.40.1"
}
]
} }
], ],
"ddns": { "ddns": {
@ -826,5 +746,87 @@
"ip": "dynamic", "ip": "dynamic",
"vlan": "kids" "vlan": "kids"
} }
],
"port_wrangling": [
{
"description": "DNS wrangling - redirect Trusted DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.1.1",
"vlan": "trusted"
},
{
"description": "NTP wrangling - redirect Trusted NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.1.1",
"vlan": "trusted"
},
{
"description": "DNS wrangling - redirect IoT DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.10.1",
"vlan": "iot"
},
{
"description": "NTP wrangling - redirect IoT NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.10.1",
"vlan": "iot"
},
{
"description": "DNS wrangling - redirect Guest DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.20.1",
"vlan": "guest"
},
{
"description": "NTP wrangling - redirect Guest NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.20.1",
"vlan": "guest"
},
{
"description": "DNS wrangling - redirect Kids DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.30.1",
"vlan": "kids"
},
{
"description": "NTP wrangling - redirect Kids NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.30.1",
"vlan": "kids"
},
{
"description": "DNS wrangling - redirect VPN DNS to local resolver",
"enabled": true,
"protocol": "both",
"dest_port": 53,
"redirect_to": "192.168.40.1",
"vlan": "vpn"
},
{
"description": "NTP wrangling - redirect VPN NTP to local time server",
"enabled": false,
"protocol": "udp",
"dest_port": 123,
"redirect_to": "192.168.40.1",
"vlan": "vpn"
}
] ]
} }

View file

@ -1370,7 +1370,10 @@ def build_nft_config(data, dry_run=False):
vlans = [v for v in data["vlans"] vlans = [v for v in data["vlans"]
if not is_wg(v) or dry_run or wg_interface_up(derive_interface(v, data))] if not is_wg(v) or dry_run or wg_interface_up(derive_interface(v, data))]
all_fwd = list(rule_enabled(data.get("port_forwarding", []))) all_fwd = list(rule_enabled(data.get("port_forwarding", [])))
all_wrngl = [(v, r) for v in vlans for r in rule_enabled(v.get("port_wrangling", []))] _wrngl_vlan_by_name = {v["name"]: v for v in vlans}
all_wrngl = [(_wrngl_vlan_by_name[r["vlan"]], r)
for r in rule_enabled(data.get("port_wrangling", []))
if r.get("vlan") in _wrngl_vlan_by_name]
# Interfaces that are active (WG interfaces only included if up) # Interfaces that are active (WG interfaces only included if up)
active_ifaces = {derive_interface(v, data) for v in vlans} active_ifaces = {derive_interface(v, data) for v in vlans}
@ -1675,8 +1678,11 @@ def apply_nftables(data, dry_run=False):
all_fwd = list(rule_enabled(data.get("port_forwarding", []))) all_fwd = list(rule_enabled(data.get("port_forwarding", [])))
all_dis_fwd = list(rule_disabled(data.get("port_forwarding", []))) all_dis_fwd = list(rule_disabled(data.get("port_forwarding", [])))
all_wrngl = [(v, r) for v in active_vlans for r in rule_enabled(v.get("port_wrangling", []))] _active_vlan_by_name = {v["name"]: v for v in active_vlans}
all_dis_wrngl = [(v, r) for v in data["vlans"] for r in rule_disabled(v.get("port_wrangling", []))] all_wrngl = [(_active_vlan_by_name[r["vlan"]], r)
for r in rule_enabled(data.get("port_wrangling", []))
if r.get("vlan") in _active_vlan_by_name]
all_dis_wrngl = rule_disabled(data.get("port_wrangling", []))
all_except = rule_enabled(data.get("inter_vlan_exceptions", [])) all_except = rule_enabled(data.get("inter_vlan_exceptions", []))
print(f"Applying {len(all_fwd)} port forwarding rule(s), {len(all_dis_fwd)} skipped.") print(f"Applying {len(all_fwd)} port forwarding rule(s), {len(all_dis_fwd)} skipped.")

View file

@ -807,19 +807,22 @@ def validate_config(data):
if ip and ip not in network: if ip and ip not in network:
errors.append(f"{label}: '{ip_str}' is not within subnet {network}.") errors.append(f"{label}: '{ip_str}' is not within subnet {network}.")
for vlan, iface in zip(data.get("vlans", []), vlan_ifaces): # port_wrangling validation (top-level) =========================
name = vlan.get("name", "?") _vlan_name_to_net = {
net = vlan_networks.get(iface) v.get("name", ""): vlan_networks.get(iface)
for v, iface in zip(data.get("vlans", []), vlan_ifaces)
for r in vlan.get("port_wrangling", []): }
desc = r.get("description", "?") for idx, r in enumerate(data.get("port_wrangling", [])):
label = f"vlan '{name}' port_wrangling '{desc}'" desc = r.get("description", "?")
if r.get("protocol") not in valid_protos: vlan_name = r.get("vlan", "?")
errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. " label = f"port_wrangling[{idx}] (vlan '{vlan_name}') '{desc}'"
f"Must be tcp, udp, or both.") if r.get("protocol") not in valid_protos:
nat_check_port(f"{label} dest_port", r.get("dest_port")) errors.append(f"{label}: invalid protocol '{r.get('protocol')}'. "
if net: f"Must be tcp, udp, or both.")
nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net) nat_check_port(f"{label} dest_port", r.get("dest_port"))
net = _vlan_name_to_net.get(vlan_name)
if net:
nat_check_ip_in_network(f"{label} redirect_to", r.get("redirect_to", ""), net)
# port_forwarding validation (top-level) ======================== # port_forwarding validation (top-level) ========================
for idx, r in enumerate(data.get("port_forwarding", [])): for idx, r in enumerate(data.get("port_forwarding", [])):