linuxrouter/docker/routlin-dash/app/view_common.py
2026-06-02 00:47:03 -04:00

841 lines
32 KiB
Python

from flask import Blueprint, session, redirect, get_flashed_messages
from markupsafe import Markup
import json, re, subprocess, os, sys, glob, importlib.util as _importlib_util
import validation as validate
from datetime import datetime, timezone
from config_utils import (
config_hash, get_pending_entries, get_dashboard_pending, load_all_groups,
get_done_timestamps, queue_command, _find_cmd_in_queues, _entry_ts_from_queue,
_apply_changes_immediately, _seconds_until_next_run, _format_timing,
_is_locked, _lock_mtime, WEB_APP_DISPLAY_NAME,
CONFIGS_DIR, DATA_DIR, WWW_DIR, ACCOUNTS_FILE, APP_DIR,
)
import factory
from factory import LEVEL_RANK, e, client_level, passes, build_items
PAGES_DIR = os.path.join(APP_DIR, 'pages')
NAVBAR_FILE = os.path.join(APP_DIR, 'navbar.json')
CSS_FILE = os.path.join(DATA_DIR, 'styles.css')
COMMON_JS_FILE = os.path.join(DATA_DIR, 'common.js')
BLOCKLISTS_DIR = os.path.join(CONFIGS_DIR, 'blocklists')
HEALTH_FILE = os.path.join(CONFIGS_DIR, '.health')
bp = Blueprint('view_page', __name__)
try:
import manuf as _manuf_mod
_mac_parser = _manuf_mod.MacParser()
except Exception:
_mac_parser = None
try:
from mac_vendor_lookup import MacLookup as _MacLookup
_mac_lookup = _MacLookup()
except Exception:
_mac_lookup = None
def _get_vendor(mac):
short, long = '', ''
if _mac_parser:
try:
short = _mac_parser.get_manuf(mac) or ''
except Exception:
pass
if _mac_lookup:
try:
long = _mac_lookup.lookup(mac) or ''
except Exception:
pass
return (short, long)
def _vendor_cell(vendor):
short, long = vendor
display = short if short else (long[:8] if long else '')
if not display:
return '-'
if long:
return f'<span data-vendor-long="{e(long)}">{e(display)}</span>'
return e(display)
# File loaders ======================================================
def load_json(path):
try:
with open(path) as f:
return json.load(f)
except Exception as ex:
print(f'[view_common] ERROR loading {path}: {ex}', file=sys.stderr)
return {}
def load_config(): return load_json(f'{CONFIGS_DIR}/config.json')
def load_ddns(): return load_config().get('ddns', {})
def load_accounts(): return load_json(ACCOUNTS_FILE)
def _load_css():
try:
with open(CSS_FILE) as f:
return f.read()
except Exception:
return ''
def _load_icon(name):
try:
with open(f'{WWW_DIR}/icons/{name}.svg') as f:
return f.read().strip()
except Exception:
return ''
# Shell helpers =====================================================
def run(cmd):
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
return r.stdout.strip()
except Exception:
return ''
def get_system_interfaces():
_EXCLUDE_PREFIXES = ('lo', 'wg', 'docker', 'br-', 'veth',
'tun', 'tap', 'ppp', 'virbr',
'podman', 'vnet', 'macvtap', 'fc-')
try:
return sorted(
n for n in os.listdir('/sys/class/net')
if not n.startswith(_EXCLUDE_PREFIXES)
and os.path.exists(f'/sys/class/net/{n}/device')
)
except Exception:
return []
def iface_info(iface):
base = f'/sys/class/net/{iface}'
def _rd(path):
try:
with open(f'{base}/{path}') as f:
return f.read().strip()
except Exception:
return None
wireless = os.path.isdir(f'{base}/wireless')
state = (_rd('operstate') or 'unknown').upper()
if state == 'UNKNOWN':
state = 'UP'
carrier_raw = _rd('carrier')
carrier = (carrier_raw == '1') if carrier_raw is not None else None
speed_raw = _rd('speed')
try:
mbps = int(speed_raw)
if mbps <= 0:
speed = None
elif mbps >= 1000 and mbps % 1000 == 0:
speed = f'{mbps // 1000} Gbps'
else:
speed = f'{mbps} Mbps'
except (TypeError, ValueError):
speed = None
mac = _rd('address')
perm_mac = _rd('perm_address')
if perm_mac and perm_mac == '00:00:00:00:00:00':
perm_mac = None
def _int(val):
try: return int(val) if val else None
except ValueError: return None
return {
'name': iface,
'wireless': wireless,
'state': state,
'carrier': carrier,
'speed': speed,
'mtu': _rd('mtu'),
'min_mtu': _int(_rd('min_mtu')),
'max_mtu': _int(_rd('max_mtu')),
'mac': mac,
'perm_mac': perm_mac,
}
def iface_status(iface):
"""Return link state for iface by reading /sys/class/net/<iface>/operstate.
Returns INVALID if the interface does not exist, otherwise UP/DOWN/UNKNOWN/etc."""
if not iface:
return 'INVALID'
safe = re.sub(r'[^A-Za-z0-9._-]', '', iface)
if not safe:
return 'INVALID'
try:
with open(f'/sys/class/net/{safe}/operstate') as f:
state = f.read().strip().upper()
return state if state else 'UP'
except OSError:
return 'INVALID'
def resolve_iface(vlan, cfg):
"""Compute interface name from is_vpn + stored vlan_id + general.lan_interface."""
if vlan.get('is_vpn'):
wg_vlans = [v for v in cfg.get('vlans', []) if v.get('is_vpn')]
wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0))
idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
return f'wg{idx}'
lan = cfg.get('network_interfaces', {}).get('lan_interface', 'eth0')
vid = vlan.get('vlan_id') or 1
return lan if vid == 1 else f'{lan}.{vid}'
# Time and format helpers ===========================================
def fmt_timestamp(ts):
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
except Exception:
return '-'
def relative_time(ts1, ts2, short=False):
try:
diff = abs(int(ts1) - int(ts2))
if diff < 60:
return f'{diff}s' if short else f'{diff} second{"s" if diff != 1 else ""}'
m = diff // 60
if m < 60:
return f'{m}m' if short else f'{m} minute{"s" if m != 1 else ""}'
h, rem_m = divmod(m, 60)
if h < 24:
if short:
return f'{h}h {rem_m}m' if rem_m else f'{h}h'
return f'{h}h {rem_m}m' if rem_m else f'{h} hour{"s" if h != 1 else ""}'
d = h // 24
if d < 365:
return f'{d}d' if short else f'{d} day{"s" if d != 1 else ""}'
y = d // 365
return f'{y}y' if short else f'{y} year{"s" if y != 1 else ""}'
except Exception:
return ''
def fmt_bytes(n):
for unit in ('B', 'KB', 'MB', 'GB'):
if n < 1024:
return f'{n:.1f} {unit}'
n /= 1024
return f'{n:.1f} TB'
# Live data loaders =================================================
def _parse_lease_secs(s):
s = str(s).strip().lower()
try:
if s.endswith('h'): return int(s[:-1]) * 3600
if s.endswith('m'): return int(s[:-1]) * 60
if s.endswith('d'): return int(s[:-1]) * 86400
except ValueError:
pass
return None
def live_dhcp_leases():
rows = []
now = int(datetime.now(tz=timezone.utc).timestamp())
cfg = load_config()
vlans = cfg.get('vlans', [])
vlan_lease_secs = {
v['name']: _parse_lease_secs(v.get('dhcp_information', {}).get('lease_time', ''))
for v in vlans if v.get('name')
}
mac_to_res = {
r['mac'].lower(): r['hostname']
for r in cfg.get('dhcp_reservations', [])
if r.get('mac') and r.get('hostname')
}
for leases_file in glob.glob('/var/lib/misc/dnsmasq-routlin-*.leases'):
stem = os.path.basename(leases_file)
vlan_name = stem[len('dnsmasq-routlin-'):-len('.leases')]
lease_secs = vlan_lease_secs.get(vlan_name)
try:
with open(leases_file) as f:
for line in f:
parts = line.strip().split()
if len(parts) < 4:
continue
expiry = int(parts[0])
if expiry < now:
continue
obtained_ts = (expiry - lease_secs) if lease_secs else None
renews_ts = (expiry - lease_secs // 2) if lease_secs else None
if obtained_ts is None:
last_active = '-'
elif obtained_ts <= now:
last_active = relative_time(obtained_ts, now, short=True) + ' ago'
elif renews_ts and renews_ts > now:
last_active = 'ETA ' + relative_time(renews_ts, now, short=True)
else:
last_active = 'ETA soon'
mac_norm = parts[1].lower()
device_h = parts[3] if parts[3] != '*' else None
res_h = mac_to_res.get(mac_norm)
if res_h and device_h and device_h.lower() != res_h.lower():
hostname_html = f'<strong>{e(res_h)}</strong><br/>({e(device_h)})'
elif res_h:
hostname_html = f'<strong>{e(res_h)}</strong>'
elif device_h:
hostname_html = e(device_h)
else:
hostname_html = '-'
rows.append({
'hostname': hostname_html,
'ip_address': parts[2],
'mac_address': parts[1],
'vendor': _vendor_cell(_get_vendor(parts[1])),
'vlan_name': vlan_name,
'last_active': last_active,
'renews': 'in ' + relative_time(renews_ts or expiry, now, short=True),
})
except Exception:
pass
return rows
def live_vpn_sessions():
rows = []
out = run('wg show all dump 2>/dev/null')
for line in out.splitlines():
parts = line.split('\t')
if len(parts) == 9:
interface, _pubkey, _psk, endpoint, allowed_ips, last_hs, rx, tx, _ka = parts
rows.append({
'peer_name': _pubkey[:16] + '...',
'interface': interface,
'tunnel_ip': allowed_ips.split(',')[0].split('/')[0] if allowed_ips else '-',
'endpoint': endpoint if endpoint != '(none)' else '-',
'last_handshake': fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
'rx_bytes': fmt_bytes(int(rx)) if rx.isdigit() else '-',
'tx_bytes': fmt_bytes(int(tx)) if tx.isdigit() else '-',
})
return rows
# Config datasource =================================================
def config_datasource(name):
cfg = load_config()
vlans = cfg.get('vlans', [])
if name == 'interfaces':
gen = cfg.get('network_interfaces', {})
wan = gen.get('wan_interface', '')
lan = gen.get('lan_interface', '')
return [
{'iface_type': 'WAN', 'interface': wan, 'status': iface_status(wan)},
{'iface_type': 'LAN', 'interface': lan, 'status': iface_status(lan)},
]
if name == 'banned_ips':
return cfg.get('banned_ips', [])
if name == 'host_overrides':
return cfg.get('host_overrides', [])
if name == 'blocklists':
rows = []
for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
row = dict(bl)
bl_path = os.path.join(BLOCKLISTS_DIR, bl.get('save_as', ''))
try:
with open(bl_path) as f:
row['domain_count'] = str(sum(1 for _ in f))
row['last_updated'] = fmt_timestamp(int(os.path.getmtime(bl_path)))
except Exception:
row['domain_count'] = '-'
row['last_updated'] = '-'
rows.append(row)
return rows
if name == 'vlans':
bl_desc = {
b['name']: b.get('description', b['name'])
for b in cfg.get('dns_blocking', {}).get('blocklists', [])
if 'name' in b
}
rows = []
for v in sorted(vlans, key=lambda x: x.get('vlan_id') or 0):
row = {k: v.get(k) for k in (
'name', 'subnet', 'subnet_mask', 'radius_default',
'mdns_reflection', 'is_vpn', 'dnsmasq_log_queries'
)}
row['vlan_id'] = v.get('vlan_id')
row['interface'] = resolve_iface(v, cfg)
row['use_blocklists'] = json.dumps([
{'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', [])
])
prefix = v.get('subnet_mask', 24)
n_octets = 1 if prefix >= 24 else 2 if prefix >= 16 else 3 if prefix >= 8 else 4
row['server_identity_ips'] = json.dumps([
{
'n': s['ip'],
'd': ' | '.join(filter(None, [s['ip'], s.get('description'), s.get('hostname')])),
'short': '.' + '.'.join(s['ip'].split('.')[-n_octets:]),
'mini': '.' + '.'.join(s['ip'].split('.')[-n_octets:]),
}
for s in v.get('server_identities', []) if s.get('ip')
])
row['server_identity_descriptions'] = json.dumps([
s.get('description', '') for s in v.get('server_identities', []) if s.get('ip')
])
row['server_identity_hostnames'] = json.dumps([
s.get('hostname', '') for s in v.get('server_identities', []) if s.get('ip')
])
row['server_identity_gateway'] = (
v.get('dhcp_information', {}).get('explicit_overrides', {}).get('gateway', '')
)
dns = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('dns_servers', [])
row['server_identity_dns_servers'] = '\n'.join(dns) if isinstance(dns, list) else str(dns or '')
ntp = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('ntp_servers', [])
row['server_identity_ntp_servers'] = '\n'.join(ntp) if isinstance(ntp, list) else str(ntp or '')
row['gateway'] = row['server_identity_gateway']
row['dns_servers'] = row['server_identity_dns_servers']
row['ntp_servers'] = row['server_identity_ntp_servers']
row['dns_servers_override'] = 1 if row['server_identity_dns_servers'] else 0
row['ntp_servers_override'] = 1 if row['server_identity_ntp_servers'] else 0
dhi = v.get('dhcp_information', {})
row['dhcp_pool_start'] = dhi.get('dynamic_pool_start', '')
row['dhcp_pool_end'] = dhi.get('dynamic_pool_end', '')
lt = dhi.get('lease_time', '')
if lt and len(lt) > 1 and lt[:-1].isdigit() and lt[-1] in 'mhd':
row['dhcp_lease_time'] = lt[:-1]
row['dhcp_lease_unit'] = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[lt[-1]]
else:
row['dhcp_lease_time'] = ''
row['dhcp_lease_unit'] = ''
row['dhcp_domain'] = dhi.get('domain', '')
row['server_identities_json'] = json.dumps(v.get('server_identities', []))
rows.append(row)
return rows
if name == 'inter_vlan_exceptions':
return cfg.get('inter_vlan_exceptions', [])
if name == 'port_forwarding':
return cfg.get('port_forwarding', [])
if name == 'port_wrangling':
rows = []
for r in cfg.get('port_wrangling', []):
row = dict(r)
row['vlan_name'] = r.get('vlan', '-')
rows.append(row)
return rows
if name == 'dhcp_reservations':
rows = []
for res in cfg.get('dhcp_reservations', []):
row = dict(res)
row['vlan_name'] = res.get('vlan', '-')
rows.append(row)
return rows
if name == 'ddns_providers':
ddns = load_ddns()
rows = []
for p in ddns.get('providers', []):
row = dict(p)
ptype = p.get('provider', '').lower()
if ptype == 'noip':
row['credentials'] = (
'<div style="line-height:1.3">'
f'<b>U:</b> {e(p.get("username", "-"))}<br/>'
'<b>P:</b> &bull;&bull;&bull;&bull;&bull;&bull;</div>'
)
elif ptype in ('cloudflare', 'duckdns'):
tok = p.get('api_token', '')
row['credentials'] = f'<b>API Token:</b> {e(tok[:20])}...' if tok else '(not set)'
else:
row['credentials'] = '-'
row['hostnames'] = json.dumps(p.get('hostnames', p.get('subdomains', [])))
rows.append(row)
return rows
if name == 'accounts':
rows = []
for acct in load_accounts().get('accounts', []):
row = dict(acct)
row['account_status'] = 'active' if acct.get('hashed_password') else 'pending'
rows.append(row)
return rows
if name == 'vpn_peers':
rows = []
wg_sorted = sorted(
[v for v in vlans if v.get('is_vpn')],
key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
)
for i, vlan in enumerate(wg_sorted):
iface = f'wg{i}'
vlan_display = f'{iface} (VLAN {vlan.get("vlan_id") or "?"})'
for peer in vlan.get('peers', []):
row = dict(peer)
row['vlan_display'] = vlan_display
row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no'
row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-'
rows.append(row)
return rows
return []
def load_datasource(spec):
if spec.startswith('live:'):
name = spec[5:]
if name == 'dhcp_leases': return live_dhcp_leases()
if name == 'vpn_sessions': return live_vpn_sessions()
return []
if spec.startswith('config:'):
return config_datasource(spec[7:])
return []
factory.load_datasource = load_datasource
# Shared IP/DDNS helpers ============================================
def _read_cached_ip():
"""Return (ip, mtime) from the most recent .ddns-last-ip-* file, or ('', None)."""
try:
best_ip, best_mtime = '', 0.0
for fname in os.listdir(CONFIGS_DIR):
if fname.startswith('.ddns-last-ip-'):
path = f'{CONFIGS_DIR}/{fname}'
mtime = os.path.getmtime(path)
if mtime > best_mtime:
ip = open(path).read().strip()
if ip:
best_ip, best_mtime = ip, mtime
return best_ip, (best_mtime if best_ip else None)
except Exception:
return '', None
def public_ip_info(ddns_cfg):
"""Return (ip_str, domains_sub, last_obtained_str) for stat cards."""
enabled_p = [p for p in ddns_cfg.get('providers', []) if p.get('enabled', True)]
all_hosts = []
for p in enabled_p:
all_hosts.extend(p.get('hostnames', p.get('subdomains', [])))
domains_sub = ', '.join(all_hosts)
ip, mtime = _read_cached_ip()
last_obtained = f'Obtained: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago' if mtime else ''
if ip:
return ip, domains_sub, last_obtained
return 'Offline', domains_sub, ''
def ddns_last_checked():
try:
mtime = os.path.getmtime(f'{CONFIGS_DIR}/.ddns-last-service')
return f'Last checked: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago'
except OSError:
return 'Last checked: ---'
# Layout tokens =====================================================
def collect_layout_tokens(cfg):
vlans = cfg.get('vlans', [])
net = cfg.get('network_interfaces', {})
return {
'GENERAL_LAN_INTERFACE': str(net.get('lan_interface', '-')),
'VPN_VLAN_COUNT': str(sum(1 for v in vlans if v.get('is_vpn'))),
}
# Layout renderer ===================================================
def render_layout(view_id, content_html, tokens, page_name=None):
css = _load_css()
level = client_level()
has_pending_alert = not _apply_changes_immediately() and bool(get_dashboard_pending())
titlebar_html = f'<div class="titlebar"><span class="titlebar-brand">{WEB_APP_DISPLAY_NAME}</span></div>'
navbar_html = build_navbar(view_id, level, tokens, pending_alert=has_pending_alert)
footer_html = f'<footer class="footer">{WEB_APP_DISPLAY_NAME}</footer>'
page_hash = config_hash()
lan_iface = e(tokens.get('GENERAL_LAN_INTERFACE', ''))
vpn_count = tokens.get('VPN_VLAN_COUNT', '0')
current_user = session.get('email_address', '')
pending = get_pending_entries()
my_uuid = next((u for u, t, c, usr in pending if usr == current_user and c != 'fix problems'), None)
secs = _seconds_until_next_run()
locked = _is_locked()
lock_mtime = _lock_mtime()
other_bars = ''
seen_other_users = set()
for o_uuid, o_ts, o_cmd, o_user in pending:
if o_user == current_user:
continue
if o_user in seen_other_users:
continue
seen_other_users.add(o_user)
display_user = 'Another user' if o_user in ('unknown', '') else e(o_user)
if locked and lock_mtime and o_ts < lock_mtime:
text = f'{display_user}\'s changes are being applied now...'
cls = 'info-bar-warning info-bar-running'
else:
timing = _format_timing(secs)
text = (
f'{display_user} has pending changes which will be applied {timing}.'
if timing else
f'{display_user} has pending changes. The processing service is not running.'
)
cls = 'info-bar-warning'
other_bars += f'<div class="info-bar {cls}" data-apply-uuid="{e(o_uuid)}" data-apply-user="{e(o_user)}"><span>{text}</span></div>\n'
problem_bars = ''
if level >= LEVEL_RANK['viewer']:
try:
st = json.load(open(HEALTH_FILE))
problems = []
for section in ('configurations', 'logs'):
for item in st.get(section, []):
if item.get('status') == 'problem':
problems.append(e(item.get('detail', item.get('name', ''))))
for item in st.get('services', []):
if item.get('status') == 'problem':
name = item.get('name', '')
utype = 'timer' if name.endswith('.timer') else 'service' if name.endswith('.service') else 'unit'
exp_parts, act_parts = [], []
if not item.get('active_ok'):
exp_parts.append(item.get('expected_active', 'active'))
act_parts.append(item.get('active', 'unknown'))
if not item.get('enabled_ok'):
exp_parts.append(item.get('expected_enabled', 'enabled'))
act_parts.append(item.get('enabled', 'unknown'))
problems.append(e(
f"The {utype} `{name}` is expected to be "
f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}."
))
has_problems = bool(problems)
fix_suffix = ''
fix_uuid = None
if has_problems:
if level < LEVEL_RANK['administrator']:
fix_suffix = 'Please contact an administrator.'
else:
fix_uuid, fix_ts = _find_cmd_in_queues('fix problems')
if _apply_changes_immediately():
if _is_locked():
mtime = _lock_mtime()
fix_suffix = (
'Fix is being applied now...'
if fix_ts and mtime and fix_ts < mtime
else 'Fix will be applied on the next run.'
)
else:
timing = _format_timing(_seconds_until_next_run())
fix_suffix = (
f'Fix will be applied {timing}.'
if timing else
'Fix pending. The processing service is not running.'
)
else:
fix_suffix = (
'Fix pending. Click <strong>Apply Now</strong> below to fix.'
if view_id == 'actions' else
'Fix pending. Visit the <strong>Actions</strong> page ASAP to apply fix.'
)
if problems:
problems_list = (
'<ul style="margin:0.25em 0;padding-left:1.25em">'
+ ''.join(f'<li>{d}</li>' for d in problems)
+ '</ul>'
)
uuid_attr = (
f' data-health-uuid="{e(fix_uuid)}"'
if fix_uuid and _entry_ts_from_queue(fix_uuid) is not None else ''
)
fix_html = (
f'<div style="margin-top:0.5em"{uuid_attr}>{fix_suffix}</div>'
if fix_suffix else ''
)
content = (
'<div style="width:100%">'
'<div style="font-weight:600;margin-bottom:0.25em">Health check - problems found:</div>'
+ problems_list + fix_html
+ '</div>'
)
problem_bars += f'<div class="info-bar info-bar-danger">{content}</div>\n'
except Exception:
pass
pending_bar = ''
if has_pending_alert and not problem_bars and view_id != 'actions':
pending_bar = (
'<div class="info-bar info-bar-warning">'
'<span>You have actions pending. Please visit the <strong>Actions</strong> page.</span>'
'</div>\n'
)
return (
'<!DOCTYPE html>\n<html lang="en">\n<head>\n'
' <meta charset="UTF-8"/>\n'
' <meta name="viewport" content="width=device-width, initial-scale=1.0"/>\n'
f' <title>{WEB_APP_DISPLAY_NAME}</title>\n'
f' <style>{css}</style>\n'
'</head>\n<body>\n'
f'{titlebar_html}\n'
f'{navbar_html}\n'
f'<main class="main-content">\n{pending_bar}{problem_bars}{other_bars}{content_html}\n</main>\n'
f'{footer_html}\n'
f'<script>var CONFIG_HASH="{page_hash}";var LAN_IFACE="{lan_iface}";var VPN_VLAN_COUNT={vpn_count};var APPLY_UUID={json.dumps(my_uuid)};</script>\n'
f'<script>{_inline_js(page_name)}</script>\n'
'</body>\n</html>'
)
def build_navbar(active_view, level, tokens, pending_alert=False):
navbar_data = load_json(NAVBAR_FILE)
left, right = [], []
for item in navbar_data.get('items', []):
req = item.get('client_requirement')
align = item.get('align', 'left')
if not passes(req, level):
continue
frag = build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=req, pending_alert=pending_alert)
(right if align == 'right' else left).append(frag)
return (
'<nav class="nav-bar">'
f'<div class="nav-left">{"".join(left)}</div>'
f'<div class="nav-right">{"".join(right)}</div>'
'</nav>'
)
def build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=None, pending_alert=False):
req = item.get('client_requirement', inherited_req)
t = item.get('type', '')
if t in ('nav_item', 'nav_action'):
label = e(item.get('label', ''))
map_to = item.get('map_to', '')
action = item.get('action', '')
is_active = ' active' if map_to and map_to == active_view else ''
pending = ' nav-item-pending' if pending_alert and map_to == 'actions' else ''
cls = f'dropdown-item{is_active}' if in_dropdown else f'nav-item{is_active}{pending}'
if action:
return (
f'<form method="post" action="/action/{e(action)}" class="form-inline">'
f'<button type="submit" class="{cls}">{label}</button></form>'
)
if map_to:
return f'<a href="/{e(map_to)}" class="{cls}">{label}</a>'
return f'<span class="{cls}">{label}</span>'
if t == 'nav_menu':
raw_label = item.get('label', '')
if raw_label == '%MENU_LABEL%':
raw_label = 'Configure' if level >= LEVEL_RANK['administrator'] else 'View'
label = e(raw_label)
children = ''
for child in item.get('items', []):
child_req = child.get('client_requirement', req)
if not passes(child_req, level):
continue
children += build_nav_item(child, active_view, level, in_dropdown=True, inherited_req=req, pending_alert=pending_alert)
if not children:
return ''
return (
'<div class="nav-menu">'
f'<button class="nav-item nav-menu-trigger" aria-haspopup="true">{label}</button>'
f'<div class="nav-dropdown">{children}</div>'
'</div>'
)
return ''
# Inline JavaScript =================================================
def _inline_js(page_name=None):
big_validate_js = factory.build_big_validate()
try:
with open(COMMON_JS_FILE) as f:
app_js = f.read()
except Exception:
app_js = ''
page_js = ''
if page_name:
page_js_path = os.path.join(PAGES_DIR, page_name, 'page.js')
try:
with open(page_js_path) as f:
page_js = f.read()
except Exception:
pass
return big_validate_js + '\n' + app_js + ('\n' + page_js if page_js else '')
# Dynamic page view loader ==========================================
_page_view_cache = {}
def _load_page_view(page_name):
if page_name not in _page_view_cache:
path = os.path.join(PAGES_DIR, page_name, 'view.py')
if not os.path.exists(path):
_page_view_cache[page_name] = None
else:
spec = _importlib_util.spec_from_file_location(f'page_view_{page_name}', path)
mod = _importlib_util.module_from_spec(spec)
spec.loader.exec_module(mod)
_page_view_cache[page_name] = mod
return _page_view_cache[page_name]
# Routes ============================================================
@bp.route('/')
def index():
return serve_view('overview')
@bp.route('/<page_name>')
def view(page_name):
return serve_view(page_name)
def serve_view(page_name):
view_def = load_json(os.path.join(PAGES_DIR, page_name, 'content.json'))
if not view_def:
from flask import abort
abort(404)
view_req = view_def.get('client_requirement')
level = client_level()
if not passes(view_req, level):
return redirect('/overview' if level > 0 else '/accountlogin')
cfg = load_config()
tokens = collect_layout_tokens(cfg)
# Auto-queue health fix for every administrator page load
if level >= LEVEL_RANK['administrator']:
try:
st = json.load(open(HEALTH_FILE))
has_problems = any(
item.get('status') == 'problem'
for section in ('configurations', 'logs', 'services')
for item in st.get(section, [])
)
if has_problems:
fix_uuid, _ = _find_cmd_in_queues('fix problems')
if fix_uuid is None:
queue_command('fix problems', user=session.get('email_address', ''))
except Exception:
pass
page_view = _load_page_view(page_name)
if page_view and hasattr(page_view, 'collect_tokens'):
tokens.update(page_view.collect_tokens(cfg))
if page_name == 'radius' and not os.path.exists(f'{CONFIGS_DIR}/.radius-secret'):
queue_command('gen radius')
flash_html = ''
for category, message in get_flashed_messages(with_categories=True):
variant = {'error': 'danger', 'warning': 'warning', 'success': 'success'}.get(category, 'info')
msg_html = message if isinstance(message, Markup) else e(message)
flash_html += f'<div class="info-bar info-bar-{variant} info-bar-flash"><span>{msg_html}</span></div>'
content_html = flash_html + build_items(view_def.get('items', []), tokens, view_req)
return render_layout(page_name, content_html, tokens, page_name=page_name)