linuxrouter/docker/routlin-dash/app/view_page.py
2026-05-27 23:52:47 -04:00

2199 lines
93 KiB
Python

from flask import Blueprint, session, redirect, get_flashed_messages
from markupsafe import Markup
import json, re, subprocess, os, sys, html as html_mod
import sanitize
import validation as validate
from datetime import datetime, timezone
from config_utils import config_hash, get_pending_entries, get_dashboard_pending, get_dashboard_done, load_snapshot_for_uuid, load_all_snapshots, get_done_timestamps, queue_command, _find_cmd_in_queues, _entry_ts_from_queue, _apply_changes_immediately, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, WEB_APP_DISPLAY_NAME, CONFIGS_DIR, DATA_DIR, WWW_DIR, ACCOUNTS_FILE, APP_DIR
import os as _os
# 'pages' sub-directory of the application directory.
_PAGES_DIR = _os.path.join(APP_DIR, 'pages')
# Flask blueprint named 'view_page' for this module.
bp = Blueprint('view_page', __name__)
# Numeric rank per access level; _passes() compares these ranks with >=, <= and ==.
LEVEL_RANK = {'nothing': 0, 'viewer': 1, 'administrator': 2, 'manager': 3}
# Access level ======================================================
def _client_level():
    """Return the numeric rank of the current session's access level.

    A session with no access level, or with one that is not in LEVEL_RANK,
    ranks 0.
    """
    access = session.get('access_level', 'nothing')
    return LEVEL_RANK.get(access, 0)
def _passes(req, level):
if not req:
return False
for suffix, check in (('+', lambda n, l: l >= n),
('-', lambda n, l: l <= n),
('=', lambda n, l: l == n)):
if req.endswith(suffix):
role = req[:-1].replace('client_is_', '', 1)
needed = LEVEL_RANK.get(role)
if needed is None:
print(f'[view_page] WARNING: unknown role "{role}" in client_requirement "{req}"', file=sys.stderr)
return False
return check(needed, level)
print(f'[view_page] WARNING: client_requirement "{req}" has no valid suffix (+, -, =)', file=sys.stderr)
return False
# File loaders ======================================================
def _load_json(path):
try:
with open(path) as f:
return json.load(f)
except Exception as ex:
print(f'[view_page] ERROR loading {path}: {ex}', file=sys.stderr)
return {}
def _load_config():
    """Load config.json from the configs directory ({} when it cannot be loaded)."""
    config_path = f'{CONFIGS_DIR}/config.json'
    return _load_json(config_path)
def _load_ddns():
    """Return the 'ddns' section of the main configuration, or {} when absent."""
    config = _load_config()
    return config.get('ddns', {})
def _load_accounts():
    """Load the accounts file ({} when it cannot be loaded)."""
    accounts = _load_json(ACCOUNTS_FILE)
    return accounts
def _load_css():
    """Return the contents of page_styles.css in the data directory, or '' on any error."""
    css_path = f'{DATA_DIR}/page_styles.css'
    try:
        with open(css_path) as stylesheet:
            content = stylesheet.read()
    except Exception:
        return ''
    return content
def _load_icon(name):
    """Return the stripped SVG markup of icons/<name>.svg under the www directory.

    Returns '' when the icon cannot be read.
    """
    icon_path = f'{WWW_DIR}/icons/{name}.svg'
    try:
        with open(icon_path) as svg:
            markup = svg.read()
    except Exception:
        return ''
    return markup.strip()
# Shell helper ======================================================
def _run(cmd):
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
return r.stdout.strip()
except Exception:
return ''
def _prefix_to_dotted(n):
mask = (0xFFFFFFFF << (32 - n)) & 0xFFFFFFFF
return '.'.join(str((mask >> (8 * i)) & 0xFF) for i in (3, 2, 1, 0))
def _get_system_interfaces():
_EXCLUDE_PREFIXES = ('lo', 'wg', 'docker', 'br-', 'veth',
'tun', 'tap', 'ppp', 'virbr',
'podman', 'vnet', 'macvtap', 'fc-')
try:
return sorted(
n for n in os.listdir('/sys/class/net')
if not n.startswith(_EXCLUDE_PREFIXES)
and os.path.exists(f'/sys/class/net/{n}/device')
)
except Exception:
return []
def _iface_info(iface):
base = f'/sys/class/net/{iface}'
def _rd(path):
try:
with open(f'{base}/{path}') as f:
return f.read().strip()
except Exception:
return None
wireless = os.path.isdir(f'{base}/wireless')
state = (_rd('operstate') or 'unknown').upper()
if state == 'UNKNOWN':
state = 'UP'
carrier_raw = _rd('carrier')
carrier = (carrier_raw == '1') if carrier_raw is not None else None
speed_raw = _rd('speed')
try:
mbps = int(speed_raw)
if mbps <= 0:
speed = None
elif mbps >= 1000 and mbps % 1000 == 0:
speed = f'{mbps // 1000} Gbps'
else:
speed = f'{mbps} Mbps'
except (TypeError, ValueError):
speed = None
mac = _rd('address')
perm_mac = _rd('perm_address')
if perm_mac and perm_mac == '00:00:00:00:00:00':
perm_mac = None
# DEBUG
# if not perm_mac: perm_mac = 'de:ad:be:ef:f0:0d'
def _int(val):
try: return int(val) if val else None
except ValueError: return None
return {
'name': iface,
'wireless': wireless,
'state': state,
'carrier': carrier,
'speed': speed,
'mtu': _rd('mtu'),
'min_mtu': _int(_rd('min_mtu')),
'max_mtu': _int(_rd('max_mtu')),
'mac': mac,
'perm_mac': perm_mac,
}
def _iface_status(iface):
"""Return link state for iface by reading /sys/class/net/<iface>/operstate.
Returns INVALID if the interface does not exist, otherwise UP/DOWN/UNKNOWN/etc."""
if not iface:
return 'INVALID'
safe = re.sub(r'[^A-Za-z0-9._-]', '', iface)
if not safe:
return 'INVALID'
try:
with open(f'/sys/class/net/{safe}/operstate') as f:
state = f.read().strip().upper()
return state if state else 'UP'
except OSError:
return 'INVALID'
def _resolve_iface(vlan, cfg):
"""Compute interface name from is_vpn + stored vlan_id + general.lan_interface."""
if vlan.get('is_vpn'):
wg_vlans = [v for v in cfg.get('vlans', []) if v.get('is_vpn')]
wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0))
idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
return f'wg{idx}'
lan = cfg.get('network_interfaces', {}).get('lan_interface', 'eth0')
vid = vlan.get('vlan_id') or 1
return lan if vid == 1 else f'{lan}.{vid}'
# Live data loaders =================================================
def _live_dhcp_leases():
    """Return the leases listed in /var/lib/misc/dnsmasq.leases.

    Each whitespace-separated line with at least four fields is read as
    <expiry-epoch> <mac> <ip> <hostname> and becomes a dict with keys
    hostname, ip_address, mac_address, vlan_name and expires. A hostname of
    '*' is shown as '-'.

    Lines whose expiry field is not an integer are skipped individually, so
    one corrupt line no longer hides every lease that follows it. If the
    file cannot be read at all, the rows collected so far (usually none) are
    returned.
    """
    leases_file = '/var/lib/misc/dnsmasq.leases'
    rows = []
    try:
        with open(leases_file) as f:
            for line in f:
                parts = line.split()
                if len(parts) < 4:
                    continue
                try:
                    expiry = int(parts[0])
                except ValueError:
                    # Malformed line: skip it but keep processing the rest.
                    continue
                rows.append({
                    'hostname': parts[3] if parts[3] != '*' else '-',
                    'ip_address': parts[2],
                    'mac_address': parts[1],
                    'vlan_name': _vlan_name_for_ip(parts[2]),
                    'expires': _fmt_timestamp(expiry),
                })
    except Exception:
        # Best effort: an unreadable leases file just means no (more) rows.
        pass
    return rows
def _vlan_name_for_ip(ip):
    """Return the name of the first configured VLAN whose subnet contains `ip`.

    VLANs without a subnet are ignored; a missing subnet_mask defaults to 24.
    Returns '-' when no VLAN matches (a matching VLAN without a name also
    yields '-'). Addresses or subnets that fail to parse are skipped.
    """
    import ipaddress

    for vlan in _load_config().get('vlans', []):
        subnet = vlan.get('subnet', '')
        if not subnet:
            continue
        prefix = vlan.get('subnet_mask', 24)
        try:
            address = ipaddress.ip_address(ip)
            network = ipaddress.ip_network(f'{subnet}/{prefix}', strict=False)
            if address in network:
                return vlan.get('name', '-')
        except Exception:
            pass
    return '-'
def _fmt_timestamp(ts):
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
except Exception:
return '-'
def _relative_time(ts):
try:
diff = int(datetime.now(tz=timezone.utc).timestamp()) - int(ts)
if diff < 60:
n = max(0, diff)
return f'{n} second{"s" if n != 1 else ""} ago'
m = diff // 60
if m < 60:
return f'{m} minute{"s" if m != 1 else ""} ago'
h = m // 60
if h < 24:
return f'{h} hour{"s" if h != 1 else ""} ago'
d = h // 24
if d < 365:
return f'{d} day{"s" if d != 1 else ""} ago'
y = d // 365
return f'{y} year{"s" if y != 1 else ""} ago'
except Exception:
return ''
def _live_vpn_sessions():
    """Return one row per nine-field line of `wg show all dump`.

    Lines with any other number of tab-separated fields are ignored
    (presumably the per-interface lines; confirm against wg(8)). Each row
    has peer_name (first 16 characters of the public key plus '...'),
    interface, tunnel_ip, endpoint, last_handshake, rx_bytes and tx_bytes.
    """
    sessions = []
    dump = _run('wg show all dump 2>/dev/null')
    for record in dump.splitlines():
        fields = record.split('\t')
        if len(fields) != 9:
            continue
        iface, pubkey, _psk, endpoint, allowed, handshake, rx, tx, _keepalive = fields

        # Only the first allowed IP, without its prefix length.
        tunnel_ip = allowed.split(',')[0].split('/')[0] if allowed else '-'

        if handshake.isdigit() and handshake != '0':
            handshake_text = _fmt_timestamp(int(handshake))
        else:
            handshake_text = 'Never'

        sessions.append({
            'peer_name': pubkey[:16] + '...',
            'interface': iface,
            'tunnel_ip': tunnel_ip,
            'endpoint': '-' if endpoint == '(none)' else endpoint,
            'last_handshake': handshake_text,
            'rx_bytes': _fmt_bytes(int(rx)) if rx.isdigit() else '-',
            'tx_bytes': _fmt_bytes(int(tx)) if tx.isdigit() else '-',
        })
    return sessions
def _fmt_bytes(n):
for unit in ('B', 'KB', 'MB', 'GB'):
if n < 1024:
return f'{n:.1f} {unit}'
n /= 1024
return f'{n:.1f} TB'
# Config data loaders ===============================================
def _config_datasource(name):
    """Return the table rows for the configuration-backed datasource `name`.

    The configuration is loaded on every call. Recognized names are
    'interfaces', 'banned_ips', 'host_overrides', 'blocklists', 'vlans',
    'inter_vlan_exceptions', 'port_forwarding', 'dhcp_reservations',
    'ddns_providers', 'accounts' and 'vpn_peers'. Any other name yields [].

    Returns a list of dicts. Several branches return the configuration lists
    themselves; others build new row dicts with extra derived columns.
    """
    cfg = _load_config()
    vlans = cfg.get('vlans', [])
    if name == 'interfaces':
        # One row each for the configured WAN and LAN interface, with live link status.
        gen = cfg.get('network_interfaces', {})
        wan = gen.get('wan_interface', '')
        lan = gen.get('lan_interface', '')
        return [
            {'iface_type': 'WAN', 'interface': wan, 'status': _iface_status(wan)},
            {'iface_type': 'LAN', 'interface': lan, 'status': _iface_status(lan)},
        ]
    if name == 'banned_ips':
        return cfg.get('banned_ips', [])
    if name == 'host_overrides':
        return cfg.get('host_overrides', [])
    if name == 'blocklists':
        # Configured blocklists plus line count and mtime of the saved file.
        rows = []
        for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
            row = dict(bl)
            bl_path = f'{CONFIGS_DIR}/blocklists/{bl.get("save_as", "")}'
            try:
                with open(bl_path) as f:
                    row['domain_count'] = str(sum(1 for _ in f))
                row['last_updated'] = _fmt_timestamp(int(os.path.getmtime(bl_path)))
            except Exception:
                # File missing or unreadable: show placeholders.
                row['domain_count'] = '-'
                row['last_updated'] = '-'
            rows.append(row)
        return rows
    if name == 'vlans':
        # Blocklist name -> description (falls back to the name itself).
        bl_desc = {b['name']: b.get('description', b['name']) for b in cfg.get('dns_blocking', {}).get('blocklists', []) if 'name' in b}
        rows = []
        # VLANs ordered by vlan_id; a missing id sorts as 0.
        for v in sorted(vlans, key=lambda x: x.get('vlan_id') or 0):
            row = {k: v.get(k) for k in ('name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn', 'dnsmasq_log_queries')}
            row['vlan_id'] = v.get('vlan_id')
            row['interface'] = _resolve_iface(v, cfg)
            # List-valued columns are stored as JSON strings.
            row['use_blocklists'] = json.dumps([
                {'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', [])
            ])
            _prefix = v.get('subnet_mask', 24)
            # Number of trailing IP octets shown in the shortened labels:
            # 1 for /24 and longer, 2 for /16+, 3 for /8+, otherwise all 4.
            _n_octets = 1 if _prefix >= 24 else 2 if _prefix >= 16 else 3 if _prefix >= 8 else 4
            row['server_identity_ips'] = json.dumps([
                {
                    'n': s['ip'],
                    'd': ' | '.join(filter(None, [s['ip'], s.get('description'), s.get('hostname')])),
                    'short': '.' + '.'.join(s['ip'].split('.')[-_n_octets:]),
                    # NOTE(review): 'mini' is computed identically to 'short' - confirm this is intended.
                    'mini': '.' + '.'.join(s['ip'].split('.')[-_n_octets:]),
                }
                for s in v.get('server_identities', []) if s.get('ip')
            ])
            # Descriptions and hostnames are filtered the same way as the IPs
            # above (only identities that have an 'ip').
            row['server_identity_descriptions'] = json.dumps([
                s.get('description', '') for s in v.get('server_identities', []) if s.get('ip')
            ])
            row['server_identity_hostnames'] = json.dumps([
                s.get('hostname', '') for s in v.get('server_identities', []) if s.get('ip')
            ])
            row['server_identity_gateway'] = (
                v.get('dhcp_information', {}).get('explicit_overrides', {}).get('gateway', '')
            )
            # DNS/NTP overrides may be a list (joined one per line) or a scalar.
            _dns = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('dns_server', [])
            row['server_identity_dns_server'] = '\n'.join(_dns) if isinstance(_dns, list) else str(_dns or '')
            _ntp = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('ntp_server', [])
            row['server_identity_ntp_server'] = '\n'.join(_ntp) if isinstance(_ntp, list) else str(_ntp or '')
            rows.append(row)
        return rows
    if name == 'inter_vlan_exceptions':
        return cfg.get('inter_vlan_exceptions', [])
    if name == 'port_forwarding':
        return cfg.get('port_forwarding', [])
    if name == 'dhcp_reservations':
        # Flatten the per-VLAN reservations, tagging each with its VLAN name.
        rows = []
        for vlan in vlans:
            for res in vlan.get('reservations', []):
                row = dict(res)
                row['vlan_name'] = vlan.get('name', '-')
                rows.append(row)
        return rows
    if name == 'ddns_providers':
        ddns = _load_ddns()
        rows = []
        for p in ddns.get('providers', []):
            row = dict(p)
            ptype = p.get('provider', '').lower()
            # 'credentials' is an HTML fragment; values from the config are escaped with e().
            if ptype == 'noip':
                # The password itself is never rendered, only bullet characters.
                row['credentials'] = (
                    '<div style="line-height:1.3">'
                    f'<b>U:</b> {e(p.get("username", "-"))}<br/>'
                    '<b>P:</b> &bull;&bull;&bull;&bull;&bull;&bull;</div>'
                )
            elif ptype in ('cloudflare', 'duckdns'):
                # Only the first 20 characters of the API token are shown.
                tok = p.get('api_token', '')
                row['credentials'] = f'<b>API Token:</b> {e(tok[:20])}...' if tok else '(not set)'
            else:
                row['credentials'] = '-'
            # 'subdomains' is used when the provider has no 'hostnames' key.
            row['hostnames'] = json.dumps(p.get('hostnames', p.get('subdomains', [])))
            rows.append(row)
        return rows
    if name == 'accounts':
        rows = []
        for acct in _load_accounts().get('accounts', []):
            row = dict(acct)
            # An account without a hashed password is reported as 'pending'.
            row['account_status'] = 'active' if acct.get('hashed_password') else 'pending'
            rows.append(row)
        return rows
    if name == 'vpn_peers':
        rows = []
        # Same ordering as _resolve_iface: by vlan_id, entries without an id last.
        _wg_sorted = sorted(
            [v for v in vlans if v.get('is_vpn')],
            key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
        )
        for i, vlan in enumerate(_wg_sorted):
            iface = f'wg{i}'
            vlan_display = f'{iface} (VLAN {vlan.get("vlan_id") or "?"})'
            for peer in vlan.get('peers', []):
                row = dict(peer)
                row['vlan_display'] = vlan_display
                row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no'
                # First 20 characters of the public key, or '-' when there is none.
                row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-'
                rows.append(row)
        return rows
    return []
def _load_datasource(spec):
if spec.startswith('live:'):
name = spec[5:]
if name == 'dhcp_leases': return _live_dhcp_leases()
if name == 'vpn_sessions': return _live_vpn_sessions()
return []
if spec.startswith('config:'):
return _config_datasource(spec[7:])
return []
# Live stat helpers =================================================
def _get_dnsmasq_stats():
    """Extract DNS statistics from the last 200 journal lines of the dnsmasq unit.

    Lines are scanned newest first. The scan stops at the first line that
    contains 'queries forwarded'; that line provides 'forwarded' and 'hits',
    from which 'queries' (their sum) and 'hit_rate' are derived. 'auth' and
    'tcp_peak' keep the first value seen before that point. Values that were
    not found stay '-'.
    """
    stats = dict.fromkeys(
        ('queries', 'hits', 'hit_rate', 'forwarded', 'auth', 'tcp_peak'), '-'
    )
    journal = _run('journalctl -u dnsmasq -n 200 --no-pager 2>/dev/null')

    def first_number(pattern, text):
        found = re.search(pattern, text)
        return found.group(1) if found else None

    for entry in reversed(journal.splitlines()):
        if 'queries forwarded' in entry:
            forwarded = first_number(r'queries forwarded (\d+)', entry)
            if forwarded is not None:
                stats['forwarded'] = forwarded
            answered = first_number(r'queries answered locally (\d+)', entry)
            if answered is not None:
                stats['hits'] = answered
            fwd = 0 if stats['forwarded'] == '-' else int(stats['forwarded'])
            hit = 0 if stats['hits'] == '-' else int(stats['hits'])
            total = fwd + hit
            stats['queries'] = str(total) if total else '-'
            if total > 0:
                stats['hit_rate'] = f'{hit / total * 100:.0f}%'
            break
        if 'auth answered' in entry and stats['auth'] == '-':
            auth = first_number(r'auth answered (\d+)', entry)
            if auth is not None:
                stats['auth'] = auth
        if 'max TCP connections' in entry and stats['tcp_peak'] == '-':
            peak = first_number(r'max TCP connections (\d+)', entry)
            if peak is not None:
                stats['tcp_peak'] = peak
    return stats
def _count_blocked_today():
    """Return, as a string, how many of today's dnsmasq journal lines contain 'is NXDOMAIN'.

    Falls back to '0' when the command produces no output.
    """
    command = "journalctl -u dnsmasq --since today --no-pager 2>/dev/null | grep -c 'is NXDOMAIN'"
    count = _run(command)
    if count:
        return count
    return '0'
def _count_blocked_domains():
    """Return the total number of lines across all '.con' files in the blocklists directory.

    The count is returned as a string; '-' is returned when the directory
    cannot be listed. Files that cannot be read contribute 0.

    Lines are counted in-process by counting newline bytes (the same measure
    `wc -l` uses). The previous implementation interpolated each file name
    into a shell command, so a crafted file name could inject shell code,
    and it started one subprocess per file.
    """
    # NOTE(review): the '.con' suffix is kept from the original code -
    # confirm it is not a typo for '.conf'.
    bl_dir = f'{CONFIGS_DIR}/blocklists'
    try:
        names = [f for f in os.listdir(bl_dir) if f.endswith('.con')]
    except Exception:
        return '-'

    total = 0
    for fname in names:
        try:
            with open(os.path.join(bl_dir, fname), 'rb') as fh:
                # Read in chunks so large blocklists are not held in memory.
                for chunk in iter(lambda: fh.read(65536), b''):
                    total += chunk.count(b'\n')
        except OSError:
            # Unreadable file: counts as 0, like a failed `wc` did before.
            continue
    return str(total)
def _bl_last_update():
    """Return the formatted modification time of the newest '.con' file in the blocklists directory.

    Returns '-' when there is no such file or the directory cannot be read.
    """
    bl_dir = f'{CONFIGS_DIR}/blocklists'
    try:
        newest = None
        for fname in os.listdir(bl_dir):
            if not fname.endswith('.con'):
                continue
            modified = os.path.getmtime(f'{bl_dir}/{fname}')
            if newest is None or modified > newest:
                newest = modified
        if newest is None:
            return '-'
        return _fmt_timestamp(int(newest))
    except Exception:
        return '-'
def _blocklist_stats_html(cfg):
    """Render an HTML table with one row per configured blocklist.

    Columns: name, number of lines in the saved file, file size, and last
    modification time (local time plus a relative description). Blocklists
    whose file is missing or unreadable show '-', '-', 'Never'. Returns ''
    when no blocklists are configured.
    """
    bl_dir = f'{CONFIGS_DIR}/blocklists'
    body_rows = []
    for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
        label = e(bl.get('name', ''))
        save_as = bl.get('save_as', '')
        bl_path = f'{bl_dir}/{save_as}' if save_as else ''
        try:
            with open(bl_path) as handle:
                entries = sum(1 for _ in handle)
            mtime = int(os.path.getmtime(bl_path))
            size_str = _fmt_bytes(os.path.getsize(bl_path))
            stamp = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")
            last_refreshed = f'{stamp} ({_relative_time(mtime)})'
        except Exception:
            entries, size_str, last_refreshed = '-', '-', 'Never'
        body_rows.append(
            '<tr>'
            f'<td class="table-cell">{label}</td>'
            f'<td class="table-cell">{entries}</td>'
            f'<td class="table-cell">{size_str}</td>'
            f'<td class="table-cell">{e(last_refreshed)}</td>'
            '</tr>'
        )
    if not body_rows:
        return ''
    tbody = ''.join(body_rows)
    header = (
        '<thead><tr>'
        '<th class="table-header">Blocklist</th>'
        '<th class="table-header">Entries</th>'
        '<th class="table-header">Size</th>'
        '<th class="table-header">Last Refreshed</th>'
        '</tr></thead>'
    )
    return f'<table class="data-table">{header}<tbody>{tbody}</tbody></table>'
# Maximum number of trailing DDNS log lines shown.
DDNS_LOG_MAX = 50


def _ddns_log_tail():
    """Return (tail_text, summary_html) for the DDNS log in the configs directory.

    tail_text holds the last DDNS_LOG_MAX lines, stripped. summary_html is a
    small <div> reporting how many lines are shown and how large the file is
    relative to the configured log_max_kb (default 1024). When the log is
    empty, missing or unreadable, tail_text is a placeholder message and
    summary_html is ''.
    """
    log_path = f'{CONFIGS_DIR}/ddns.log'
    try:
        log_max_kb = _load_ddns().get('general', {}).get('log_max_kb', 1024)
        size_kb = os.path.getsize(log_path) / 1024
        with open(log_path) as handle:
            all_lines = handle.readlines()
        if not all_lines:
            return '(log is empty)', ''

        total = len(all_lines)
        visible = all_lines[-DDNS_LOG_MAX:]
        shown = len(visible)
        hidden = total - shown

        if log_max_kb:
            pct = min(100, round(size_kb / log_max_kb * 100))
        else:
            pct = 0

        left = f'Showing {shown} of {total} lines'
        if hidden > 0:
            left += f' ({hidden} not shown)'
        right = f'Log file size: {size_kb:.1f} KB ({pct}% of max)'

        summary = (
            '<div class="text-muted" style="display:flex;justify-content:space-between;margin-top:0.5em;">'
            f'<span>{left}</span><span>{right}</span></div>'
        )
        return ''.join(visible).strip(), summary
    except FileNotFoundError:
        return '(log file not found)', ''
    except Exception:
        return '(error reading log)', ''
def _fmt_seconds(secs):
secs = int(secs)
if secs < 60:
return f'{secs}s'
m, s = divmod(secs, 60)
if m < 60:
return f'{m}m {s}s' if s else f'{m}m'
h, m = divmod(m, 60)
return f'{h}h {m}m' if m else f'{h}h'
def _parse_interval_to_seconds(s):
m = re.match(r'^(\d+)([mhd])$', str(s).strip())
if not m:
return None
val, unit = int(m.group(1)), m.group(2)
return val * {'m': 60, 'h': 3600, 'd': 86400}[unit]
def _parse_time_remaining(text):
for line in text.splitlines():
if 'Trigger:' in line:
total, found = 0, False
for amt, unit in re.findall(r'(\d+)\s*(day|h|min|s)\b', line):
total += int(amt) * {'day': 86400, 'h': 3600, 'min': 60, 's': 1}[unit]
found = True
if found:
return total
return None
def _read_cached_ip():
    """Return (ip, mtime) from the most recently modified non-empty .ddns-last-ip-* file.

    Returns ('', None) when no such file holds an address or the configs
    directory cannot be listed.

    Each file is read inside a `with` block so its handle is closed
    promptly. A file that disappears or cannot be read between listing and
    reading is skipped rather than discarding the result from the others.
    """
    try:
        names = os.listdir(CONFIGS_DIR)
    except Exception:
        return '', None

    best_ip, best_mtime = '', 0.0
    for fname in names:
        if not fname.startswith('.ddns-last-ip-'):
            continue
        path = f'{CONFIGS_DIR}/{fname}'
        try:
            mtime = os.path.getmtime(path)
            if mtime <= best_mtime:
                continue
            with open(path) as fh:
                ip = fh.read().strip()
        except (OSError, UnicodeDecodeError):
            # Vanished or unreadable cache file: ignore it.
            continue
        if ip:
            best_ip, best_mtime = ip, mtime
    return best_ip, (best_mtime if best_ip else None)
def _public_ip_info(ddns_cfg):
    """Return (ip_str, domains_sub, next_interval_str, last_obtained_str) for stat cards.

    domains_sub is a comma-separated list of the hostnames (or, failing
    that, subdomains) of all enabled providers. ip_str is the cached public
    IP or 'Offline' when none is cached. next_interval_str is always '-'.
    """
    hosts = []
    for provider in ddns_cfg.get('providers', []):
        if provider.get('enabled', True):
            hosts.extend(provider.get('hostnames', provider.get('subdomains', [])))
    domains_sub = ', '.join(hosts)

    ip, mtime = _read_cached_ip()
    if not ip:
        return 'Offline', domains_sub, '-', ''
    last_obtained = f'Obtained: {_relative_time(mtime)}' if mtime else ''
    return ip, domains_sub, '-', last_obtained
def _ddns_last_checked():
    """Return 'Last checked: <relative time>' from the mtime of .ddns-last-service.

    Returns 'Last checked: ---' when that file cannot be stat'ed.
    """
    marker = f'{CONFIGS_DIR}/.ddns-last-service'
    try:
        checked_at = os.path.getmtime(marker)
    except OSError:
        return 'Last checked: ---'
    return f'Last checked: {_relative_time(checked_at)}'
def _vpn_info():
    """Return the 'vpn_information' of the first configured VLAN that has one, else {}."""
    vlans = _load_config().get('vlans', [])
    return next(
        (vlan['vpn_information'] for vlan in vlans if 'vpn_information' in vlan),
        {},
    )
# Token collection ==================================================
def collect_tokens():
    """Build the token map used for %TOKEN% substitution.

    Reads the configuration, the current session and live system state, and
    returns a dict keyed by token name. Most values are plain strings; some
    are pre-rendered HTML fragments (keys ending in _HTML, APPLY_WARNING,
    VLAN_FILTER_OPTIONS) and some are JSON-encoded strings (keys ending in
    _JSON or _OPTIONS).

    Side effect: for administrators and above, if the .health file reports
    a problem and no 'fix problems' command is queued yet, one is queued.

    Changes from the previous revision: the .health file is read inside a
    `with` block so the handle is closed, and UUIDs in the Pending Actions
    table are HTML-escaped, as they already were in the Change History table.
    """
    tokens = {}
    cfg = _load_config()
    net = cfg.get('network_interfaces', {})
    dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
    dns = cfg.get('upstream_dns', {})
    vlans = cfg.get('vlans', [])

    # General / network interfaces ---------------------------------
    tokens['GENERAL_WAN_INTERFACE'] = str(net.get('wan_interface', '-'))
    tokens['GENERAL_LAN_INTERFACE'] = str(net.get('lan_interface', '-'))
    tokens['GENERAL_WAN_STATUS'] = _iface_status(net.get('wan_interface', ''))
    tokens['GENERAL_LAN_STATUS'] = _iface_status(net.get('lan_interface', ''))
    tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
    sys_ifaces = _get_system_interfaces()
    # Always include currently-configured values so dropdowns are never blank.
    for configured in [net.get('wan_interface', ''), net.get('lan_interface', '')]:
        if configured and configured not in sys_ifaces:
            sys_ifaces.append(configured)
    sys_ifaces.sort()
    tokens['NETWORK_INTERFACE_OPTIONS'] = json.dumps(
        [{'value': i, 'label': i} for i in sys_ifaces]
    )
    tokens['NETWORK_INTERFACE_STATUS_OPTIONS'] = json.dumps(
        [{'value': i, 'label': f'{i} - {_iface_status(i).title()}'} for i in sys_ifaces]
    )
    iface_data = [_iface_info(i) for i in sys_ifaces]
    tokens['NETWORK_INTERFACE_DATA_JSON'] = json.dumps(iface_data)
    # Width of the widest speed string, never narrower than the 'Speed' heading.
    max_speed_len = max(
        (len(str(d.get('speed') or '')) for d in iface_data),
        default=len('Speed')
    )
    tokens['NETWORK_INTERFACE_STATS_SPEED_PAD'] = str(max(max_speed_len, len('Speed')))
    tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if dns_blk_gen.get('log_errors_only') else 'false'
    tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(dns_blk_gen.get('daily_execute_time_24hr_local', '-'))
    tokens['GENERAL_APPLY_ON_SAVE'] = 'true' if session.get('apply_changes_immediately', False) else 'false'

    # Queue health fix before building the pending table so it appears immediately.
    _level = _client_level()
    if _level >= LEVEL_RANK['administrator']:
        try:
            with open(f'{CONFIGS_DIR}/.health') as _health_file:
                _st = json.load(_health_file)
            _has_problems = any(
                item.get('status') == 'problem'
                for section in ('configurations', 'logs', 'services')
                for item in _st.get(section, [])
            )
            if _has_problems:
                _fix_uuid, _ = _find_cmd_in_queues('fix problems')
                if _fix_uuid is None:
                    queue_command('fix problems', user=session.get('email_address', ''))
        except Exception:
            # Best effort: a missing or malformed health file must not break rendering.
            pass

    # Pending actions ----------------------------------------------
    all_snaps = load_all_snapshots()
    _snap_uuid_set = {s.get('uuid') for s in all_snaps}
    pending_items = get_dashboard_pending()
    if pending_items:
        # Group by command; each group = one row in the Pending Actions table.
        from collections import defaultdict
        groups = defaultdict(list)
        for _uuid, _ts, cmd, user in pending_items:
            groups[cmd].append((_uuid, user))
        rows = ''
        for cmd, entries in groups.items():
            users = ', '.join(sorted({u for _, u in entries if u and u != 'unknown'}))
            # Only UUIDs that have a snapshot are listed in "Required By".
            snap_uuids = [_uuid for _uuid, _ in entries if _uuid in _snap_uuid_set]
            if snap_uuids:
                req_tags = ''.join(
                    f'<span class="tag" data-tooltip="{e(_uuid)}" data-uuid="{e(_uuid)}">'
                    f'<span class="tl-full">{e(_uuid[:8])}</span>'
                    f'<span class="tl-short">{e(_uuid[:8])}</span>'
                    f'<span class="tl-min">{e(_uuid[:8])}</span>'
                    '</span>'
                    for _uuid in snap_uuids
                )
                req_cell = f'<td class="table-cell"><div class="tag-list">{req_tags}</div></td>'
            else:
                req_cell = '<td class="table-cell">-</td>'
            rows += (
                '<tr>'
                f'<td class="table-cell">{e(cmd)}</td>'
                f'<td class="table-cell">{e(users)}</td>'
                f'{req_cell}'
                '</tr>'
            )
        pending_html = (
            '<table class="data-table">'
            '<thead><tr>'
            '<th class="table-header">Command</th>'
            '<th class="table-header">User</th>'
            '<th class="table-header">Required By</th>'
            '</tr></thead>'
            f'<tbody>{rows}</tbody>'
            '</table>'
        )
    else:
        pending_html = '<p class="text-muted">No pending actions.</p>'
    tokens['PENDING_ACTIONS_HTML'] = pending_html
    tokens['NO_PENDING'] = 'true' if not pending_items else ''
    tokens['APPLY_WARNING'] = (
        f'<span class="btn-notice btn-notice-warning">'
        f'{_load_icon("arrow-right")}'
        f'Applying actions will briefly disrupt connections as network services are restarted.'
        f'</span>'
        if pending_items else ''
    )

    # Change history -----------------------------------------------
    done_ts_map = get_done_timestamps()
    if all_snaps:
        # UUIDs that cannot be reverted: revert entries themselves, and entries
        # that have already been reverted (referenced in another snap's 'reverts').
        _no_revert = set()
        for _s in all_snaps:
            if _s.get('operation') == 'revert':
                _no_revert.add(_s.get('uuid', ''))
            if _s.get('reverts'):
                _no_revert.add(_s['reverts'])
        hist_rows = ''
        # Clicking a row (but not its checkbox) toggles the detail row that follows it.
        _hist_onclick = (
            'onclick="if(event.target.type!==\'checkbox\')'
            'this.nextElementSibling.hidden=!this.nextElementSibling.hidden"'
        )
        for snap in all_snaps:
            _uuid = snap.get('uuid', '')
            applied_ts = done_ts_map.get(_uuid)
            dt_str = datetime.fromtimestamp(applied_ts).strftime('%Y-%m-%d %H:%M') if applied_ts else '-'
            snap_desc = e(snap.get('description', ''))
            before_val = snap.get('before')
            after_val = snap.get('after')
            snap_tag = (
                f'<div class="tag-list"><span class="tag" data-tooltip="{e(_uuid)}" data-uuid="{e(_uuid)}">'
                f'<span class="tl-full">{e(_uuid[:8])}</span>'
                f'<span class="tl-short">{e(_uuid[:8])}</span>'
                f'<span class="tl-min">{e(_uuid[:8])}</span>'
                '</span></div>'
            )
            snap_user = e(snap.get('user', ''))
            _cb_attrs = 'disabled title="Cannot revert"' if _uuid in _no_revert else ''
            hist_rows += (
                f'<tr class="row-expandable" data-uuid="{e(_uuid)}" {_hist_onclick}>'
                f'<td class="table-cell"><input type="checkbox" name="selected_uuids" value="{e(_uuid)}" {_cb_attrs}/></td>'
                f'<td class="table-cell">{e(dt_str)}</td>'
                f'<td class="table-cell">{snap_desc}</td>'
                f'<td class="table-cell">{_render_snap_val(before_val)}</td>'
                f'<td class="table-cell">{_render_snap_val(after_val)}</td>'
                f'<td class="table-cell">{snap_tag}</td>'
                f'<td class="table-cell">{snap_user}</td>'
                '</tr>'
                f'{_snap_expand_row(before_val, after_val, 7)}'
            )
        select_all = (
            '<input type="checkbox" '
            'onchange="document.querySelectorAll(\'[name=selected_uuids]:not(:disabled)\').forEach(c=>c.checked=this.checked)"/>'
        )
        history_html = (
            '<table class="data-table">'
            '<thead><tr>'
            f'<th class="table-header">{select_all}</th>'
            '<th class="table-header">Applied</th>'
            '<th class="table-header">Description</th>'
            '<th class="table-header">Before</th>'
            '<th class="table-header">After</th>'
            '<th class="table-header">Snapshot</th>'
            '<th class="table-header">User</th>'
            '</tr></thead>'
            f'<tbody>{hist_rows}</tbody>'
            '</table>'
        )
    else:
        history_html = '<p class="text-muted">No change history.</p>'
    tokens['CHANGE_HISTORY_HTML'] = history_html
    tokens['NO_HISTORY'] = 'true' if not all_snaps else ''

    # Upstream DNS -------------------------------------------------
    servers = dns.get('upstream_servers', [])
    tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
    tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
    tokens['DNS_UPSTREAM_SERVERS_JSON'] = json.dumps(servers)
    tokens['OVERVIEW_UPSTREAM_SERVERS'] = ', '.join(servers) or '-'

    # VLANs --------------------------------------------------------
    non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
    vlan_names = [v.get('name', '') for v in vlans]
    tokens['OVERVIEW_VLAN_NAMES'] = ', '.join(vlan_names) or '-'
    tokens['STAT_VLAN_COUNT'] = str(len(non_vpn_vlans))
    tokens['STAT_LEASE_COUNT'] = str(len(_live_dhcp_leases()))
    filter_opts = '<option value="all">All VLANs</option>' + ''.join(
        f'<option value="{e(n)}">{e(n)}</option>' for n in vlan_names
    )
    tokens['VLAN_FILTER_OPTIONS'] = filter_opts
    tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
    tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn')))
    tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
    tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name', '') for v in vlans])
    tokens['EXISTING_VLAN_INTERFACES_JSON'] = json.dumps([_resolve_iface(v, cfg) for v in vlans])

    # Banned IPs / blocklists --------------------------------------
    tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in cfg.get('banned_ips', []) if b.get('enabled', True)))
    tokens['STAT_BLOCKLIST_COUNT'] = str(len(cfg.get('dns_blocking', {}).get('blocklists', [])))
    tokens['BLOCKLIST_STATS_HTML'] = _blocklist_stats_html(cfg)

    # DDNS ---------------------------------------------------------
    ddns = _load_ddns()
    ddns_gen = ddns.get('general', {})
    tokens['DDNS_TIMER_INTERVAL'] = ddns_gen.get('timer_interval', '-')
    # An unparsable interval falls back to 600 seconds.
    _interval_secs = _parse_interval_to_seconds(ddns_gen.get('timer_interval', '')) or 600
    tokens['DDNS_TIMER_INTERVAL_MINS'] = str(_interval_secs // 60)
    tokens['DDNS_GEN_LOG_MAX_KB'] = str(ddns_gen.get('log_max_kb', 1024))
    tokens['DDNS_GEN_LOG_ERRORS_ONLY'] = 'true' if ddns_gen.get('log_errors_only') else 'false'
    enabled_p = [p for p in ddns.get('providers', []) if p.get('enabled', True)]
    tokens['STAT_DDNS_PROVIDER_COUNT'] = str(len(enabled_p))
    _ip_check = ddns.get('ip_check_services', [])
    _http_svc = [s['url'] for s in _ip_check if s.get('type') == 'http']
    _dig_svc = [s['url'] for s in _ip_check if s.get('type') == 'dig']
    tokens['STAT_IP_CHECK_TOTAL'] = str(len(_ip_check))
    tokens['STAT_IP_CHECK_SUB'] = f'{len(_http_svc)} http and {len(_dig_svc)} dig'
    tokens['IP_CHECK_HTTP_JSON'] = json.dumps(_http_svc)
    tokens['IP_CHECK_DIG_JSON'] = json.dumps(_dig_svc)
    _ddns_labels = {'noip': 'No-IP', 'cloudflare': 'Cloudflare', 'duckdns': 'DuckDNS'}
    tokens['DDNS_PROVIDER_OPTIONS'] = json.dumps([
        {'value': p, 'label': _ddns_labels.get(p, p.title())}
        for p in validate.VALID_DDNS_PROVIDERS
    ])

    # VPN ----------------------------------------------------------
    # Same ordering as _resolve_iface, so wg<N> labels agree.
    wg_vlans_list = sorted(
        [v for v in vlans if v.get('is_vpn')],
        key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
    )
    tokens['VPN_VLAN_OPTIONS'] = json.dumps([
        {'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id") or "?"})'}
        for i, v in enumerate(wg_vlans_list)
    ])
    # The VPN_* tokens describe the first VPN VLAN only.
    wg_vlan = wg_vlans_list[0] if wg_vlans_list else {}
    vpn = wg_vlan.get('vpn_information', {})
    overrides = vpn.get('explicit_overrides', {})
    tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
    tokens['VPN_SERVER_ENDPOINT'] = str(vpn.get('server_endpoint', ''))
    tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
    tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_server', ''))
    tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
    # Compute gateway from server_identities (lowest last-octet), fallback to first subnet host
    try:
        import ipaddress as _ipaddress
        ident_ips = [s['ip'] for s in wg_vlan.get('server_identities', []) if s.get('ip')]
        if ident_ips:
            default_gw = str(min((_ipaddress.IPv4Address(ip) for ip in ident_ips),
                                 key=lambda x: x.packed[-1]))
        else:
            wg_net = _ipaddress.IPv4Network(
                f"{wg_vlan['subnet']}/{wg_vlan['subnet_mask']}", strict=False)
            default_gw = str(next(wg_net.hosts()))
        tokens['VPN_GATEWAY'] = overrides.get('gateway') or default_gw
    except Exception:
        # No VPN VLAN, or its addressing is incomplete/invalid.
        tokens['VPN_GATEWAY'] = ''

    # Public IP / DDNS status --------------------------------------
    ip_str, sub_str, next_interval, last_obtained = _public_ip_info(ddns)
    tokens['STAT_PUBLIC_IP'] = ip_str
    tokens['STAT_DDNS_HOSTNAME'] = sub_str
    tokens['STAT_DDNS_NEXT_INTERVAL'] = next_interval
    tokens['STAT_PUBLIC_IP_LAST_OBTAINED'] = last_obtained
    tokens['STAT_PUBLIC_IP_LAST_CHECKED'] = _ddns_last_checked()
    tokens['DDNS_LOG_TAIL'], tokens['DDNS_LOG_SUMMARY'] = _ddns_log_tail()

    # Live system statistics ---------------------------------------
    tokens['STAT_UPTIME'] = _run('uptime -p') or '-'
    tokens['STAT_NFTABLES_STATUS'] = 'Active' if _run('nft list tables 2>/dev/null').strip() else 'Inactive'
    dns_stats = _get_dnsmasq_stats()
    tokens['DNS_STAT_QUERIES'] = dns_stats['queries']
    tokens['DNS_STAT_HITS'] = dns_stats['hits']
    tokens['DNS_STAT_HIT_RATE'] = dns_stats['hit_rate']
    tokens['DNS_STAT_FORWARDED'] = dns_stats['forwarded']
    tokens['DNS_STAT_AUTH'] = dns_stats['auth']
    tokens['DNS_STAT_TCP_PEAK'] = dns_stats['tcp_peak']
    tokens['STAT_BLOCKED_TODAY'] = _count_blocked_today()
    tokens['STAT_BLOCKED_DOMAINS'] = _count_blocked_domains()
    tokens['STAT_BL_LAST_UPDATE'] = _bl_last_update()

    # User preferences and static option lists ---------------------
    tokens['PREF_EMAIL'] = session.get('email_address', '')
    tokens['PREF_TIMEZONE'] = session.get('timezone', '')
    blank = [{'value': '', 'label': '-- Select timezone --'}]
    tokens['TIMEZONE_OPTIONS'] = json.dumps(
        blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES]
    )
    tokens['PROTOCOL_OPTIONS'] = json.dumps([
        {'value': 'tcp', 'label': 'TCP'},
        {'value': 'udp', 'label': 'UDP'},
        {'value': 'both', 'label': 'TCP/UDP'},
    ])
    tokens['BLOCKLIST_FORMAT_OPTIONS'] = json.dumps([
        {'value': 'hosts', 'label': 'hosts (hosts file format)'},
        {'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
    ])
    tokens['BLOCKLIST_NAME_OPTIONS'] = json.dumps([
        {'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
        for bl in cfg.get('dns_blocking', {}).get('blocklists', [])
    ])
    tokens['ACCOUNT_LEVEL_OPTIONS'] = json.dumps([
        {'value': 'viewer', 'label': 'Viewer (read-only access to live data)'},
        {'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
        {'value': 'manager', 'label': 'Manager (full access including account management)'},
    ])
    return tokens
# HTML helpers ======================================================
def e(text):
    """Return `text` converted to str and HTML-escaped (quotes included)."""
    as_string = str(text)
    return html_mod.escape(as_string)
def _snap_text(val):
"""Return the plain-text representation of a snapshot before/after value."""
if val is None:
return ''
if isinstance(val, dict) and len(val) == 1:
k, v = next(iter(val.items()))
return f'{k}: {v}'
if isinstance(val, (dict, list)):
return json.dumps(val, separators=(',', ':'))
return str(val)
def _render_snap_val(val):
    """Return escaped HTML for a snapshot before/after table cell, truncated to 24 characters.

    Text longer than 24 characters is cut to its first 23 characters plus an
    ellipsis, so the reader can tell the value was shortened. Previously an
    empty string was appended, which made a truncated value look complete.
    Empty values render as ''.
    """
    text = _snap_text(val)
    if not text:
        return ''
    if len(text) > 24:
        # 23 characters + the ellipsis character = 24 visible characters.
        text = text[:23] + '\u2026'
    return e(text)
def _snap_expand_row(before_val, after_val, colspan):
    """Return a hidden <tr> holding the full before/after values side by side.

    Dicts and lists are shown as indented JSON, other values as their plain
    text; empty values show an italic '(none)'. The single cell spans
    `colspan` columns.
    """
    def column(label, val):
        if isinstance(val, (dict, list)):
            text = json.dumps(val, indent=2)
        elif val is not None:
            text = _snap_text(val)
        else:
            text = ''
        if text:
            body = e(text)
        else:
            body = '<em class="snap-expand-none">(none)</em>'
        return (
            '<div class="snap-expand-col">'
            f'<span class="snap-expand-label">{label}</span>'
            f'<pre class="snap-expand-pre">{body}</pre></div>'
        )

    before_col = column("Before", before_val)
    after_col = column("After", after_val)
    inner = f'<div class="snap-expand-cols">{before_col}{after_col}</div>'
    return f'<tr hidden><td colspan="{colspan}" class="snap-expand-cell">{inner}</td></tr>'
def apply_tokens(text, tokens):
    """Replace %TOKEN% placeholders in `text` with str(tokens['TOKEN']).

    Token names consist of upper-case letters and underscores. Placeholders
    whose name is not in `tokens` are left untouched. Values are NOT
    auto-escaped: callers that put the result into HTML text or attributes
    must apply e() themselves.
    """
    def substitute(match):
        name = match.group(1)
        if name in tokens:
            return str(tokens[name])
        return match.group(0)

    return re.sub(r'%([A-Z_]+)%', substitute, text)
def _expand_fields(obj, tokens):
    """Apply token substitution throughout a field-definition structure.

    Lists and dicts are walked recursively; other non-string values are
    returned unchanged. For a string stored directly as a dict value, the
    substituted text is parsed as JSON when substitution changed it and the
    result starts with ``[`` or ``{``, so that token-supplied arrays/objects
    end up as real structures rather than quoted strings. If parsing fails
    the substituted string is kept as-is.
    """
    if isinstance(obj, list):
        return [_expand_fields(entry, tokens) for entry in obj]
    if not isinstance(obj, dict):
        return obj

    expanded = {}
    for key, raw in obj.items():
        if not isinstance(raw, str):
            expanded[key] = _expand_fields(raw, tokens)
            continue
        substituted = apply_tokens(raw, tokens)
        looks_like_json = substituted != raw and substituted[:1] in ('[', '{')
        if not looks_like_json:
            expanded[key] = substituted
            continue
        try:
            expanded[key] = json.loads(substituted)
        except Exception:
            # Not valid JSON after all: fall back to the plain string.
            expanded[key] = substituted
    return expanded
# Content item renderers ============================================
def render_items(items, tokens, inherited_req=None):
    """Render a list of content items and concatenate the resulting HTML.

    Each item's ``client_requirement`` (falling back to ``inherited_req``)
    is checked against the current client's access level; items that do not
    pass are omitted. The effective requirement is handed down to
    ``_render_item`` so nested items inherit it.
    """
    level = _client_level()
    rendered = []
    for entry in items:
        requirement = entry.get('client_requirement', inherited_req)
        if _passes(requirement, level):
            rendered.append(_render_item(entry, tokens, requirement))
    return ''.join(rendered)
def _render_item(item, tokens, inherited_req=None):
    """Render a single page-definition item to HTML.

    Dispatches on ``item['type']``. Text taken from the item is passed
    through ``apply_tokens`` and HTML-escaped with ``e()`` before being
    emitted, with two deliberate exceptions: ``select`` inserts its
    token-expanded ``options`` markup verbatim, and ``raw_html`` returns its
    token-expanded ``html`` as ``Markup``. Container types render their
    children through ``render_items`` and pass down the effective
    ``client_requirement``.

    Args:
        item: The item definition dict.
        tokens: Mapping used for ``%TOKEN%`` substitution.
        inherited_req: Requirement inherited from the enclosing item, used
            when ``item`` has no ``client_requirement`` of its own.

    Returns:
        The rendered HTML, or an empty string for an unrecognised type.
    """
    t = item.get('type', '')
    req = item.get('client_requirement', inherited_req)

    # Simple text / layout elements ---------------------------------
    if t == 'h1':
        return f'<h1>{e(apply_tokens(item.get("text", ""), tokens))}</h1>'
    if t == 'hr':
        return '<hr class="divider"/>'
    if t == 'p':
        text = e(apply_tokens(item.get('text', ''), tokens))
        link = item.get('link')
        if link:
            href = e(apply_tokens(link.get('action', '#'), tokens))
            ltext = e(apply_tokens(link.get('text', ''), tokens))
            return f'<p>{text} <a href="{href}" class="auth-link">{ltext}</a></p>'
        return f'<p>{text}</p>'
    if t == 'spacer':
        return '<div class="spacer"></div>'

    # Buttons ---------------------------------------------------------
    if t in ('button_primary', 'button_secondary', 'button_danger', 'button_ghost'):
        cls_map = {
            'button_primary': 'btn-primary',
            'button_secondary': 'btn-secondary',
            'button_danger': 'btn-danger',
            'button_ghost': 'btn-ghost',
        }
        cls = cls_map[t]
        extra = item.get('class', '')
        if extra:
            cls = f'{cls} {extra}'
        text = e(apply_tokens(item.get('text', ''), tokens))
        action_raw = item.get('action', '')
        action = e(apply_tokens(action_raw, tokens))
        disabled_val = apply_tokens(str(item.get('disabled', '')), tokens)
        disabled = ' disabled' if disabled_val and disabled_val not in ('false', '0') else ''
        formaction = item.get('formaction', '')
        if formaction:
            # Submit button that overrides the enclosing form's action.
            formaction = e(apply_tokens(formaction, tokens))
            return f'<button type="submit" class="btn {e(cls)}" formaction="{formaction}"{disabled}>{text}</button>'
        if item.get('method', '').lower() == 'post':
            # Stand-alone POST: wrap the button in its own inline form.
            return (
                f'<form method="post" action="{action}" class="form-inline">'
                f'<button type="submit" class="btn {e(cls)}"{disabled}>{text}</button></form>'
            )
        if action_raw:
            return f'<a href="{action}" class="btn {e(cls)}">{text}</a>'
        return f'<button type="submit" class="btn {e(cls)}"{disabled}>{text}</button>'
    if t == 'button_cancel':
        text = e(apply_tokens(item.get('text', 'Cancel'), tokens))
        # Escaped like the class of every other button type.
        extra_cls = (' ' + e(item['class'])) if item.get('class') else ''
        return f'<button type="button" class="btn btn-secondary btn-cancel{extra_cls}" disabled>{text}</button>'

    # Plain containers --------------------------------------------------
    if t == 'header_page_title':
        return f'<div class="page-header">{render_items(item.get("items", []), tokens, req)}</div>'
    if t in ('section', 'auth_wrapper'):
        tag = 'div'
        cls = 'auth-wrapper' if t == 'auth_wrapper' else 'section'
        return f'<{tag} class="{cls}">{render_items(item.get("items", []), tokens, req)}</{tag}>'
    if t == 'auth_card':
        return f'<div class="auth-card">{render_items(item.get("items", []), tokens, req)}</div>'
    if t == 'stat_card_grid':
        return f'<div class="stat-card-grid">{render_items(item.get("items", []), tokens, req)}</div>'

    # Stat card: read-only, "reveal another card", or inline-editable ---
    if t == 'stat_card':
        label = e(apply_tokens(item.get('label', ''), tokens))
        raw_value = apply_tokens(item.get('value', ''), tokens)
        value = e(raw_value)
        sub = e(apply_tokens(item.get('sub', ''), tokens))
        variant = item.get('variant', '')
        cls = f'stat-card{(" stat-card-" + variant) if variant else ""}'
        edit_action = item.get('edit_action', '')
        edit_field = item.get('edit_field', '')
        edit_input_type = item.get('edit_input_type', 'text')
        edit_suffix = item.get('edit_suffix', '')
        edit_min = item.get('edit_min', '')
        edit_raw = apply_tokens(item.get('edit_value', item.get('value', '')), tokens)
        reveal_card_id = item.get('reveal_card_id', '')
        if reveal_card_id:
            return (
                f'<div class="{cls}">'
                f'<div class="stat-card-label">{label}</div>'
                '<div class="stat-card-value-row">'
                f'<span class="stat-card-value">{value}</span>'
                '<button type="button" class="btn btn-ghost btn-sm"'
                f' data-reveal-card="{e(reveal_card_id)}">Edit</button>'
                '</div>'
                f'<div class="stat-card-sub">{sub}</div>'
                '</div>'
            )
        if edit_action and edit_field:
            min_attr = f' min="{e(edit_min)}"' if edit_min else ''
            suffix_html = f'<span>{e(edit_suffix)}</span>' if edit_suffix else ''
            input_wrap = (
                '<div class="stat-card-value-row">'
                f'<input type="{e(edit_input_type)}" name="{e(edit_field)}" value="{e(edit_raw)}"'
                f' data-original="{e(edit_raw)}" class="form-input stat-card-edit-input"{min_attr}/>'
                f'{suffix_html}</div>'
            )
            return (
                f'<div class="{cls} stat-card-editable">'
                f'<div class="stat-card-label">{label}</div>'
                '<div class="stat-card-view">'
                f'<span class="stat-card-value">{value}</span>'
                '<button type="button" class="btn btn-ghost btn-sm stat-card-edit-btn">Edit</button>'
                '</div>'
                f'<form class="stat-card-edit-form hidden" action="{e(edit_action)}" method="post">'
                f'<input type="hidden" name="config_hash" value="{e(config_hash())}"/>'
                f'{input_wrap}'
                '<div class="stat-card-edit-actions">'
                '<button type="submit" class="btn btn-primary btn-sm" disabled>Save</button>'
                '<button type="button" class="btn btn-secondary btn-sm stat-card-cancel-btn">Cancel</button>'
                '</div>'
                '</form>'
                f'<div class="stat-card-sub">{sub}</div>'
                '</div>'
            )
        return (
            f'<div class="{cls}">'
            f'<div class="stat-card-label">{label}</div>'
            f'<div class="stat-card-value">{value}</div>'
            f'<div class="stat-card-sub">{sub}</div>'
            '</div>'
        )

    if t == 'card':
        label = item.get('label', '')
        id_attr = f' id="{e(item["id"])}"' if item.get('id') else ''
        cls_hidden = ' hidden' if item.get('hidden') else ''
        header = f'<div class="card-header"><h2 class="card-title">{e(label)}</h2></div>' if label else ''
        body = render_items(item.get('items', []), tokens, req)
        return f'<div class="card{cls_hidden}"{id_attr}>{header}<div class="card-body">{body}</div></div>'

    if t == 'field_status':
        label = e(item.get('label', ''))
        raw = apply_tokens(item.get('value', ''), tokens).upper()
        badge_map = {
            'UP': ('badge-enabled', 'Up'),
            'DOWN': ('badge-warning', 'Down'),
            'INVALID': ('badge-danger', 'Invalid'),
        }
        badge_cls, badge_text = badge_map.get(raw, ('badge-disabled', raw.title() or 'Unknown'))
        # badge_text can be the token-supplied value itself (fallback case),
        # so it must be escaped before it is placed in the markup.
        return (
            '<div class="form-group">'
            f'<label class="form-label">{label}</label>'
            f'<div class="field-status-badge"><span class="badge {badge_cls}">{e(badge_text)}</span></div>'
            '</div>'
        )

    if t == 'info_bar':
        variant = item.get('variant', 'info')
        text = e(apply_tokens(item.get('text', ''), tokens))
        return f'<div class="info-bar info-bar-inline info-bar-{e(variant)}">{text}</div>'
    if t == 'pre_block':
        text = e(apply_tokens(item.get('text', ''), tokens))
        extra = ' data-scroll-bottom' if item.get('scroll_to_bottom') else ''
        return f'<pre class="pre-block"{extra}>{text}</pre>'

    if t == 'credential_fields':
        psel = e(item.get('provider_select', 'provider'))
        return (
            f'<div class="credential-fields" data-provider-select="{psel}">'
            '<div class="cred-group-token hidden">'
            '<div class="form-group"><label class="form-label">API Token</label>'
            '<input type="text" name="api_token" class="form-input"/></div>'
            '</div>'
            '<div class="cred-group-noip hidden">'
            '<div class="form-group"><label class="form-label">Username</label>'
            '<input type="text" name="username" class="form-input"/></div>'
            '<div class="form-group"><label class="form-label">Password</label>'
            '<input type="password" name="password" class="form-input"/></div>'
            '</div>'
            '</div>'
        )

    # Info grid ---------------------------------------------------------
    if t == 'grid':
        rows_html = ''
        for row in item.get('rows', []):
            # Cells are rendered directly, without a per-cell requirement check.
            cells = ''.join(_render_item(c, tokens, req) for c in row.get('cells', []))
            rows_html += f'<div class="info-grid-row">{cells}</div>'
        return f'<div class="info-grid">{rows_html}</div>'
    if t == 'grid_label':
        return f'<div class="info-grid-label">{e(apply_tokens(item.get("text", ""), tokens))}</div>'
    if t == 'grid_value':
        return f'<div class="info-grid-value">{e(apply_tokens(item.get("text", ""), tokens))}</div>'

    # Forms and form parts ------------------------------------------------
    if t == 'form':
        action = e(apply_tokens(item.get('action', ''), tokens))
        method = e(item.get('method', 'post'))
        inner = render_items(item.get('items', []), tokens, req)
        hash_field = f'<input type="hidden" name="config_hash" value="{e(config_hash())}"/>'
        originals = _collect_form_originals(item.get('items', []), tokens)
        orig_field = (
            f'<input type="hidden" name="original_values" value="{e(json.dumps(originals))}"/>'
            if originals else ''
        )
        field_specs, submit_sel = _collect_form_specs(item.get('items', []))
        # The wiring script is only emitted when there is something to
        # validate and a submit button to gate.
        script = _render_form_script(field_specs, submit_sel) if (field_specs and submit_sel) else ''
        return f'<form action="{action}" method="{method}">{hash_field}{orig_field}{inner}</form>{script}'
    if t == 'hidden':
        name = e(item.get('name', ''))
        value = e(apply_tokens(item.get('value', ''), tokens))
        return f'<input type="hidden" name="{name}" value="{value}"/>'
    if t == 'record_editor':
        label = e(item.get('label', ''))
        name = e(item.get('name', ''))
        empty = e(item.get('empty_message', 'No records added.'))
        fields = item.get('fields', [])
        col_count = len(fields) + 1  # one extra column for the per-row actions
        ths = ''.join(f'<th>{e(f.get("label",""))}</th>' for f in fields) + '<th></th>'
        form_rows = ''
        for f in fields:
            f_label = e(f.get('label', ''))
            f_name = e(f.get('name', ''))
            f_placeholder = e(f.get('placeholder', ''))
            f_required = 'true' if f.get('required') else 'false'
            f_validate = f.get('validate', '')
            f_valtype = f.get('valtype', '')
            f_attrs = f.get('attrs', {})
            attr_str = f' data-field="{f_name}" data-required="{f_required}"'
            if f_validate:
                attr_str += f' data-validate="{e(f_validate)}"'
            if f_valtype:
                attr_str += f' data-valtype="{e(f_valtype)}"'
            for ak, av in f_attrs.items():
                attr_str += f' {e(ak)}="{e(str(av))}"'
            inp = f'<input type="text" class="form-input"{attr_str} placeholder="{f_placeholder}"/>'
            if f_validate or f_valtype:
                # Validated inputs get a wrapper with a dynamic hint slot.
                field_inner = (
                    '<div class="field-wrap">'
                    + inp +
                    '<p class="form-hint field-dyn-hint hidden"></p>'
                    '</div>'
                )
            else:
                field_inner = inp
            form_rows += (
                f'<div class="form-group">'
                f'<label class="form-label">{f_label}</label>'
                f'{field_inner}'
                f'</div>'
            )
        n = len(fields)
        grid_class = f'form-row-{n}' if n in (2, 3, 4) else 'form-row-3'
        return (
            f'<div class="form-group record-editor" data-name="{name}" data-empty-message="{empty}">'
            f'<div class="record-editor-body">'
            f'<label class="form-label record-editor-label">{label}</label>'
            f'<table class="data-table record-editor-table">'
            f'<thead><tr>{ths}</tr></thead>'
            f'<tbody class="record-editor-rows">'
            f'<tr class="record-editor-empty-row">'
            f'<td colspan="{col_count}" class="table-empty">{empty}</td>'
            f'</tr>'
            f'</tbody>'
            f'</table>'
            f'<div class="record-editor-form">'
            f'<div class="{grid_class}">{form_rows}</div>'
            f'<div style="margin-top:0.5rem">'
            f'<button type="button" class="btn btn-secondary btn-sm record-editor-add-btn">Add</button>'
            f'<button type="button" class="btn btn-ghost btn-sm record-editor-cancel-btn hidden" style="margin-left:0.5rem">Cancel</button>'
            f'</div>'
            f'</div>'
            f'</div>'
            f'<input type="hidden" name="{name}" class="record-editor-hidden" value="[]"/>'
            f'</div>'
        )
    if t == 'field':
        return _render_field(item, tokens)
    if t == 'field_row':
        inner = render_items(item.get('items', []), tokens, req)
        cols = item.get('cols', 2)
        return f'<div class="form-row-{cols}">{inner}</div>'
    if t == 'subnet_row':
        subnet_name = e(item.get('subnet_name', 'subnet'))
        prefix_name = e(item.get('prefix_name', 'subnet_mask'))
        subnet_val = apply_tokens(item.get('subnet_value', ''), tokens)
        prefix_raw = apply_tokens(item.get('prefix_value', '24'), tokens)
        subnet_ph = e(apply_tokens(item.get('subnet_placeholder', ''), tokens))
        show_derived = item.get('show_derived_vlan_id', False)
        try:
            # Clamp to the 1..30 range the number input itself allows.
            pf = max(1, min(30, int(prefix_raw)))
        except (ValueError, TypeError):
            pf = 24
        dotted = _prefix_to_dotted(pf)
        return (
            '<div class="form-group">'
            '<label class="form-label">Subnet</label>'
            '<div class="field-wrap">'
            '<div class="subnet-row-wrap">'
            f'<input type="text" name="{subnet_name}" value="{e(subnet_val)}" placeholder="{subnet_ph}" class="form-input"/>'
            '<span class="subnet-sep">/</span>'
            f'<input type="number" name="{prefix_name}" value="{pf}" min="1" max="30" class="form-input subnet-prefix-input"/>'
            '</div>'
            f'<span class="subnet-dotted">{e(dotted)}</span>'
            '<p class="form-hint field-dyn-hint hidden"></p>'
            '</div>'
            '</div>'
        )
    if t == 'editable_list':
        return _render_editable_list(item, tokens)
    if t == 'select':
        name = e(item.get('name', ''))
        # Options are pre-built markup supplied via tokens; inserted verbatim.
        options = apply_tokens(item.get('options', ''), tokens)
        filter_col = item.get('filter_col', '')
        extra = f' data-filter-col="{e(filter_col)}"' if filter_col else ''
        return f'<select name="{name}" class="form-select"{extra}>{options}</select>'
    if t == 'button_row':
        justify = item.get('justify', '')
        style_attr = f' style="justify-content:{e(justify)}"' if justify else ''
        inner = render_items(item.get('items', []), tokens, req)
        return f'<div class="button-row"{style_attr}>{inner}</div>'
    if t == 'table':
        return _render_table(item, tokens, req)
    if t == 'raw_html':
        # Trusted markup from the page definition; returned unescaped.
        return Markup(apply_tokens(item.get('html', ''), tokens))
    return ''
def _render_field(item, tokens):
    """Render a ``field`` item as a labelled form control.

    The control is chosen by ``item['input_type']`` (default ``'text'``):
    ``hidden``, ``checkbox``, ``checkbox_group``, ``select``, ``number``,
    ``textarea`` and ``interface_picker`` have dedicated markup; any other
    value is used as the ``type`` of a plain ``<input>``. ``value``,
    ``placeholder`` and ``hint`` go through ``apply_tokens``; everything
    written into the markup is escaped with ``e()``.

    Args:
        item: The field definition dict.
        tokens: Mapping used for ``%TOKEN%`` substitution.

    Returns:
        The HTML for the field.
    """
    label = e(item.get('label', ''))
    name = e(item.get('name', ''))
    input_type = item.get('input_type', 'text')
    value = apply_tokens(item.get('value', ''), tokens)
    placeholder = e(apply_tokens(item.get('placeholder', ''), tokens))
    hint = e(apply_tokens(item.get('hint', ''), tokens))
    hint_html = f'<p class="form-hint">{hint}</p>' if hint else ''
    readonly = ' readonly' if item.get('readonly') else ''

    if input_type == 'hidden':
        return f'<input type="hidden" name="{name}" value="{e(value)}"/>'

    if input_type == 'checkbox':
        checked = 'checked' if value.lower() in ('true', '1', 'yes') else ''
        cb_label = item.get('checkbox_label')
        if cb_label:
            # Separate group label above, checkbox text beside the box.
            label_html = f'<label class="form-label">{label}</label>' if label else ''
            return (
                '<div class="form-group">'
                f'{label_html}'
                '<label class="form-checkbox-row">'
                f'<input type="checkbox" name="{name}" {checked} class="form-checkbox"/>'
                f' <span class="form-checkbox-label">{e(cb_label)}</span>'
                f'</label>{hint_html}</div>'
            )
        return (
            '<div class="form-group">'
            '<label class="form-label">'
            f'<input type="checkbox" name="{name}" {checked} class="form-checkbox"/> {label}'
            f'</label>{hint_html}</div>'
        )

    if input_type == 'checkbox_group':
        try:
            opts = json.loads(apply_tokens(item.get('options', '[]'), tokens))
            selected = json.loads(value) if value else []
        except Exception:
            # Malformed option/selection JSON renders an empty group.
            opts, selected = [], []
        boxes = ''.join(
            '<label class="checkbox-group-item">'
            f'<input type="checkbox" name="{name}" value="{e(o.get("value",""))}"'
            # Leading space keeps ``checked`` separated from the value attribute.
            f'{" checked" if o.get("value") in selected else ""}/> {e(o.get("label",""))}'
            '</label>'
            for o in opts
        )
        return (
            f'<div class="form-group"><label class="form-label">{label}</label>'
            f'<div class="checkbox-group">{boxes}</div>{hint_html}</div>'
        )

    if input_type == 'select':
        options = item.get('options', [])
        if isinstance(options, str):
            # Options may be given as a (token-expanded) JSON string.
            try:
                options = json.loads(apply_tokens(options, tokens))
            except Exception:
                options = []
        current = apply_tokens(item.get('value', ''), tokens)
        opts_html = ''.join(
            f'<option value="{e(o["value"])}"{" selected" if o["value"] == current else ""}>{e(o["label"])}</option>'
            for o in options
        )
        validate = item.get('validate', '')
        depends = item.get('depends', [])
        validate_attr = f' data-validate="{e(validate)}"' if validate else ''
        depends_attr = f' data-depends="{e(",".join(depends))}"' if depends else ''
        if validate:
            return (
                f'<div class="form-group"><label class="form-label">{label}</label>'
                f'<div class="field-wrap"><select name="{name}" class="form-select"{validate_attr}{depends_attr}>{opts_html}</select>'
                f'<p class="form-hint field-dyn-hint hidden"></p></div>'
                f'{hint_html}</div>'
            )
        return (
            f'<div class="form-group"><label class="form-label">{label}</label>'
            f'<select name="{name}" class="form-select"{validate_attr}{depends_attr}>{opts_html}</select>'
            f'{hint_html}</div>'
        )

    if input_type == 'number':
        min_attr = f' min="{item["min"]}"' if 'min' in item else ''
        max_attr = f' max="{item["max"]}"' if 'max' in item else ''
        # Number fields are always validated; 'positive_int' is the default rule.
        validate = item.get('validate', 'positive_int')
        depends = item.get('depends', [])
        validate_attr = f' data-validate="{e(validate)}"'
        depends_attr = f' data-depends="{e(",".join(depends))}"' if depends else ''
        dyn_hint_html = '<p class="form-hint field-dyn-hint hidden"></p>'
        inp = (
            f'<input type="number" name="{name}" value="{e(value)}"{min_attr}{max_attr}'
            f' class="form-input form-input-mono"{readonly}{validate_attr}{depends_attr}/>'
        )
        if item.get('layout') == 'inline':
            return (
                '<div class="form-group" style="display:flex;align-items:center;gap:0.75em">'
                f'<label class="form-label" style="margin:0;white-space:nowrap">{label}</label>'
                f'<div class="field-wrap" style="width:6rem">{inp}{dyn_hint_html}</div>'
                f'{hint_html}</div>'
            )
        return (
            f'<div class="form-group"><label class="form-label">{label}</label>'
            f'<div class="field-wrap">{inp}{dyn_hint_html}</div>{hint_html}</div>'
        )

    if input_type == 'textarea':
        rows = item.get('rows', 4)
        return (
            f'<div class="form-group"><label class="form-label">{label}</label>'
            f'<textarea name="{name}" rows="{rows}" placeholder="{placeholder}"'
            f' class="form-input">{e(value)}</textarea>'
            f'{hint_html}</div>'
        )

    if input_type == 'interface_picker':
        current = apply_tokens(item.get('value', ''), tokens)
        try:
            ifaces = json.loads(apply_tokens(item.get('data', '[]'), tokens))
        except Exception:
            ifaces = []
        state_map = {
            'UP': ('badge-enabled', 'Up'),
            'DOWN': ('badge-warning', 'Down'),
            'INVALID': ('badge-danger', 'Invalid'),
        }
        rows_html = ''
        # Details of the currently selected interface, filled in while
        # iterating; they feed the header badge, stats and Configure button.
        cur_sc, cur_st = 'badge-disabled', ''
        cur_speed = cur_mtu = cur_mac = cur_perm_mac = cur_min_mtu = cur_max_mtu = None
        try:
            speed_pad = int(tokens.get('NETWORK_INTERFACE_STATS_SPEED_PAD', '0'))
        except Exception:
            speed_pad = 0

        def _pad_speed(val):
            # Left-pad with non-breaking spaces up to speed_pad characters.
            s = val or '-'
            return '&nbsp;' * max(0, speed_pad - len(s)) + e(s)

        for ifc in ifaces:
            iname = ifc.get('name', '')
            wireless = ifc.get('wireless', False)
            state = ifc.get('state', 'UNKNOWN')
            carrier = ifc.get('carrier')
            raw_speed = ifc.get('speed')
            raw_mtu = ifc.get('mtu')
            raw_mac = ifc.get('mac')
            speed = raw_speed or '-'
            mtu = raw_mtu or '-'
            mac = raw_mac or '-'
            sc, st = state_map.get(state, ('badge-disabled', state.title()))
            type_txt = 'Wireless' if wireless else 'Wired'
            if wireless:
                carrier_txt = '-'
            else:
                carrier_txt = 'Yes' if carrier else ('No' if carrier is False else '-')
            sel_cls = ' selected' if iname == current else ''
            if iname == current:
                cur_sc, cur_st = sc, st
                cur_speed, cur_mtu, cur_mac = raw_speed, raw_mtu, raw_mac
                cur_perm_mac = ifc.get('perm_mac')
                cur_min_mtu = ifc.get('min_mtu')
                cur_max_mtu = ifc.get('max_mtu')
            padded_speed = _pad_speed(raw_speed)
            padded_mtu = '&nbsp;' * max(0, 4 - len(raw_mtu or '-')) + e(raw_mtu or '-')
            rows_html += (
                f'<tr class="iface-picker-row{sel_cls}" data-iface="{e(iname)}"'
                f' data-state-class="{e(sc)}" data-state-label="{e(st)}"'
                f' data-speed="{padded_speed}" data-mtu="{padded_mtu}"'
                f' data-mac="{e(raw_mac or "")}">'
                f'<td class="col-mono">{e(iname)}</td>'
                f'<td>{e(type_txt)}</td>'
                # The state label can come straight from the interface data
                # (unknown states), so it is escaped here as in the attribute.
                f'<td><span class="badge {sc}">{e(st)}</span></td>'
                f'<td>{e(carrier_txt)}</td>'
                f'<td>{e(speed)}</td>'
                f'<td>{e(mtu)}</td>'
                f'<td class="col-mono">{e(mac)}</td>'
                '</tr>'
            )
        table_html = (
            '<div class="table-wrapper">'
            '<table class="data-table iface-picker-table">'
            '<thead><tr><th>Interface</th><th>Type</th><th>State</th>'
            '<th>Carrier</th><th>Speed</th><th>MTU</th><th>MAC</th></tr></thead>'
            f'<tbody>{rows_html}</tbody>'
            '</table></div>'
        )
        btn_label = f'<span class="iface-picker-name">{e(current) or "Select..."}</span>'
        btn_badge = (
            f'<span class="badge {cur_sc} iface-picker-badge">{e(cur_st)}</span>'
            if current else ''
        )
        if current and any([cur_speed, cur_mtu, cur_mac]):
            ext_meta = (
                '<table class="iface-picker-stats">'
                '<thead><tr><th>Speed</th><th>MTU</th><th>MAC</th></tr></thead>'
                '<tbody><tr>'
                f'<td>{_pad_speed(cur_speed)}</td>'
                f'<td>{"&nbsp;" * max(0, 4 - len(cur_mtu or "-"))}{e(cur_mtu or "-")}</td>'
                f'<td class="col-mono">{e(cur_mac or "-")}</td>'
                '</tr></tbody>'
                '</table>'
            )
        else:
            ext_meta = ''
        configure_btn = (
            '<button type="button" class="btn btn-secondary iface-configure-btn"'
            f' data-iface="{e(current)}" data-mtu="{e(cur_mtu or "")}"'
            f' data-mac="{e(cur_mac or "")}" data-perm-mac="{e(cur_perm_mac or "")}"'
            f' data-min-mtu="{cur_min_mtu if cur_min_mtu is not None else ""}"'
            f' data-max-mtu="{cur_max_mtu if cur_max_mtu is not None else ""}">'
            'Configure</button>'
        ) if current else ''
        return (
            '<div class="form-group">'
            f'<label class="form-label">{label}</label>'
            '<div class="iface-picker">'
            f'<input type="hidden" name="{name}" value="{e(current)}"/>'
            '<div class="iface-picker-header">'
            f'<button type="button" class="iface-picker-btn">{btn_label}{btn_badge}</button>'
            f'{ext_meta}'
            f'{configure_btn}'
            '</div>'
            f'<div class="iface-picker-dropdown">{table_html}</div>'
            '</div>'
            '</div>'
        )

    # Generic <input>: input_type is used verbatim as the type attribute.
    validate = item.get('validate', '')
    depends = item.get('depends', [])
    validate_attr = f' data-validate="{e(validate)}"' if validate else ''
    depends_attr = f' data-depends="{e(",".join(depends))}"' if depends else ''
    if validate:
        return (
            f'<div class="form-group"><label class="form-label">{label}</label>'
            f'<div class="field-wrap"><input type="{e(input_type)}" name="{name}" value="{e(value)}"'
            f' placeholder="{placeholder}" class="form-input"{readonly}{validate_attr}{depends_attr}/>'
            f'<p class="form-hint field-dyn-hint hidden"></p></div>'
            f'{hint_html}</div>'
        )
    return (
        f'<div class="form-group"><label class="form-label">{label}</label>'
        f'<input type="{e(input_type)}" name="{name}" value="{e(value)}"'
        f' placeholder="{placeholder}" class="form-input"{readonly}{validate_attr}{depends_attr}/>'
        f'{hint_html}</div>'
    )
def _collect_form_specs(items):
"""Walk form items; return (field_specs, submit_sel) for factory script generation."""
fields = []
submit_sel = None
for item in items:
t = item.get('type', '')
if t == 'field':
itype = item.get('input_type', 'text')
if item.get('validate') or itype == 'checkbox' or itype == 'number':
fields.append(item)
elif t == 'subnet_row':
fields.append(item)
elif t == 'button_primary' and item.get('class'):
first_cls = item['class'].split()[0]
submit_sel = submit_sel or ('.' + first_cls)
elif t in ('field_row', 'button_row', 'section', 'form'):
sub, sub_btn = _collect_form_specs(item.get('items', []))
fields.extend(sub)
submit_sel = submit_sel or sub_btn
return fields, submit_sel
def _render_form_script(field_specs, submit_sel):
"""Generate an inline <script> for a form's validation and submit-gate wiring."""
import re
_safe = re.compile(r'^[a-zA-Z0-9_-]+$')
lines = ['(function() {']
lines.append(" var _prev = document.currentScript.previousElementSibling;")
lines.append(" var _card = _prev.closest('.card') || _prev.parentElement;")
lines.append(f" var _submit = _card ? _card.querySelector('{submit_sel}') : null;")
lines.append('')
# Classify each spec =================================================
subnet_items = [] # (subnet_var, prefix_var, subnet_name, prefix_name)
validate_items = [] # (js_var, field_name) — validated via validateEl
checkbox_only = [] # js_var — checkboxes that only need change→_upd
gate_vars = [] # JS boolean expressions that must all be true for submit
for spec in field_specs:
t = spec.get('type', '')
if t == 'subnet_row':
sn = spec.get('subnet_name', 'subnet')
pn = spec.get('prefix_name', 'subnet_mask')
if not (_safe.match(sn) and _safe.match(pn)):
continue
sv = '_' + sn.replace('-', '_')
pv = '_' + pn.replace('-', '_')
lines.append(f" var {sv} = _card.querySelector('[name=\"{sn}\"]');")
lines.append(f" var {pv} = _card.querySelector('[name=\"{pn}\"]');")
subnet_items.append((sv, pv))
gate_vars.append(f'{sv} && {sv}._valid')
elif t == 'field':
nm = spec.get('name', '')
itype = spec.get('input_type', 'text')
if not nm or not _safe.match(nm):
continue
vn = '_' + nm.replace('-', '_')
lines.append(f" var {vn} = _card.querySelector('[name=\"{nm}\"]');")
if itype == 'checkbox':
if spec.get('validate'):
validate_items.append((vn, nm))
gate_vars.append(f'{vn} && {vn}._valid')
else:
checkbox_only.append(vn)
else:
validate_items.append((vn, nm))
gate_vars.append(f'{vn} && {vn}._valid')
lines.append('')
# Submit gate =========================================================
gate_expr = ' && '.join(gate_vars) if gate_vars else 'true'
lines.append(' function _upd() {')
lines.append(' if (!_submit) return;')
lines.append(f' _submit.disabled = !({gate_expr});')
lines.append(' }')
lines.append('')
# validateEl listeners ================================================
for vn, _ in validate_items:
lines.append(f" if ({vn}) {vn}.addEventListener('input', function() {{ validateEl({vn}); _upd(); }});")
# subnet_row custom block =============================================
for sv, pv in subnet_items:
lines.append(f' function _chkSubnet() {{')
lines.append(f' if (!{sv} || !{pv}) return;')
lines.append(f" var res = _ipv4SubnetValid({sv}.value.trim(), {pv}.value.trim());")
lines.append(f" setFieldHint({sv}, res.ok ? '' : (res.msg||''), res.ok ? 'ok' : (res.partial ? 'warning' : 'error'));")
lines.append(f' {sv}._valid = res.ok;')
lines.append(f" var dot = {pv}.closest('.form-group').querySelector('.subnet-dotted');")
lines.append(f' var n = parseInt({pv}.value, 10);')
lines.append(f" if (dot) dot.textContent = (!isNaN(n) && n >= 1 && n <= 30) ? prefixToDotted(n) : '';")
lines.append(f' _upd();')
lines.append(f' }}')
lines.append(f" if ({sv}) {sv}.addEventListener('input', _chkSubnet);")
lines.append(f" if ({pv}) {pv}.addEventListener('input', _chkSubnet);")
# Checkbox change → _upd only =========================================
for vn in checkbox_only:
lines.append(f" if ({vn}) {vn}.addEventListener('change', _upd);")
lines.append('}());')
return '<script>' + '\n'.join(lines) + '</script>'
def _collect_form_originals(items, tokens):
    """Collect the initial value of every input in a form.

    Walks ``items`` (descending into ``field_row`` containers) and returns a
    ``{name: value}`` dict that the ``form`` renderer serialises into the
    hidden ``original_values`` input.

    * ``field``: token-expanded ``value``; unnamed and ``hidden`` fields are
      skipped. Checkboxes are normalised to ``'1'``/``'0'``. A ``select``
      without a value defaults to its first option's value; ``options`` may
      be a list or a token-expanded JSON string, as in ``_render_field``.
    * ``editable_list``: list of the stringified entries of its
      token-expanded JSON ``items`` (empty list if that cannot be parsed).
    * ``subnet_row``: one entry each for the subnet and prefix inputs.
    """
    result = {}
    for item in items:
        t = item.get('type', '')
        if t == 'field':
            name = item.get('name', '')
            input_type = item.get('input_type', 'text')
            if not name or input_type == 'hidden':
                continue
            value = apply_tokens(item.get('value', ''), tokens)
            if input_type == 'checkbox':
                result[name] = '1' if value.lower() in ('true', '1', 'yes') else '0'
                continue
            if input_type == 'select' and not value:
                options = item.get('options', '[]')
                try:
                    if isinstance(options, str):
                        options = json.loads(apply_tokens(options, tokens))
                    value = options[0]['value'] if options else ''
                except Exception:
                    # Best effort: unusable options leave the value empty.
                    pass
            result[name] = value
        elif t == 'editable_list':
            name = item.get('name', '')
            if not name:
                continue
            try:
                vals = json.loads(apply_tokens(item.get('items', '[]'), tokens))
                vals = [str(v) for v in vals]
            except Exception:
                vals = []
            result[name] = vals
        elif t == 'subnet_row':
            result[item.get('subnet_name', 'subnet')] = apply_tokens(item.get('subnet_value', ''), tokens)
            result[item.get('prefix_name', 'subnet_mask')] = apply_tokens(item.get('prefix_value', '24'), tokens)
        elif t == 'field_row':
            result.update(_collect_form_originals(item.get('items', []), tokens))
    return result
def _render_editable_list(item, tokens):
    """Render an ``editable_list`` item: text inputs with add/remove buttons.

    The current entries come from the token-expanded JSON in
    ``item['items']``; if that cannot be parsed the list starts empty. The
    field name, placeholder and optional ``validate`` rule are exposed as
    ``data-*`` attributes on the wrapper element.
    """
    def _expanded(key, default=''):
        return e(apply_tokens(item.get(key, default), tokens))

    label = e(item.get('label', ''))
    name = e(item.get('name', ''))
    ph = _expanded('item_placeholder')
    add_lbl = _expanded('add_label', 'Add')
    hint = _expanded('hint')
    validate = e(item.get('validate', ''))

    try:
        entries = json.loads(apply_tokens(item.get('items', '[]'), tokens))
    except Exception:
        entries = []

    row_parts = []
    for entry in entries:
        row_parts.append(
            '<div class="editable-list-item">'
            f'<input type="text" name="{name}" value="{e(entry)}" class="form-input"/>'
            '<button type="button" class="btn btn-ghost btn-sm editable-list-remove">Remove</button>'
            '</div>'
        )
    rows = ''.join(row_parts)

    hint_html = ''
    if hint:
        hint_html = f'<p class="form-hint">{hint}</p>'
    validate_attr = ''
    if validate:
        validate_attr = f' data-validate="{validate}"'

    return ''.join((
        f'<div class="form-group"><label class="form-label">{label}</label>',
        f'<div class="editable-list" data-name="{name}" data-placeholder="{ph}"{validate_attr}>',
        f'{rows}',
        f'<button type="button" class="btn btn-ghost btn-sm editable-list-add">+ {add_lbl}</button>',
        f'</div>{hint_html}</div>',
    ))
def _render_table(item, tokens, inherited_req=None):
    """Render a ``table`` item: optional toolbar plus a data table.

    Rows come from ``_load_datasource(item['datasource'])``. Each column
    reads a (possibly dotted) ``field`` path from the row and is rendered by
    ``_render_table_cell``; missing values are shown as ``-``. When
    ``row_actions`` are defined, an extra trailing column holds one control
    per action the client is allowed to use:

    * ``post`` (default): inline form carrying ``row_index`` and
      ``config_hash``; rendered as a disabled button when the action's
      ``disable_if`` rule (``{'field': ..., 'value': ...}``) matches the row;
    * ``js_edit`` / ``inline_edit``: buttons carrying the row as JSON in
      ``data-*`` attributes;
    * anything else: a link with ``?row_index=N`` appended.

    Args:
        item: The table definition dict.
        tokens: Mapping used for ``%TOKEN%`` substitution.
        inherited_req: Requirement applied to the toolbar, columns and row
            actions that do not declare their own ``client_requirement``.

    Returns:
        The toolbar (if any) followed by the wrapped ``<table>`` markup.
    """
    level = _client_level()
    columns = item.get('columns', [])
    rows = _load_datasource(item.get('datasource', ''))
    empty = e(item.get('empty_message', 'No data.'))
    row_actions = item.get('row_actions', [])
    hash_val = config_hash()

    toolbar_html = ''
    toolbar = item.get('toolbar')
    if toolbar:
        req = toolbar.get('client_requirement', inherited_req)
        if _passes(req, level):
            t_inner = render_items(toolbar.get('items', []), tokens, req)
            toolbar_html = f'<div class="table-toolbar">{t_inner}</div>'

    thead = ''.join(
        f'<th class="{e(c["class"])}">{e(c.get("label",""))}</th>' if c.get("class") else f'<th>{e(c.get("label",""))}</th>'
        for c in columns
    )
    if row_actions:
        thead += '<th></th>'

    if not rows:
        colspan = len(columns) + (1 if row_actions else 0)
        tbody = f'<tr><td colspan="{colspan}" class="table-empty">{empty}</td></tr>'
    else:
        tbody = ''
        for idx, row in enumerate(rows):
            cells = ''
            for col in columns:
                # Resolve a dotted path such as "a.b.c" inside the row dict.
                val = row
                for part in col.get('field', '').split('.'):
                    val = val.get(part, '') if isinstance(val, dict) else ''
                col_req = col.get('client_requirement', inherited_req)
                # Without any requirement the toggle is available to everyone.
                toggle_allowed = _passes(col_req, level) if col_req else True
                cells += _render_table_cell(
                    str(val) if val != '' else '-',
                    col.get('render', ''),
                    col.get('class', ''),
                    field=col.get('field', ''),
                    row_idx=idx,
                    toggle_action=col.get('toggle_action'),
                    toggle_allowed=toggle_allowed,
                    render_options=col.get('render_options', {}),
                )
            if row_actions:
                btns = ''
                for ra in row_actions:
                    req = ra.get('client_requirement', inherited_req)
                    if not _passes(req, level):
                        continue
                    text = e(ra.get('text', ''))
                    cls = e(ra.get('class', 'btn-ghost btn-sm'))
                    action = e(apply_tokens(ra.get('action', '#'), tokens))
                    method = ra.get('method', 'post').lower()
                    if method == 'post':
                        # 'disable_if' is the intended key; the truncated
                        # 'disable_i' spelling is still accepted as a fallback.
                        disable_if = ra.get('disable_if') or ra.get('disable_i')
                        if disable_if and row.get(disable_if.get('field')) == disable_if.get('value'):
                            btns += f'<button type="button" class="btn {cls}" disabled>{text}</button>'
                            continue
                        btns += (
                            f'<form method="post" action="{action}" class="form-inline">'
                            f'<input type="hidden" name="row_index" value="{idx}"/>'
                            f'<input type="hidden" name="config_hash" value="{e(hash_val)}"/>'
                            f'<button type="submit" class="btn {cls}">{text}</button></form>'
                        )
                    elif method == 'js_edit':
                        target = e(ra.get('target', 'edit-form'))
                        row_json = e(json.dumps(row))
                        btns += (
                            f'<button type="button" class="btn {cls} row-edit-btn"'
                            f' data-row-index="{idx}" data-row="{row_json}"'
                            f' data-target="{target}">{text}</button>'
                        )
                    elif method == 'inline_edit':
                        fields_json = e(json.dumps(_expand_fields(ra.get('fields', []), tokens)))
                        row_json = e(json.dumps(row))
                        btns += (
                            f'<button type="button" class="btn {cls} row-inline-edit-btn"'
                            f' data-row-index="{idx}" data-row="{row_json}"'
                            f' data-action="{action}" data-fields="{fields_json}">{text}</button>'
                        )
                    else:
                        btns += f'<a href="{action}?row_index={idx}" class="btn {cls}">{text}</a>'
                cells += f'<td class="col-actions">{btns}</td>'
            tbody += f'<tr>{cells}</tr>'

    return (
        f'{toolbar_html}'
        '<div class="table-wrapper">'
        '<table class="data-table">'
        f'<thead><tr>{thead}</tr></thead>'
        f'<tbody>{tbody}</tbody>'
        '</table></div>'
    )
def _render_table_cell(value, render_fn, col_class='', field='', row_idx=None,
                       toggle_action=None, toggle_allowed=True, render_options=None):
    """Render one table cell as a ``<td>...</td>`` HTML string.

    Parameters:
        value: Cell value. Badge-style renderers coerce it with ``str()``
            before comparing; the plain and fallback paths pass it to ``e``
            unchanged.
        render_fn: Name of the renderer. Recognised names are
            ``badge_enabled_disabled``, ``badge_yes_no``,
            ``badge_recording_on_off``, ``badge_toggle``,
            ``badge_active_inactive``, ``raw_html``, ``tag_list`` and
            ``interface_status``. An empty or unrecognised name renders the
            escaped value as plain text.
        col_class: Optional value for the ``class`` attribute of the ``<td>``.
        field: Optional value for the ``data-field`` attribute of the ``<td>``.
        row_idx: Row index posted as ``row_index`` by the ``badge_toggle`` form.
        toggle_action: Form action URL for ``badge_toggle``; without it (or
            without ``row_idx``) the badge is rendered as a static span.
        toggle_allowed: When false, ``badge_toggle`` renders a static badge.
        render_options: Optional dict. ``badge_yes_no`` reads ``title_true`` /
            ``title_false`` from it to add a ``data-tooltip`` attribute.

    Returns:
        The HTML for the cell, including the opening and closing ``<td>`` tags.
    """
    attrs = []
    if col_class:
        attrs.append(f'class="{e(col_class)}"')
    if field:
        attrs.append(f'data-field="{e(field)}"')
    td_open = f'<td {" ".join(attrs)}>' if attrs else '<td>'

    if not render_fn:
        return f'{td_open}{e(value)}</td>'

    # Coerce once so every badge renderer tolerates non-string values
    # (bool, int, None) instead of only some of them.
    text = str(value)
    lowered = text.lower()
    is_on = lowered in ('true', '1', 'yes', 'enabled')

    if render_fn == 'badge_enabled_disabled':
        if is_on:
            inner = '<span class="badge badge-enabled">Enabled</span>'
        else:
            inner = '<span class="badge badge-disabled">Disabled</span>'
        return f'{td_open}{inner}</td>'

    if render_fn == 'badge_yes_no':
        opts = render_options or {}
        if is_on:
            tip = f' data-tooltip="{e(opts["title_true"])}"' if opts.get('title_true') else ''
            inner = f'<span class="badge badge-enabled"{tip}>Yes</span>'
        else:
            tip = f' data-tooltip="{e(opts["title_false"])}"' if opts.get('title_false') else ''
            inner = f'<span class="badge badge-disabled"{tip}>No</span>'
        return f'{td_open}{inner}</td>'

    if render_fn == 'badge_recording_on_off':
        # Unlike the other badges, 'enabled' is not treated as "on" here.
        if lowered in ('true', '1', 'yes'):
            inner = '<span class="badge badge-enabled">Recording On</span>'
        else:
            inner = '<span class="badge badge-disabled">Recording Off</span>'
        return f'{td_open}{inner}</td>'

    if render_fn == 'badge_toggle':
        if is_on:
            label, badge_cls = 'Enabled', 'badge-enabled'
        else:
            label, badge_cls = 'Disabled', 'badge-disabled'
        if toggle_action and row_idx is not None and toggle_allowed:
            # Clickable badge: posting the form sends the row index to toggle_action.
            inner = (
                f'<form method="post" action="{e(toggle_action)}" class="form-inline">'
                f'<input type="hidden" name="row_index" value="{row_idx}"/>'
                '<button type="submit" class="btn-badge">'
                f'<span class="badge {badge_cls}">{label}</span></button></form>'
            )
        else:
            inner = f'<span class="badge {badge_cls}">{label}</span>'
        return f'{td_open}{inner}</td>'

    if render_fn == 'badge_active_inactive':
        badges = {'active': 'badge-enabled', 'pending': 'badge-warning'}
        cls = badges.get(lowered, 'badge-disabled')
        return f'{td_open}<span class="badge {cls}">{e(text.title())}</span></td>'

    if render_fn == 'raw_html':
        # Inserted without escaping: the caller is responsible for the markup.
        return f'{td_open}{value}</td>'

    if render_fn == 'tag_list':
        # Accept either a JSON array or a comma-separated string. Anything
        # that cannot be parsed (including non-string values) becomes a
        # single tag.
        try:
            items = json.loads(value) if value.startswith('[') else [s.strip() for s in value.split(',')]
        except Exception:
            items = [value]

        def _tag(t):
            # Dict entries carry name 'n', tooltip 'd' and optional
            # 'short' / 'mini' labels; anything else is used as its own text.
            if isinstance(t, dict):
                s = str(t.get('n', '')).strip()
                tooltip = str(t.get('d', t.get('n', ''))).strip()
                short = str(t['short']).strip() if 'short' in t else s.split('-')[0]
                mini = str(t['mini']).strip() if 'mini' in t else (s[0] if s else '')
            else:
                s = tooltip = str(t).strip()
                short = s.split('-')[0]
                mini = s[0] if s else ''
            if not s:
                return ''
            return (
                f'<span class="tag" data-tooltip="{e(tooltip)}">'
                f'<span class="tl-full">{e(s)}</span>'
                f'<span class="tl-short">{e(short)}</span>'
                f'<span class="tl-min">{e(mini)}</span>'
                '</span>'
            )

        tags = ''.join(_tag(t) for t in items)
        return f'{td_open}<div class="tag-list">{tags}</div></td>'

    if render_fn == 'interface_status':
        state = text.upper()
        if state == 'INVALID':
            inner = '<span class="badge badge-danger">Invalid</span>'
        elif state == 'UP':
            inner = '<span class="badge badge-enabled">Up</span>'
        elif state == 'DOWN':
            inner = '<span class="badge badge-warning">Down</span>'
        else:
            inner = f'<span class="badge badge-disabled">{e(text.title())}</span>'
        return f'{td_open}{inner}</td>'

    # Unknown renderer name: fall back to escaped plain text.
    return f'{td_open}{e(value)}</td>'
# Layout renderer ===================================================
def render_layout(view_id, content_html, tokens):
    """Wrap page content in the complete HTML document.

    Besides the title bar, navigation bar and footer, the layout places up to
    three kinds of info bars above ``content_html``:

    * a "you have actions pending" bar (only when changes are not applied
      immediately, something is pending, no health problems are shown and the
      current view is not ``actions``);
    * health-check problem bars read from ``{CONFIGS_DIR}/.health`` (only for
      clients at viewer level or above);
    * one bar per *other* user who has pending queue entries.

    Parameters:
        view_id: Identifier of the view being rendered; used to highlight the
            navbar entry and to adapt the wording on the ``actions`` view.
        content_html: Already-rendered HTML for the page body.
        tokens: Mapping of token names to values; several are exported to the
            page as JavaScript globals.

    Returns:
        The full HTML document as a string.
    """
    css = _load_css()
    level = _client_level()
    has_pending_alert = not _apply_changes_immediately() and bool(get_dashboard_pending())
    titlebar_html = f'<div class="titlebar"><span class="titlebar-brand">{WEB_APP_DISPLAY_NAME}</span></div>'
    navbar_html = _render_navbar(view_id, level, tokens, pending_alert=has_pending_alert)
    footer_html = f'<footer class="footer">{WEB_APP_DISPLAY_NAME}</footer>'
    page_hash = config_hash()

    # Values exported to the page as JavaScript globals (see the <script> below).
    lan_iface = e(tokens.get('GENERAL_LAN_INTERFACE', ''))
    vpn_count = tokens.get('VPN_VLAN_COUNT', '0')
    existing_ids = tokens.get('EXISTING_VLAN_IDS_JSON', '[]')
    existing_names = tokens.get('EXISTING_VLAN_NAMES_JSON', '[]')
    existing_interfaces = tokens.get('EXISTING_VLAN_INTERFACES_JSON', '[]')

    current_user = session.get('email_address', '')
    pending = get_pending_entries()
    # First pending entry of the current user that is not a health fix.
    my_uuid = next((u for u, t, c, usr in pending if usr == current_user and c != 'fix problems'), None)
    secs = _seconds_until_next_run()
    locked = _is_locked()
    lock_mtime = _lock_mtime()

    # Other users' pending changes: one bar per user ---------------------
    other_bar_parts = []
    seen_other_users = set()
    for o_uuid, o_ts, _o_cmd, o_user in pending:
        if o_user == current_user or o_user in seen_other_users:
            continue
        seen_other_users.add(o_user)
        display_user = 'Another user' if o_user in ('unknown', '') else e(o_user)
        if locked and lock_mtime and o_ts < lock_mtime:
            # Entry predates the lock file's mtime, so it is reported as
            # being applied right now.
            bar_text = f'{display_user}\'s changes are being applied now...'
            bar_cls = 'info-bar-warning info-bar-running'
        else:
            timing = _format_timing(secs)
            if timing:
                bar_text = f'{display_user} has pending changes which will be applied {timing}.'
            else:
                bar_text = f'{display_user} has pending changes. The processing service is not running.'
            bar_cls = 'info-bar-warning'
        other_bar_parts.append(
            f'<div class="info-bar {bar_cls}" data-apply-uuid="{e(o_uuid)}" data-apply-user="{e(o_user)}"><span>{bar_text}</span></div>\n'
        )
    other_bars = ''.join(other_bar_parts)

    # Health-check problems ----------------------------------------------
    problem_bars = ''
    if level >= LEVEL_RANK['viewer']:
        try:
            # Context manager so the file handle is always closed.
            with open(f'{CONFIGS_DIR}/.health') as health_file:
                st = json.load(health_file)

            grouped = {'error': [], 'warning': []}
            for section in ('configurations', 'logs'):
                for item in st.get(section, []):
                    if item.get('status') == 'problem':
                        sev = item.get('severity', 'error')
                        detail_html = e(item.get('detail', item.get('name', '')))
                        grouped.setdefault(sev, []).append(detail_html)
            for item in st.get('services', []):
                if item.get('status') != 'problem':
                    continue
                name = item.get('name', '')
                if name.endswith('.timer'):
                    utype = 'timer'
                elif name.endswith('.service'):
                    utype = 'service'
                else:
                    utype = 'unit'
                # Only mention the aspects (active / enabled) that are wrong.
                exp_parts, act_parts = [], []
                if not item.get('active_ok'):
                    exp_parts.append(item.get('expected_active', 'active'))
                    act_parts.append(item.get('active', 'unknown'))
                if not item.get('enabled_ok'):
                    exp_parts.append(item.get('expected_enabled', 'enabled'))
                    act_parts.append(item.get('enabled', 'unknown'))
                detail = (
                    f"The {utype} `{name}` is expected to be "
                    f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}."
                )
                grouped.setdefault(item.get('severity', 'error'), []).append(e(detail))

            has_problems = any(items for items in grouped.values())
            fix_suffix = ''
            fix_uuid = None
            if has_problems:
                if level < LEVEL_RANK['administrator']:
                    fix_suffix = 'Please contact an administrator.'
                else:
                    fix_uuid, fix_ts = _find_cmd_in_queues('fix problems')
                    if _apply_changes_immediately():
                        if _is_locked():
                            mtime = _lock_mtime()
                            fix_suffix = ('Fix is being applied now...' if fix_ts and mtime and fix_ts < mtime
                                          else 'Fix will be applied on the next run.')
                        else:
                            timing = _format_timing(_seconds_until_next_run())
                            fix_suffix = (f'Fix will be applied {timing}.' if timing
                                          else 'Fix pending. The processing service is not running.')
                    else:
                        fix_suffix = ('Fix pending. Click <strong>Apply Now</strong> below to fix.'
                                      if view_id == 'actions' else
                                      'Fix pending. Visit the <strong>Actions</strong> page ASAP to apply fix.')

            # The fix footer is identical for every severity bar, so build it
            # once instead of once per bar.
            uuid_attr = ''
            if fix_uuid and _entry_ts_from_queue(fix_uuid) is not None:
                uuid_attr = f' data-health-uuid="{e(fix_uuid)}"'
            fix_html = (f'<div style="margin-top:0.5em"{uuid_attr}>{fix_suffix}</div>'
                        if fix_suffix else '')

            for sev, items in grouped.items():
                if not items:
                    continue
                sev_cls = 'info-bar-danger' if sev == 'error' else 'info-bar-warning'
                problems_list = ('<ul style="margin:0.25em 0;padding-left:1.25em">'
                                 + ''.join(f'<li>{d}</li>' for d in items)
                                 + '</ul>')
                content = ('<div style="width:100%">'
                           '<div style="font-weight:600;margin-bottom:0.25em">Health check - problems found:</div>'
                           + problems_list + fix_html
                           + '</div>')
                problem_bars += f'<div class="info-bar {sev_cls}">{content}</div>\n'
        except FileNotFoundError:
            # No health report has been written: nothing to show.
            pass
        except Exception as ex:
            # Health bars are best-effort and must never break page rendering,
            # but a malformed report should leave a trace in the log.
            print(f'[view_page] ERROR rendering health status: {ex}', file=sys.stderr)

    # Generic "actions pending" reminder ----------------------------------
    pending_bar = ''
    if has_pending_alert and not problem_bars and view_id != 'actions':
        pending_bar = (
            '<div class="info-bar info-bar-warning">'
            '<span>You have actions pending. Please visit the <strong>Actions</strong> page.</span>'
            '</div>\n'
        )

    return (
        '<!DOCTYPE html>\n<html lang="en">\n<head>\n'
        '  <meta charset="UTF-8"/>\n'
        '  <meta name="viewport" content="width=device-width, initial-scale=1.0"/>\n'
        f'  <title>{WEB_APP_DISPLAY_NAME}</title>\n'
        f'  <style>{css}</style>\n'
        '</head>\n<body>\n'
        f'{titlebar_html}\n'
        f'{navbar_html}\n'
        f'<main class="main-content">\n{pending_bar}{problem_bars}{other_bars}{content_html}\n</main>\n'
        f'{footer_html}\n'
        f'<script>var CONFIG_HASH="{page_hash}";var LAN_IFACE="{lan_iface}";var VPN_VLAN_COUNT={vpn_count};var EXISTING_VLAN_IDS={existing_ids};var EXISTING_VLAN_NAMES={existing_names};var EXISTING_VLAN_INTERFACES={existing_interfaces};var APPLY_UUID={json.dumps(my_uuid)};</script>\n'
        f'<script>{_inline_js()}</script>\n'
        '</body>\n</html>'
    )
def _render_navbar(active_view, level, tokens, pending_alert=False):
    """Build the ``<nav>`` bar from ``navbar_content.json``.

    Top-level entries whose ``client_requirement`` the client does not pass
    are dropped. The rest are rendered with ``_render_nav_item`` and placed
    in the right-hand group when their ``align`` is ``'right'``, otherwise in
    the left-hand group.
    """
    groups = {'left': [], 'right': []}
    entries = _load_json(f'{APP_DIR}/navbar_content.json').get('items', [])
    for entry in entries:
        requirement = entry.get('client_requirement')
        if not _passes(requirement, level):
            continue
        side = 'right' if entry.get('align', 'left') == 'right' else 'left'
        groups[side].append(
            _render_nav_item(entry, active_view, level, in_dropdown=False,
                             inherited_req=requirement, pending_alert=pending_alert)
        )
    left_html = ''.join(groups['left'])
    right_html = ''.join(groups['right'])
    return (
        '<nav class="nav-bar">'
        f'<div class="nav-left">{left_html}</div>'
        f'<div class="nav-right">{right_html}</div>'
        '</nav>'
    )
def _render_nav_item(item, active_view, level, in_dropdown=False, inherited_req=None, pending_alert=False):
    """Render a single navbar entry (link, action button or dropdown menu).

    ``nav_item`` / ``nav_action`` entries become a POST form when they have
    an ``action``, a link when they have ``map_to``, and a plain span
    otherwise. ``nav_menu`` entries become a dropdown holding the children
    the client is allowed to see; a menu with no visible children renders as
    an empty string, as does any unrecognised ``type``.
    """
    req = item.get('client_requirement', inherited_req)
    kind = item.get('type', '')

    if kind == 'nav_menu':
        menu_label = item.get('label', '')
        if menu_label == '%MENU_LABEL%':
            # Placeholder label depends on what the client is allowed to do.
            menu_label = 'Configure' if level >= LEVEL_RANK['administrator'] else 'View'
        # Children inherit the menu's requirement unless they set their own.
        visible_children = [
            _render_nav_item(child, active_view, level, in_dropdown=True,
                             inherited_req=req, pending_alert=pending_alert)
            for child in item.get('items', [])
            if _passes(child.get('client_requirement', req), level)
        ]
        dropdown_html = ''.join(visible_children)
        if not dropdown_html:
            return ''
        return (
            '<div class="nav-menu">'
            f'<button class="nav-item nav-menu-trigger" aria-haspopup="true">{e(menu_label)}</button>'
            f'<div class="nav-dropdown">{dropdown_html}</div>'
            '</div>'
        )

    if kind not in ('nav_item', 'nav_action'):
        return ''

    label = e(item.get('label', ''))
    map_to = item.get('map_to', '')
    action = item.get('action', '')
    active_suffix = ' active' if map_to and map_to == active_view else ''
    if in_dropdown:
        css_class = f'dropdown-item{active_suffix}'
    else:
        # Only top-level items get the pending marker, and only for 'actions'.
        pending_suffix = ' nav-item-pending' if pending_alert and map_to == 'actions' else ''
        css_class = f'nav-item{active_suffix}{pending_suffix}'

    if action:
        return (
            f'<form method="post" action="/action/{e(action)}" class="form-inline">'
            f'<button type="submit" class="{css_class}">{label}</button></form>'
        )
    if map_to:
        return f'<a href="/{e(map_to)}" class="{css_class}">{label}</a>'
    return f'<span class="{css_class}">{label}</span>'
# Inline JavaScript =================================================
def _inline_js():
    """Return the contents of ``{DATA_DIR}/app.js``, or ``''`` if it cannot be read."""
    try:
        with open(f'{DATA_DIR}/app.js') as js_file:
            script_source = js_file.read()
    except Exception:
        # A missing or unreadable script yields a page without inline JS.
        script_source = ''
    return script_source
# Routes ============================================================
@bp.route('/')
def index():
    """Serve the site root by rendering the ``overview`` page."""
    landing_page = 'overview'
    return _serve_view(landing_page)
@bp.route('/<page_name>')
def view(page_name):
    """Serve the page named by the single URL path segment ``page_name``."""
    response = _serve_view(page_name)
    return response
def _serve_view(page_name):
    """Render the page ``page_name`` or redirect/abort if it cannot be shown.

    The page definition is read from ``<pages dir>/<page_name>/content.json``.

    * Unknown pages (no usable definition) and page names that are not a
      plain directory name abort with HTTP 404.
    * If the client does not satisfy the page's ``client_requirement`` it is
      redirected to ``/overview`` when it has any access level, otherwise to
      ``/accountlogin``.
    * Otherwise flashed messages are rendered as info bars, followed by the
      page items, and the result is wrapped by ``render_layout``.
    """
    from flask import abort

    # page_name becomes part of a filesystem path: refuse anything that
    # could step outside the pages directory.
    if page_name in ('', '.', '..') or '/' in page_name or '\\' in page_name:
        abort(404)

    view_def = _load_json(_os.path.join(_PAGES_DIR, page_name, 'content.json'))
    # _load_json returns {} (not None) when the file is missing or invalid,
    # so test for "no usable definition" rather than for None. Without this
    # a missing page falls through to the access check and is redirected;
    # for a missing 'overview' page that redirect would loop forever.
    if not view_def or not isinstance(view_def, dict):
        abort(404)

    view_req = view_def.get('client_requirement')
    level = _client_level()
    if not _passes(view_req, level):
        return redirect('/overview' if level > 0 else '/accountlogin')

    tokens = collect_tokens()
    flash_html = ''
    for category, message in get_flashed_messages(with_categories=True):
        variant = {'error': 'danger', 'warning': 'warning', 'success': 'success'}.get(category, 'info')
        # Markup instances are inserted as-is; plain strings are escaped.
        msg_html = message if isinstance(message, Markup) else e(message)
        flash_html += f'<div class="info-bar info-bar-{variant} info-bar-flash"><span>{msg_html}</span></div>'
    content_html = flash_html + render_items(view_def.get('items', []), tokens, view_req)
    return render_layout(page_name, content_html, tokens)