/operstate.
+ Returns INVALID if the interface does not exist, otherwise UP/DOWN/UNKNOWN/etc."""
+ if not iface:
+ return 'INVALID'
+ safe = re.sub(r'[^A-Za-z0-9._-]', '', iface)
+ if not safe:
+ return 'INVALID'
+ try:
+ with open(f'/sys/class/net/{safe}/operstate') as f:
+ state = f.read().strip().upper()
+ return state if state else 'UP'
+ except OSError:
+ return 'INVALID'
+
+
+def _resolve_iface(vlan, core):
+ """Compute interface name from is_vpn + vlan_id + general.lan_interface."""
+ if vlan.get('is_vpn'):
+ wg_vlans = [v for v in core.get('vlans', []) if v.get('is_vpn')]
+ idx = next((i for i, v in enumerate(wg_vlans) if v is vlan), 0)
+ return f'wg{idx}'
+ lan = core.get('general', {}).get('lan_interface', 'eth0')
+ vid = vlan.get('vlan_id', 1)
+ return lan if vid == 1 else f'{lan}.{vid}'
+
+
# -- Live data loaders ---------------------------------------------------------
def _live_dhcp_leases():
@@ -91,11 +131,12 @@ def _live_dhcp_leases():
def _vlan_name_for_ip(ip):
import ipaddress
for vlan in _load_core().get('vlans', []):
- subnet = vlan.get('dhcp', {}).get('subnet', '')
+ subnet = vlan.get('subnet', '')
+ mask = vlan.get('subnet_mask', 24)
if not subnet:
continue
try:
- if ipaddress.ip_address(ip) in ipaddress.ip_network(subnet + '/24', strict=False):
+ if ipaddress.ip_address(ip) in ipaddress.ip_network(f'{subnet}/{mask}', strict=False):
return vlan.get('name', '-')
except Exception:
pass
@@ -139,6 +180,15 @@ def _config_datasource(name):
core = _load_core()
vlans = core.get('vlans', [])
+ if name == 'interfaces':
+ gen = core.get('general', {})
+ wan = gen.get('wan_interface', '')
+ lan = gen.get('lan_interface', '')
+ return [
+ {'iface_type': 'WAN', 'interface': wan, 'status': _iface_status(wan)},
+ {'iface_type': 'LAN', 'interface': lan, 'status': _iface_status(lan)},
+ ]
+
if name == 'banned_ips':
return core.get('banned_ips', [])
@@ -161,10 +211,11 @@ def _config_datasource(name):
return rows
if name == 'vlans':
+ core = _load_core()
rows = []
- for v in vlans:
- row = {k: v.get(k) for k in ('vlan_id', 'name', 'interface', 'dhcp', 'radius_default', 'mdns_reflection')}
- row['subnet'] = v.get('dhcp', {}).get('subnet', '')
+ for v in sorted(vlans, key=lambda x: x.get('vlan_id', 0)):
+ row = {k: v.get(k) for k in ('vlan_id', 'name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn')}
+ row['interface'] = _resolve_iface(v, core)
row['use_blocklists'] = json.dumps(v.get('use_blocklists', []))
rows.append(row)
return rows
@@ -208,6 +259,18 @@ def _config_datasource(name):
rows.append(row)
return rows
+ if name == 'vpn_peers':
+ wg_vlan = next((v for v in vlans if v.get('is_vpn')), None)
+ if not wg_vlan:
+ return []
+ rows = []
+ for peer in wg_vlan.get('peers', []):
+ row = dict(peer)
+ row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no'
+ row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-'
+ rows.append(row)
+ return rows
+
return []
@@ -369,7 +432,22 @@ def collect_tokens():
dns = core.get('upstream_dns', {})
vlans = core.get('vlans', [])
tokens['GENERAL_WAN_INTERFACE'] = str(gen.get('wan_interface', '-'))
+ tokens['GENERAL_LAN_INTERFACE'] = str(gen.get('lan_interface', '-'))
tokens['GENERAL_LOG_MAX_KB'] = str(gen.get('log_max_kb', '-'))
+
+ sys_ifaces = _get_system_interfaces()
+ # Always include currently-configured values so dropdowns are never blank.
+ for configured in [gen.get('wan_interface', ''), gen.get('lan_interface', '')]:
+ if configured and configured not in sys_ifaces:
+ sys_ifaces.append(configured)
+ sys_ifaces.sort()
+ tokens['NETWORK_INTERFACE_OPTIONS'] = json.dumps(
+ [{'value': i, 'label': i} for i in sys_ifaces]
+ )
+ tokens['NETWORK_INTERFACE_STATUS_OPTIONS'] = json.dumps(
+ [{'value': i, 'label': f'{i} — {_iface_status(i).title()}'} for i in sys_ifaces]
+ )
+
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if gen.get('log_errors_only') else 'false'
tokens['GENERAL_DNSMASQ_LOG_QUERIES'] = 'true' if gen.get('dnsmasq_log_queries') else 'false'
tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(gen.get('daily_execute_time_24hr_local', '-'))
@@ -380,7 +458,7 @@ def collect_tokens():
tokens['DNS_UPSTREAM_SERVERS_JSON'] = json.dumps(servers)
tokens['OVERVIEW_UPSTREAM_SERVERS'] = ', '.join(servers) or '-'
- non_vpn_vlans = [v for v in vlans if 'dhcp' in v]
+ non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
vlan_names = [v.get('name', '') for v in vlans]
tokens['OVERVIEW_VLAN_NAMES'] = ', '.join(vlan_names) or '-'
tokens['STAT_VLAN_COUNT'] = str(len(non_vpn_vlans))
@@ -392,6 +470,10 @@ def collect_tokens():
tokens['VLAN_FILTER_OPTIONS'] = filter_opts
tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
+ tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn')))
+ tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
+ tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name', '') for v in vlans])
+ tokens['EXISTING_VLAN_INTERFACES_JSON'] = json.dumps([_resolve_iface(v, core) for v in vlans])
tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in core.get('banned_ips', []) if b.get('enabled', True)))
tokens['STAT_BLOCKLIST_COUNT'] = str(sum(1 for b in core.get('blocklists', []) if b.get('enabled', True)))
@@ -405,13 +487,28 @@ def collect_tokens():
{'value': 'duckdns', 'label': 'DuckDNS'},
])
- vpn = _vpn_info()
+ wg_vlan = next((v for v in vlans if v.get('is_vpn')), {})
+ vpn = wg_vlan.get('vpn_information', {})
overrides = vpn.get('explicit_overrides', {})
- tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
- tokens['VPN_GATEWAY'] = str(vpn.get('gateway', ''))
- tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
- tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_server', ''))
- tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
+ tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
+ tokens['VPN_SERVER_ENDPOINT'] = str(vpn.get('server_endpoint', ''))
+ tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
+ tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_server', ''))
+ tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
+ # Compute gateway from server_identities (lowest last-octet), fallback to first subnet host
+ try:
+ import ipaddress as _ipaddress
+ ident_ips = [s['ip'] for s in wg_vlan.get('server_identities', []) if s.get('ip')]
+ if ident_ips:
+ default_gw = str(min((_ipaddress.IPv4Address(ip) for ip in ident_ips),
+ key=lambda x: x.packed[-1]))
+ else:
+ wg_net = _ipaddress.IPv4Network(
+ f"{wg_vlan['subnet']}/{wg_vlan['subnet_mask']}", strict=False)
+ default_gw = str(next(wg_net.hosts()))
+ tokens['VPN_GATEWAY'] = overrides.get('gateway') or default_gw
+ except Exception:
+ tokens['VPN_GATEWAY'] = ''
ip_str, sub_str, next_interval = _public_ip_info(ddns)
tokens['STAT_PUBLIC_IP'] = ip_str
@@ -441,6 +538,28 @@ def collect_tokens():
blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES]
)
+ tokens['PROTOCOL_OPTIONS'] = json.dumps([
+ {'value': 'tcp', 'label': 'TCP'},
+ {'value': 'udp', 'label': 'UDP'},
+ {'value': 'both', 'label': 'TCP/UDP'},
+ ])
+
+ tokens['BLOCKLIST_FORMAT_OPTIONS'] = json.dumps([
+ {'value': 'hosts', 'label': 'hosts (hosts file format)'},
+ {'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
+ ])
+
+ tokens['BLOCKLIST_NAME_OPTIONS'] = json.dumps([
+ {'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
+ for bl in core.get('blocklists', [])
+ ])
+
+ tokens['ACCOUNT_LEVEL_OPTIONS'] = json.dumps([
+ {'value': 'viewer', 'label': 'Viewer (read-only access to live data)'},
+ {'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
+ {'value': 'manager', 'label': 'Manager (full access including account management)'},
+ ])
+
return tokens
@@ -511,6 +630,9 @@ def _render_item(item, tokens, inherited_req=None):
if t == 'spacer':
return ''
+ if t == 'divider':
+ return '
'
+
if t in ('button_primary', 'button_secondary', 'button_danger', 'button_ghost'):
cls_map = {
'button_primary': 'btn-primary',
@@ -522,11 +644,12 @@ def _render_item(item, tokens, inherited_req=None):
extra = item.get('class', '')
if extra:
cls = f'{cls} {extra}'
- text = e(apply_tokens(item.get('text', ''), tokens))
- action = e(apply_tokens(item.get('action', '#'), tokens))
+ text = e(apply_tokens(item.get('text', ''), tokens))
+ action = e(apply_tokens(item.get('action', '#'), tokens))
+ disabled = ' disabled' if item.get('disabled') else ''
if item.get('method', '').lower() == 'post':
return (f'')
+ f'')
return f'{text}'
if t == 'button_cancel':
@@ -616,13 +739,46 @@ def _render_item(item, tokens, inherited_req=None):
if t == 'field':
return _render_field(item, tokens)
+ if t == 'field_row':
+ inner = render_items(item.get('items', []), tokens, req)
+ cols = item.get('cols', 2)
+ return f'{inner}
'
+
+ if t == 'subnet_row':
+ subnet_name = e(item.get('subnet_name', 'subnet'))
+ prefix_name = e(item.get('prefix_name', 'subnet_mask'))
+ subnet_val = apply_tokens(item.get('subnet_value', ''), tokens)
+ prefix_raw = apply_tokens(item.get('prefix_value', '24'), tokens)
+ subnet_ph = e(apply_tokens(item.get('subnet_placeholder', ''), tokens))
+ show_derived = item.get('show_derived_vlan_id', False)
+ try:
+ pf = max(1, min(30, int(prefix_raw)))
+ except (ValueError, TypeError):
+ pf = 24
+ dotted = _prefix_to_dotted(pf)
+
+ return (
+ f''
+ )
+
if t == 'editable_list':
return _render_editable_list(item, tokens)
if t == 'select':
- name = e(item.get('name', ''))
- options = apply_tokens(item.get('options', ''), tokens)
- return f''
+ name = e(item.get('name', ''))
+ options = apply_tokens(item.get('options', ''), tokens)
+ filter_col = item.get('filter_col', '')
+ extra = f' data-filter-col="{e(filter_col)}"' if filter_col else ''
+ return f''
if t == 'button_row':
inner = render_items(item.get('items', []), tokens, req)
@@ -642,12 +798,22 @@ def _render_field(item, tokens):
placeholder = e(apply_tokens(item.get('placeholder', ''), tokens))
hint = e(apply_tokens(item.get('hint', ''), tokens))
hint_html = f'{hint}
' if hint else ''
+ extra_cls = f' {e(item["class"])}' if item.get('class') else ''
+ readonly = ' readonly' if item.get('readonly') else ''
if input_type == 'hidden':
return f''
if input_type == 'checkbox':
- checked = 'checked' if value.lower() in ('true', '1', 'yes') else ''
+ checked = 'checked' if value.lower() in ('true', '1', 'yes') else ''
+ cb_label = item.get('checkbox_label')
+ if cb_label:
+ return (f''
+ f''
+ f'{hint_html}
')
return (f'')
+ dyn_hint = '' if item.get('readonly') else ''
return (f''
f'{hint_html}
')
+ f' placeholder="{placeholder}" class="form-input{extra_cls}"{readonly}>{hint_html}{dyn_hint}')
def _render_editable_list(item, tokens):
@@ -784,6 +951,10 @@ def _render_table(item, tokens, inherited_req=None):
action = e(apply_tokens(ra.get('action', '#'), tokens))
method = ra.get('method', 'post').lower()
if method == 'post':
+ disable_if = ra.get('disable_if')
+ if disable_if and row.get(disable_if.get('field')) == disable_if.get('value'):
+ btns += f''
+ continue
btns += (f'