1130 lines
47 KiB
Python
1130 lines
47 KiB
Python
from flask import Blueprint, session, redirect, get_flashed_messages
|
|
from markupsafe import Markup
|
|
import json, re, subprocess, os, sys
|
|
import sanitize
|
|
import validation as validate
|
|
from datetime import datetime, timezone
|
|
from config_utils import config_hash, get_pending_entries, get_dashboard_pending, get_dashboard_done, load_all_groups, revert_group, get_done_timestamps, queue_command, _find_cmd_in_queues, _entry_ts_from_queue, _apply_changes_immediately, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, WEB_APP_DISPLAY_NAME, CONFIGS_DIR, DATA_DIR, WWW_DIR, ACCOUNTS_FILE, APP_DIR
|
|
import factory
|
|
from factory import LEVEL_RANK, e, client_level, passes, build_items, build_snap_val, snap_expand_row
|
|
PAGES_DIR = os.path.join(APP_DIR, 'pages')
|
|
NAVBAR_FILE = os.path.join(APP_DIR, 'navbar.json')
|
|
CSS_FILE = os.path.join(DATA_DIR, 'styles.css')
|
|
COMMON_JS_FILE = os.path.join(DATA_DIR, 'common.js')
|
|
BLOCKLISTS_DIR = os.path.join(CONFIGS_DIR, 'blocklists')
|
|
HEALTH_FILE = os.path.join(CONFIGS_DIR, '.health')
|
|
|
|
bp = Blueprint('view_page', __name__)
|
|
|
|
|
|
# File loaders ======================================================
|
|
|
|
def load_json(path):
|
|
try:
|
|
with open(path) as f:
|
|
return json.load(f)
|
|
except Exception as ex:
|
|
print(f'[view_page] ERROR loading {path}: {ex}', file=sys.stderr)
|
|
return {}
|
|
|
|
def load_config(): return load_json(f'{CONFIGS_DIR}/config.json')
|
|
def load_ddns(): return load_config().get('ddns', {})
|
|
def load_accounts(): return load_json(ACCOUNTS_FILE)
|
|
|
|
def _load_css():
|
|
try:
|
|
with open(CSS_FILE) as f:
|
|
return f.read()
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
def _load_icon(name):
|
|
try:
|
|
with open(f'{WWW_DIR}/icons/{name}.svg') as f:
|
|
return f.read().strip()
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
|
|
|
|
# Shell helper ======================================================
|
|
|
|
def run(cmd):
|
|
try:
|
|
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
|
|
return r.stdout.strip()
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
def get_system_interfaces():
|
|
_EXCLUDE_PREFIXES = ('lo', 'wg', 'docker', 'br-', 'veth',
|
|
'tun', 'tap', 'ppp', 'virbr',
|
|
'podman', 'vnet', 'macvtap', 'fc-')
|
|
try:
|
|
return sorted(
|
|
n for n in os.listdir('/sys/class/net')
|
|
if not n.startswith(_EXCLUDE_PREFIXES)
|
|
and os.path.exists(f'/sys/class/net/{n}/device')
|
|
)
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def iface_info(iface):
|
|
base = f'/sys/class/net/{iface}'
|
|
def _rd(path):
|
|
try:
|
|
with open(f'{base}/{path}') as f:
|
|
return f.read().strip()
|
|
except Exception:
|
|
return None
|
|
wireless = os.path.isdir(f'{base}/wireless')
|
|
state = (_rd('operstate') or 'unknown').upper()
|
|
if state == 'UNKNOWN':
|
|
state = 'UP'
|
|
carrier_raw = _rd('carrier')
|
|
carrier = (carrier_raw == '1') if carrier_raw is not None else None
|
|
speed_raw = _rd('speed')
|
|
try:
|
|
mbps = int(speed_raw)
|
|
if mbps <= 0:
|
|
speed = None
|
|
elif mbps >= 1000 and mbps % 1000 == 0:
|
|
speed = f'{mbps // 1000} Gbps'
|
|
else:
|
|
speed = f'{mbps} Mbps'
|
|
except (TypeError, ValueError):
|
|
speed = None
|
|
mac = _rd('address')
|
|
perm_mac = _rd('perm_address')
|
|
if perm_mac and perm_mac == '00:00:00:00:00:00':
|
|
perm_mac = None
|
|
# DEBUG
|
|
# if not perm_mac: perm_mac = 'de:ad:be:ef:f0:0d'
|
|
def _int(val):
|
|
try: return int(val) if val else None
|
|
except ValueError: return None
|
|
return {
|
|
'name': iface,
|
|
'wireless': wireless,
|
|
'state': state,
|
|
'carrier': carrier,
|
|
'speed': speed,
|
|
'mtu': _rd('mtu'),
|
|
'min_mtu': _int(_rd('min_mtu')),
|
|
'max_mtu': _int(_rd('max_mtu')),
|
|
'mac': mac,
|
|
'perm_mac': perm_mac,
|
|
}
|
|
|
|
|
|
def iface_status(iface):
|
|
"""Return link state for iface by reading /sys/class/net/<iface>/operstate.
|
|
Returns INVALID if the interface does not exist, otherwise UP/DOWN/UNKNOWN/etc."""
|
|
if not iface:
|
|
return 'INVALID'
|
|
safe = re.sub(r'[^A-Za-z0-9._-]', '', iface)
|
|
if not safe:
|
|
return 'INVALID'
|
|
try:
|
|
with open(f'/sys/class/net/{safe}/operstate') as f:
|
|
state = f.read().strip().upper()
|
|
return state if state else 'UP'
|
|
except OSError:
|
|
return 'INVALID'
|
|
|
|
|
|
def resolve_iface(vlan, cfg):
|
|
"""Compute interface name from is_vpn + stored vlan_id + general.lan_interface."""
|
|
if vlan.get('is_vpn'):
|
|
wg_vlans = [v for v in cfg.get('vlans', []) if v.get('is_vpn')]
|
|
wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0))
|
|
idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
|
|
return f'wg{idx}'
|
|
lan = cfg.get('network_interfaces', {}).get('lan_interface', 'eth0')
|
|
vid = vlan.get('vlan_id') or 1
|
|
return lan if vid == 1 else f'{lan}.{vid}'
|
|
|
|
|
|
# Live data loaders =================================================
|
|
|
|
def live_dhcp_leases():
|
|
rows = []
|
|
leases_file = '/var/lib/misc/dnsmasq.leases'
|
|
try:
|
|
with open(leases_file) as f:
|
|
for line in f:
|
|
parts = line.strip().split()
|
|
if len(parts) >= 4:
|
|
rows.append({
|
|
'hostname': parts[3] if parts[3] != '*' else '-',
|
|
'ip_address': parts[2],
|
|
'mac_address': parts[1],
|
|
'vlan_name': _vlan_name_for_ip(parts[2]),
|
|
'expires': fmt_timestamp(int(parts[0])),
|
|
})
|
|
except Exception:
|
|
pass
|
|
return rows
|
|
|
|
def _vlan_name_for_ip(ip):
|
|
import ipaddress
|
|
for vlan in load_config().get('vlans', []):
|
|
subnet = vlan.get('subnet', '')
|
|
mask = vlan.get('subnet_mask', 24)
|
|
if not subnet:
|
|
continue
|
|
try:
|
|
if ipaddress.ip_address(ip) in ipaddress.ip_network(f'{subnet}/{mask}', strict=False):
|
|
return vlan.get('name', '-')
|
|
except Exception:
|
|
pass
|
|
return '-'
|
|
|
|
def fmt_timestamp(ts):
|
|
try:
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
|
|
except Exception:
|
|
return '-'
|
|
|
|
def relative_time(ts):
|
|
try:
|
|
diff = int(datetime.now(tz=timezone.utc).timestamp()) - int(ts)
|
|
if diff < 60:
|
|
n = max(0, diff)
|
|
return f'{n} second{"s" if n != 1 else ""} ago'
|
|
m = diff // 60
|
|
if m < 60:
|
|
return f'{m} minute{"s" if m != 1 else ""} ago'
|
|
h = m // 60
|
|
if h < 24:
|
|
return f'{h} hour{"s" if h != 1 else ""} ago'
|
|
d = h // 24
|
|
if d < 365:
|
|
return f'{d} day{"s" if d != 1 else ""} ago'
|
|
y = d // 365
|
|
return f'{y} year{"s" if y != 1 else ""} ago'
|
|
except Exception:
|
|
return ''
|
|
|
|
def live_vpn_sessions():
|
|
rows = []
|
|
out = run('wg show all dump 2>/dev/null')
|
|
for line in out.splitlines():
|
|
parts = line.split('\t')
|
|
if len(parts) == 9:
|
|
interface, _pubkey, _psk, endpoint, allowed_ips, last_hs, rx, tx, _ka = parts
|
|
rows.append({
|
|
'peer_name': _pubkey[:16] + '...',
|
|
'interface': interface,
|
|
'tunnel_ip': allowed_ips.split(',')[0].split('/')[0] if allowed_ips else '-',
|
|
'endpoint': endpoint if endpoint != '(none)' else '-',
|
|
'last_handshake': fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
|
|
'rx_bytes': fmt_bytes(int(rx)) if rx.isdigit() else '-',
|
|
'tx_bytes': fmt_bytes(int(tx)) if tx.isdigit() else '-',
|
|
})
|
|
return rows
|
|
|
|
def fmt_bytes(n):
|
|
for unit in ('B', 'KB', 'MB', 'GB'):
|
|
if n < 1024:
|
|
return f'{n:.1f} {unit}'
|
|
n /= 1024
|
|
return f'{n:.1f} TB'
|
|
|
|
|
|
# Config data loaders ===============================================
|
|
|
|
def config_datasource(name):
|
|
cfg = load_config()
|
|
vlans = cfg.get('vlans', [])
|
|
|
|
if name == 'interfaces':
|
|
gen = cfg.get('network_interfaces', {})
|
|
wan = gen.get('wan_interface', '')
|
|
lan = gen.get('lan_interface', '')
|
|
return [
|
|
{'iface_type': 'WAN', 'interface': wan, 'status': iface_status(wan)},
|
|
{'iface_type': 'LAN', 'interface': lan, 'status': iface_status(lan)},
|
|
]
|
|
|
|
if name == 'banned_ips':
|
|
return cfg.get('banned_ips', [])
|
|
|
|
if name == 'host_overrides':
|
|
return cfg.get('host_overrides', [])
|
|
|
|
if name == 'blocklists':
|
|
rows = []
|
|
for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
|
|
row = dict(bl)
|
|
bl_path = os.path.join(BLOCKLISTS_DIR, bl.get("save_as", ""))
|
|
try:
|
|
with open(bl_path) as f:
|
|
row['domain_count'] = str(sum(1 for _ in f))
|
|
row['last_updated'] = fmt_timestamp(int(os.path.getmtime(bl_path)))
|
|
except Exception:
|
|
row['domain_count'] = '-'
|
|
row['last_updated'] = '-'
|
|
rows.append(row)
|
|
return rows
|
|
|
|
if name == 'vlans':
|
|
bl_desc = {b['name']: b.get('description', b['name']) for b in cfg.get('dns_blocking', {}).get('blocklists', []) if 'name' in b}
|
|
rows = []
|
|
for v in sorted(vlans, key=lambda x: x.get('vlan_id') or 0):
|
|
row = {k: v.get(k) for k in ('name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn', 'dnsmasq_log_queries')}
|
|
row['vlan_id'] = v.get('vlan_id')
|
|
row['interface'] = resolve_iface(v, cfg)
|
|
row['use_blocklists'] = json.dumps([
|
|
{'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', [])
|
|
])
|
|
_prefix = v.get('subnet_mask', 24)
|
|
_n_octets = 1 if _prefix >= 24 else 2 if _prefix >= 16 else 3 if _prefix >= 8 else 4
|
|
row['server_identity_ips'] = json.dumps([
|
|
{
|
|
'n': s['ip'],
|
|
'd': ' | '.join(filter(None, [s['ip'], s.get('description'), s.get('hostname')])),
|
|
'short': '.' + '.'.join(s['ip'].split('.')[-_n_octets:]),
|
|
'mini': '.' + '.'.join(s['ip'].split('.')[-_n_octets:]),
|
|
}
|
|
for s in v.get('server_identities', []) if s.get('ip')
|
|
])
|
|
row['server_identity_descriptions'] = json.dumps([
|
|
s.get('description', '') for s in v.get('server_identities', []) if s.get('ip')
|
|
])
|
|
row['server_identity_hostnames'] = json.dumps([
|
|
s.get('hostname', '') for s in v.get('server_identities', []) if s.get('ip')
|
|
])
|
|
row['server_identity_gateway'] = (
|
|
v.get('dhcp_information', {}).get('explicit_overrides', {}).get('gateway', '')
|
|
)
|
|
_dns = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('dns_server', [])
|
|
row['server_identity_dns_server'] = '\n'.join(_dns) if isinstance(_dns, list) else str(_dns or '')
|
|
_ntp = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('ntp_server', [])
|
|
row['server_identity_ntp_server'] = '\n'.join(_ntp) if isinstance(_ntp, list) else str(_ntp or '')
|
|
rows.append(row)
|
|
return rows
|
|
|
|
if name == 'inter_vlan_exceptions':
|
|
return cfg.get('inter_vlan_exceptions', [])
|
|
|
|
if name == 'port_forwarding':
|
|
return cfg.get('port_forwarding', [])
|
|
|
|
if name == 'dhcp_reservations':
|
|
rows = []
|
|
for vlan in vlans:
|
|
for res in vlan.get('reservations', []):
|
|
row = dict(res)
|
|
row['vlan_name'] = vlan.get('name', '-')
|
|
rows.append(row)
|
|
return rows
|
|
|
|
if name == 'ddns_providers':
|
|
ddns = load_ddns()
|
|
rows = []
|
|
for p in ddns.get('providers', []):
|
|
row = dict(p)
|
|
ptype = p.get('provider', '').lower()
|
|
if ptype == 'noip':
|
|
row['credentials'] = (
|
|
'<div style="line-height:1.3">'
|
|
f'<b>U:</b> {e(p.get("username", "-"))}<br/>'
|
|
'<b>P:</b> ••••••</div>'
|
|
)
|
|
elif ptype in ('cloudflare', 'duckdns'):
|
|
tok = p.get('api_token', '')
|
|
row['credentials'] = f'<b>API Token:</b> {e(tok[:20])}...' if tok else '(not set)'
|
|
else:
|
|
row['credentials'] = '-'
|
|
row['hostnames'] = json.dumps(p.get('hostnames', p.get('subdomains', [])))
|
|
rows.append(row)
|
|
return rows
|
|
|
|
if name == 'accounts':
|
|
rows = []
|
|
for acct in load_accounts().get('accounts', []):
|
|
row = dict(acct)
|
|
row['account_status'] = 'active' if acct.get('hashed_password') else 'pending'
|
|
rows.append(row)
|
|
return rows
|
|
|
|
if name == 'vpn_peers':
|
|
rows = []
|
|
_wg_sorted = sorted(
|
|
[v for v in vlans if v.get('is_vpn')],
|
|
key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
|
|
)
|
|
for i, vlan in enumerate(_wg_sorted):
|
|
iface = f'wg{i}'
|
|
vlan_display = f'{iface} (VLAN {vlan.get("vlan_id") or "?"})'
|
|
for peer in vlan.get('peers', []):
|
|
row = dict(peer)
|
|
row['vlan_display'] = vlan_display
|
|
row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no'
|
|
row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-'
|
|
rows.append(row)
|
|
return rows
|
|
|
|
return []
|
|
|
|
|
|
def load_datasource(spec):
|
|
if spec.startswith('live:'):
|
|
name = spec[5:]
|
|
if name == 'dhcp_leases': return live_dhcp_leases()
|
|
if name == 'vpn_sessions': return live_vpn_sessions()
|
|
return []
|
|
if spec.startswith('config:'):
|
|
return config_datasource(spec[7:])
|
|
return []
|
|
|
|
factory.load_datasource = load_datasource
|
|
|
|
# Live stat helpers =================================================
|
|
|
|
def get_dnsmasq_stats():
|
|
stats = {'queries': '-', 'hits': '-', 'hit_rate': '-',
|
|
'forwarded': '-', 'auth': '-', 'tcp_peak': '-'}
|
|
out = run('journalctl -u dnsmasq -n 200 --no-pager 2>/dev/null')
|
|
for line in reversed(out.splitlines()):
|
|
if 'queries forwarded' in line:
|
|
m = re.search(r'queries forwarded (\d+)', line)
|
|
if m: stats['forwarded'] = m.group(1)
|
|
m = re.search(r'queries answered locally (\d+)', line)
|
|
if m: stats['hits'] = m.group(1)
|
|
fwd = int(stats['forwarded']) if stats['forwarded'] != '-' else 0
|
|
hit = int(stats['hits']) if stats['hits'] != '-' else 0
|
|
total = fwd + hit
|
|
stats['queries'] = str(total) if total else '-'
|
|
if total > 0:
|
|
stats['hit_rate'] = f'{hit / total * 100:.0f}%'
|
|
break
|
|
if 'auth answered' in line:
|
|
m = re.search(r'auth answered (\d+)', line)
|
|
if m and stats['auth'] == '-':
|
|
stats['auth'] = m.group(1)
|
|
if 'max TCP connections' in line:
|
|
m = re.search(r'max TCP connections (\d+)', line)
|
|
if m and stats['tcp_peak'] == '-':
|
|
stats['tcp_peak'] = m.group(1)
|
|
return stats
|
|
|
|
def _count_blocked_today():
|
|
out = run("journalctl -u dnsmasq --since today --no-pager 2>/dev/null | grep -c 'is NXDOMAIN'")
|
|
return out or '0'
|
|
|
|
def _count_blocked_domains():
|
|
bl_dir = BLOCKLISTS_DIR
|
|
try:
|
|
total = sum(
|
|
int(run(f'wc -l < "{bl_dir}/{f}"') or 0)
|
|
for f in os.listdir(bl_dir) if f.endswith('.con')
|
|
)
|
|
return str(total)
|
|
except Exception:
|
|
return '-'
|
|
|
|
def _bl_last_update():
|
|
bl_dir = BLOCKLISTS_DIR
|
|
try:
|
|
mtime = max(
|
|
os.path.getmtime(f'{bl_dir}/{f}')
|
|
for f in os.listdir(bl_dir) if f.endswith('.con')
|
|
)
|
|
return fmt_timestamp(int(mtime))
|
|
except Exception:
|
|
return '-'
|
|
|
|
def _blocklist_stats_html(cfg):
|
|
bl_dir = BLOCKLISTS_DIR
|
|
rows = ''
|
|
for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
|
|
name = e(bl.get('name', ''))
|
|
save_as = bl.get('save_as', '')
|
|
bl_path = f'{bl_dir}/{save_as}' if save_as else ''
|
|
try:
|
|
with open(bl_path) as f:
|
|
entries = sum(1 for _ in f)
|
|
mtime = int(os.path.getmtime(bl_path))
|
|
size_str = fmt_bytes(os.path.getsize(bl_path))
|
|
last_refreshed = f'{datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")} ({relative_time(mtime)})'
|
|
except Exception:
|
|
entries, size_str, last_refreshed = '-', '-', 'Never'
|
|
rows += (
|
|
'<tr>'
|
|
f'<td class="table-cell">{name}</td>'
|
|
f'<td class="table-cell">{entries}</td>'
|
|
f'<td class="table-cell">{size_str}</td>'
|
|
f'<td class="table-cell">{e(last_refreshed)}</td>'
|
|
'</tr>'
|
|
)
|
|
if not rows:
|
|
return ''
|
|
return (
|
|
'<table class="data-table">'
|
|
'<thead><tr>'
|
|
'<th class="table-header">Blocklist</th>'
|
|
'<th class="table-header">Entries</th>'
|
|
'<th class="table-header">Size</th>'
|
|
'<th class="table-header">Last Refreshed</th>'
|
|
'</tr></thead>'
|
|
f'<tbody>{rows}</tbody>'
|
|
'</table>'
|
|
)
|
|
|
|
|
|
DDNS_LOG_MAX = 50
|
|
|
|
def _ddns_log_tail():
|
|
log_path = f'{CONFIGS_DIR}/ddns.log'
|
|
try:
|
|
log_max_kb = load_ddns().get('general', {}).get('log_max_kb', 1024)
|
|
size_kb = os.path.getsize(log_path) / 1024
|
|
with open(log_path) as f:
|
|
lines = f.readlines()
|
|
if not lines:
|
|
return '(log is empty)', ''
|
|
total = len(lines)
|
|
tail = lines[-DDNS_LOG_MAX:]
|
|
shown = len(tail)
|
|
hidden = total - shown
|
|
pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0
|
|
left = f'Showing {shown} of {total} lines ({hidden} not shown)' if hidden > 0 else f'Showing {shown} of {total} lines'
|
|
right = f'Log file size: {size_kb:.1f} KB ({pct}% of max)'
|
|
summary = (
|
|
'<div class="text-muted" style="display:flex;justify-content:space-between;margin-top:0.5em;">'
|
|
f'<span>{left}</span><span>{right}</span></div>'
|
|
)
|
|
return ''.join(tail).strip(), summary
|
|
except FileNotFoundError:
|
|
return '(log file not found)', ''
|
|
except Exception:
|
|
return '(error reading log)', ''
|
|
|
|
def _fmt_seconds(secs):
|
|
secs = int(secs)
|
|
if secs < 60:
|
|
return f'{secs}s'
|
|
m, s = divmod(secs, 60)
|
|
if m < 60:
|
|
return f'{m}m {s}s' if s else f'{m}m'
|
|
h, m = divmod(m, 60)
|
|
return f'{h}h {m}m' if m else f'{h}h'
|
|
|
|
def _parse_interval_to_seconds(s):
|
|
m = re.match(r'^(\d+)([mhd])$', str(s).strip())
|
|
if not m:
|
|
return None
|
|
val, unit = int(m.group(1)), m.group(2)
|
|
return val * {'m': 60, 'h': 3600, 'd': 86400}[unit]
|
|
|
|
def _parse_time_remaining(text):
|
|
for line in text.splitlines():
|
|
if 'Trigger:' in line:
|
|
total, found = 0, False
|
|
for amt, unit in re.findall(r'(\d+)\s*(day|h|min|s)\b', line):
|
|
total += int(amt) * {'day': 86400, 'h': 3600, 'min': 60, 's': 1}[unit]
|
|
found = True
|
|
if found:
|
|
return total
|
|
return None
|
|
|
|
def _read_cached_ip():
|
|
"""Return (ip, mtime) from the most recent .ddns-last-ip-* file, or ('', None)."""
|
|
try:
|
|
best_ip, best_mtime = '', 0.0
|
|
for fname in os.listdir(CONFIGS_DIR):
|
|
if fname.startswith('.ddns-last-ip-'):
|
|
path = f'{CONFIGS_DIR}/{fname}'
|
|
mtime = os.path.getmtime(path)
|
|
if mtime > best_mtime:
|
|
ip = open(path).read().strip()
|
|
if ip:
|
|
best_ip, best_mtime = ip, mtime
|
|
return best_ip, (best_mtime if best_ip else None)
|
|
except Exception:
|
|
return '', None
|
|
|
|
def _public_ip_info(ddns_cfg):
|
|
"""Return (ip_str, domains_sub, next_interval_str, last_obtained_str) for stat cards."""
|
|
enabled_p = [p for p in ddns_cfg.get('providers', []) if p.get('enabled', True)]
|
|
all_hosts = []
|
|
for p in enabled_p:
|
|
all_hosts.extend(p.get('hostnames', p.get('subdomains', [])))
|
|
domains_sub = ', '.join(all_hosts)
|
|
ip, mtime = _read_cached_ip()
|
|
last_obtained = f'Obtained: {relative_time(mtime)}' if mtime else ''
|
|
if ip:
|
|
return ip, domains_sub, '-', last_obtained
|
|
return 'Offline', domains_sub, '-', ''
|
|
|
|
def _ddns_last_checked():
|
|
try:
|
|
mtime = os.path.getmtime(f'{CONFIGS_DIR}/.ddns-last-service')
|
|
return f'Last checked: {relative_time(mtime)}'
|
|
except OSError:
|
|
return 'Last checked: ---'
|
|
|
|
def _vpn_info():
|
|
for vlan in load_config().get('vlans', []):
|
|
if 'vpn_information' in vlan:
|
|
return vlan['vpn_information']
|
|
return {}
|
|
|
|
|
|
# Token collection ==================================================
|
|
|
|
def collect_tokens():
|
|
tokens = {}
|
|
cfg = load_config()
|
|
net = cfg.get('network_interfaces', {})
|
|
dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
|
|
dns = cfg.get('upstream_dns', {})
|
|
vlans = cfg.get('vlans', [])
|
|
tokens['GENERAL_WAN_INTERFACE'] = str(net.get('wan_interface', '-'))
|
|
tokens['GENERAL_LAN_INTERFACE'] = str(net.get('lan_interface', '-'))
|
|
tokens['GENERAL_WAN_STATUS'] = iface_status(net.get('wan_interface', ''))
|
|
tokens['GENERAL_LAN_STATUS'] = iface_status(net.get('lan_interface', ''))
|
|
tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
|
|
sys_ifaces = get_system_interfaces()
|
|
|
|
# Always include currently-configured values so dropdowns are never blank.
|
|
for configured in [net.get('wan_interface', ''), net.get('lan_interface', '')]:
|
|
if configured and configured not in sys_ifaces:
|
|
sys_ifaces.append(configured)
|
|
sys_ifaces.sort()
|
|
tokens['NETWORK_INTERFACE_OPTIONS'] = json.dumps(
|
|
[{'value': i, 'label': i} for i in sys_ifaces]
|
|
)
|
|
tokens['NETWORK_INTERFACE_STATUS_OPTIONS'] = json.dumps(
|
|
[{'value': i, 'label': f'{i} - {iface_status(i).title()}'} for i in sys_ifaces]
|
|
)
|
|
iface_data = [iface_info(i) for i in sys_ifaces]
|
|
tokens['NETWORK_INTERFACE_DATA_JSON'] = json.dumps(iface_data)
|
|
max_speed_len = max(
|
|
(len(str(d.get('speed') or '')) for d in iface_data),
|
|
default=len('Speed')
|
|
)
|
|
tokens['NETWORK_INTERFACE_STATS_SPEED_PAD'] = str(max(max_speed_len, len('Speed')))
|
|
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if dns_blk_gen.get('log_errors_only') else 'false'
|
|
tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(dns_blk_gen.get('daily_execute_time_24hr_local', '-'))
|
|
tokens['GENERAL_APPLY_ON_SAVE'] = 'true' if session.get('apply_changes_immediately', False) else 'false'
|
|
|
|
# Queue health fix before building the pending table so it appears immediately.
|
|
_level = client_level()
|
|
if _level >= LEVEL_RANK['administrator']:
|
|
try:
|
|
import json as _hj
|
|
_st = _hj.load(open(HEALTH_FILE))
|
|
_has_problems = any(
|
|
item.get('status') == 'problem'
|
|
for section in ('configurations', 'logs', 'services')
|
|
for item in _st.get(section, [])
|
|
)
|
|
if _has_problems:
|
|
_fix_uuid, _ = _find_cmd_in_queues('fix problems')
|
|
if _fix_uuid is None:
|
|
queue_command('fix problems', user=session.get('email_address', ''))
|
|
except Exception:
|
|
pass
|
|
|
|
all_groups = load_all_groups() # [(group_dict, [change_dicts])]
|
|
_group_uuid_set = {g['uuid'] for g, _ in all_groups}
|
|
pending_items = get_dashboard_pending()
|
|
if pending_items:
|
|
from collections import defaultdict
|
|
_pgroups = defaultdict(list)
|
|
for _uuid, _ts, cmd, user in pending_items:
|
|
_pgroups[cmd].append((_uuid, user))
|
|
rows = ''
|
|
for cmd, entries in _pgroups.items():
|
|
users = ', '.join(sorted({u for _, u in entries if u and u != 'unknown'}))
|
|
snap_uuids = [_uuid for _uuid, _ in entries if _uuid in _group_uuid_set]
|
|
if snap_uuids:
|
|
req_tags = ''.join(
|
|
f'<span class="tag" data-tooltip="{_uuid}" data-uuid="{_uuid}">'
|
|
f'<span class="tl-full">{_uuid[:8]}</span>'
|
|
f'<span class="tl-short">{_uuid[:8]}</span>'
|
|
f'<span class="tl-min">{_uuid[:8]}</span>'
|
|
'</span>'
|
|
for _uuid in snap_uuids
|
|
)
|
|
req_cell = f'<td class="table-cell"><div class="tag-list">{req_tags}</div></td>'
|
|
else:
|
|
req_cell = '<td class="table-cell">-</td>'
|
|
rows += (
|
|
'<tr>'
|
|
f'<td class="table-cell">{e(cmd)}</td>'
|
|
f'<td class="table-cell">{e(users)}</td>'
|
|
f'{req_cell}'
|
|
'</tr>'
|
|
)
|
|
pending_html = (
|
|
'<table class="data-table">'
|
|
'<thead><tr>'
|
|
'<th class="table-header">Command</th>'
|
|
'<th class="table-header">User</th>'
|
|
'<th class="table-header">Required By</th>'
|
|
'</tr></thead>'
|
|
f'<tbody>{rows}</tbody>'
|
|
'</table>'
|
|
)
|
|
else:
|
|
pending_html = '<p class="text-muted">No pending actions.</p>'
|
|
|
|
tokens['PENDING_ACTIONS_HTML'] = pending_html
|
|
tokens['NO_PENDING'] = 'true' if not pending_items else ''
|
|
tokens['NO_DISMISSIBLE_PENDING'] = 'true' if not any(c != 'fix problems' for _, _, c, _ in pending_items) else ''
|
|
tokens['APPLY_WARNING'] = (
|
|
f'<span style="color:var(--warning)"><p>{_load_icon("arrow-left")} <strong>Applying actions will briefly disrupt connections as network services are restarted.</strong></p></span>'
|
|
if pending_items else ''
|
|
)
|
|
done_ts_map = get_done_timestamps()
|
|
if all_groups:
|
|
_no_revert = set()
|
|
for g, _ in all_groups:
|
|
if g['reverts_group']:
|
|
_no_revert.add(g['uuid'])
|
|
_no_revert.add(g['reverts_group'])
|
|
hist_rows = ''
|
|
_hist_onclick = (
|
|
'onclick="if(event.target.type!==\'checkbox\')'
|
|
'this.nextElementSibling.hidden=!this.nextElementSibling.hidden"'
|
|
)
|
|
for g, changes in all_groups:
|
|
_uuid = g['uuid']
|
|
applied_ts = done_ts_map.get(_uuid)
|
|
dt_str = datetime.fromtimestamp(applied_ts).strftime('%Y-%m-%d %H:%M') if applied_ts else '-'
|
|
all_before_null = all(c['before'] is None for c in changes)
|
|
all_after_null = all(c['after'] is None for c in changes)
|
|
if g['reverts_group']:
|
|
verb = 'Reverted'
|
|
elif all_before_null:
|
|
verb = 'Added'
|
|
elif all_after_null:
|
|
verb = 'Deleted'
|
|
else:
|
|
verb = 'Edited'
|
|
item = g.get('item_value') or ''
|
|
summary = f'{verb} {g["parent_path"]}: {item}' if item else f'{verb} {g["parent_path"]}'
|
|
snap_tag = (
|
|
f'<div class="tag-list"><span class="tag" data-tooltip="{e(_uuid)}" data-uuid="{e(_uuid)}">'
|
|
f'<span class="tl-full">{e(_uuid[:8])}</span>'
|
|
f'<span class="tl-short">{e(_uuid[:8])}</span>'
|
|
f'<span class="tl-min">{e(_uuid[:8])}</span>'
|
|
'</span></div>'
|
|
)
|
|
snap_user = e(g.get('user', ''))
|
|
_cb_attrs = 'disabled title="Cannot revert"' if _uuid in _no_revert else ''
|
|
hist_rows += (
|
|
f'<tr class="row-expandable" data-uuid="{e(_uuid)}" {_hist_onclick}>'
|
|
f'<td class="table-cell"><input type="checkbox" name="selected_uuids" value="{e(_uuid)}" {_cb_attrs}/></td>'
|
|
f'<td class="table-cell">{e(dt_str)}</td>'
|
|
f'<td class="table-cell">{e(summary)}</td>'
|
|
f'<td class="table-cell">{build_snap_val(changes)}</td>'
|
|
f'<td class="table-cell">{snap_tag}</td>'
|
|
f'<td class="table-cell">{snap_user}</td>'
|
|
'</tr>'
|
|
f'{snap_expand_row(changes, 6)}'
|
|
)
|
|
select_all = (
|
|
'<input type="checkbox" '
|
|
'onchange="document.querySelectorAll(\'[name=selected_uuids]:not(:disabled)\').forEach(c=>c.checked=this.checked)"/>'
|
|
)
|
|
history_html = (
|
|
'<table class="data-table">'
|
|
'<thead><tr>'
|
|
f'<th class="table-header">{select_all}</th>'
|
|
'<th class="table-header">Applied</th>'
|
|
'<th class="table-header">Change</th>'
|
|
'<th class="table-header">Fields</th>'
|
|
'<th class="table-header">Group</th>'
|
|
'<th class="table-header">User</th>'
|
|
'</tr></thead>'
|
|
f'<tbody>{hist_rows}</tbody>'
|
|
'</table>'
|
|
)
|
|
else:
|
|
history_html = '<p class="text-muted">No change history.</p>'
|
|
|
|
tokens['CHANGE_HISTORY_HTML'] = history_html
|
|
tokens['NO_HISTORY'] = 'true' if not all_groups else ''
|
|
|
|
servers = dns.get('upstream_servers', [])
|
|
tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
|
|
tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
|
|
tokens['DNS_UPSTREAM_SERVERS_JSON'] = json.dumps(servers)
|
|
tokens['OVERVIEW_UPSTREAM_SERVERS'] = ', '.join(servers) or '-'
|
|
|
|
non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
|
|
vlan_names = [v.get('name', '') for v in vlans]
|
|
tokens['OVERVIEW_VLAN_NAMES'] = ', '.join(vlan_names) or '-'
|
|
tokens['STAT_VLAN_COUNT'] = str(len(non_vpn_vlans))
|
|
tokens['STAT_LEASE_COUNT'] = str(len(live_dhcp_leases()))
|
|
filter_opts = '<option value="all">All VLANs</option>' + ''.join(
|
|
f'<option value="{e(n)}">{e(n)}</option>' for n in vlan_names
|
|
)
|
|
tokens['VLAN_FILTER_OPTIONS'] = filter_opts
|
|
tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
|
|
tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn')))
|
|
tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
|
|
_dv = next((v for v in vlans if v.get('radius_default')), None)
|
|
tokens['RADIUS_DEFAULT_VLAN'] = f'"{_dv["name"]}" (VLAN {_dv["vlan_id"]})' if _dv else 'none set'
|
|
try:
|
|
tokens['RADIUS_SECRET'] = open(f'{CONFIGS_DIR}/.radius-secret').read().strip()
|
|
except OSError:
|
|
tokens['RADIUS_SECRET'] = '(Generation is pending - visit Actions to apply generation command)'
|
|
tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in cfg.get('banned_ips', []) if b.get('enabled', True)))
|
|
tokens['STAT_BLOCKLIST_COUNT'] = str(len(cfg.get('dns_blocking', {}).get('blocklists', [])))
|
|
tokens['BLOCKLIST_STATS_HTML'] = _blocklist_stats_html(cfg)
|
|
|
|
ddns = load_ddns()
|
|
ddns_gen = ddns.get('general', {})
|
|
tokens['DDNS_TIMER_INTERVAL'] = ddns_gen.get('timer_interval', '-')
|
|
_interval_secs = _parse_interval_to_seconds(ddns_gen.get('timer_interval', '')) or 600
|
|
tokens['DDNS_TIMER_INTERVAL_MINS'] = str(_interval_secs // 60)
|
|
tokens['DDNS_GEN_LOG_MAX_KB'] = str(ddns_gen.get('log_max_kb', 1024))
|
|
tokens['DDNS_GEN_LOG_ERRORS_ONLY'] = 'true' if ddns_gen.get('log_errors_only') else 'false'
|
|
enabled_p = [p for p in ddns.get('providers', []) if p.get('enabled', True)]
|
|
tokens['STAT_DDNS_PROVIDER_COUNT'] = str(len(enabled_p))
|
|
_ip_check = ddns.get('ip_check_services', [])
|
|
_http_svc = [s['url'] for s in _ip_check if s.get('type') == 'http']
|
|
_dig_svc = [s['url'] for s in _ip_check if s.get('type') == 'dig']
|
|
tokens['STAT_IP_CHECK_TOTAL'] = str(len(_ip_check))
|
|
tokens['STAT_IP_CHECK_SUB'] = f'{len(_http_svc)} http and {len(_dig_svc)} dig'
|
|
tokens['IP_CHECK_HTTP_JSON'] = json.dumps(_http_svc)
|
|
tokens['IP_CHECK_DIG_JSON'] = json.dumps(_dig_svc)
|
|
_ddns_labels = {'noip': 'No-IP', 'cloudflare': 'Cloudflare', 'duckdns': 'DuckDNS'}
|
|
tokens['DDNS_PROVIDER_OPTIONS'] = json.dumps([
|
|
{'value': p, 'label': _ddns_labels.get(p, p.title())}
|
|
for p in validate.VALID_DDNS_PROVIDERS
|
|
])
|
|
|
|
wg_vlans_list = sorted(
|
|
[v for v in vlans if v.get('is_vpn')],
|
|
key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
|
|
)
|
|
tokens['VPN_VLAN_OPTIONS'] = json.dumps([
|
|
{'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id") or "?"})'}
|
|
for i, v in enumerate(wg_vlans_list)
|
|
])
|
|
wg_vlan = wg_vlans_list[0] if wg_vlans_list else {}
|
|
vpn = wg_vlan.get('vpn_information', {})
|
|
overrides = vpn.get('explicit_overrides', {})
|
|
tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
|
|
tokens['VPN_SERVER_ENDPOINT'] = str(vpn.get('server_endpoint', ''))
|
|
tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
|
|
tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_server', ''))
|
|
tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
|
|
|
|
# Compute gateway from server_identities (lowest last-octet), fallback to first subnet host
|
|
try:
|
|
import ipaddress as _ipaddress
|
|
ident_ips = [s['ip'] for s in wg_vlan.get('server_identities', []) if s.get('ip')]
|
|
if ident_ips:
|
|
default_gw = str(min((_ipaddress.IPv4Address(ip) for ip in ident_ips),
|
|
key=lambda x: x.packed[-1]))
|
|
else:
|
|
wg_net = _ipaddress.IPv4Network(
|
|
f"{wg_vlan['subnet']}/{wg_vlan['subnet_mask']}", strict=False)
|
|
default_gw = str(next(wg_net.hosts()))
|
|
tokens['VPN_GATEWAY'] = overrides.get('gateway') or default_gw
|
|
except Exception:
|
|
tokens['VPN_GATEWAY'] = ''
|
|
|
|
ip_str, sub_str, next_interval, last_obtained = _public_ip_info(ddns)
|
|
tokens['STAT_PUBLIC_IP'] = ip_str
|
|
tokens['STAT_DDNS_HOSTNAME'] = sub_str
|
|
tokens['STAT_DDNS_NEXT_INTERVAL'] = next_interval
|
|
tokens['STAT_PUBLIC_IP_LAST_OBTAINED'] = last_obtained
|
|
tokens['STAT_PUBLIC_IP_LAST_CHECKED'] = _ddns_last_checked()
|
|
tokens['DDNS_LOG_TAIL'], tokens['DDNS_LOG_SUMMARY'] = _ddns_log_tail()
|
|
tokens['STAT_UPTIME'] = run('uptime -p') or '-'
|
|
tokens['STAT_NFTABLES_STATUS'] = 'Active' if run('nft list tables 2>/dev/null').strip() else 'Inactive'
|
|
|
|
dns_stats = get_dnsmasq_stats()
|
|
tokens['DNS_STAT_QUERIES'] = dns_stats['queries']
|
|
tokens['DNS_STAT_HITS'] = dns_stats['hits']
|
|
tokens['DNS_STAT_HIT_RATE'] = dns_stats['hit_rate']
|
|
tokens['DNS_STAT_FORWARDED'] = dns_stats['forwarded']
|
|
tokens['DNS_STAT_AUTH'] = dns_stats['auth']
|
|
tokens['DNS_STAT_TCP_PEAK'] = dns_stats['tcp_peak']
|
|
tokens['STAT_BLOCKED_TODAY'] = _count_blocked_today()
|
|
tokens['STAT_BLOCKED_DOMAINS'] = _count_blocked_domains()
|
|
tokens['STAT_BL_LAST_UPDATE'] = _bl_last_update()
|
|
tokens['PREF_EMAIL'] = session.get('email_address', '')
|
|
tokens['PREF_TIMEZONE'] = session.get('timezone', '')
|
|
|
|
blank = [{'value': '', 'label': '-- Select timezone --'}]
|
|
tokens['TIMEZONE_OPTIONS'] = json.dumps(
|
|
blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES]
|
|
)
|
|
tokens['PROTOCOL_OPTIONS'] = json.dumps([
|
|
{'value': 'tcp', 'label': 'TCP'},
|
|
{'value': 'udp', 'label': 'UDP'},
|
|
{'value': 'both', 'label': 'TCP/UDP'},
|
|
])
|
|
tokens['BLOCKLIST_FORMAT_OPTIONS'] = json.dumps([
|
|
{'value': 'hosts', 'label': 'hosts (hosts file format)'},
|
|
{'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
|
|
])
|
|
tokens['BLOCKLIST_NAME_OPTIONS'] = json.dumps([
|
|
{'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
|
|
for bl in cfg.get('dns_blocking', {}).get('blocklists', [])
|
|
])
|
|
tokens['ACCOUNT_LEVEL_OPTIONS'] = json.dumps([
|
|
{'value': 'viewer', 'label': 'Viewer (read-only access to live data)'},
|
|
{'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
|
|
{'value': 'manager', 'label': 'Manager (full access including account management)'},
|
|
])
|
|
|
|
return tokens
|
|
|
|
|
|
# Layout renderer ===================================================
|
|
|
|
def render_layout(view_id, content_html, tokens, page_name=None):
|
|
css = _load_css()
|
|
level = client_level()
|
|
has_pending_alert = not _apply_changes_immediately() and bool(get_dashboard_pending())
|
|
titlebar_html = f'<div class="titlebar"><span class="titlebar-brand">{WEB_APP_DISPLAY_NAME}</span></div>'
|
|
navbar_html = build_navbar(view_id, level, tokens, pending_alert=has_pending_alert)
|
|
footer_html = f'<footer class="footer">{WEB_APP_DISPLAY_NAME}</footer>'
|
|
|
|
page_hash = config_hash()
|
|
lan_iface = e(tokens.get('GENERAL_LAN_INTERFACE', ''))
|
|
vpn_count = tokens.get('VPN_VLAN_COUNT', '0')
|
|
current_user = session.get('email_address', '')
|
|
pending = get_pending_entries()
|
|
my_uuid = next((u for u, t, c, usr in pending if usr == current_user and c != 'fix problems'), None)
|
|
|
|
secs = _seconds_until_next_run()
|
|
locked = _is_locked()
|
|
lock_mtime = _lock_mtime()
|
|
other_bars = ''
|
|
seen_other_users = set()
|
|
for o_uuid, o_ts, o_cmd, o_user in pending:
|
|
if o_user == current_user:
|
|
continue
|
|
if o_user in seen_other_users:
|
|
continue
|
|
seen_other_users.add(o_user)
|
|
_display_user = 'Another user' if o_user in ('unknown', '') else e(o_user)
|
|
if locked and lock_mtime and o_ts < lock_mtime:
|
|
text = f'{_display_user}\'s changes are being applied now...'
|
|
cls = 'info-bar-warning info-bar-running'
|
|
else:
|
|
timing = _format_timing(secs)
|
|
text = f'{_display_user} has pending changes which will be applied {timing}.' if timing else f'{_display_user} has pending changes. The processing service is not running.'
|
|
cls = 'info-bar-warning'
|
|
other_bars += f'<div class="info-bar {cls}" data-apply-uuid="{e(o_uuid)}" data-apply-user="{e(o_user)}"><span>{text}</span></div>\n'
|
|
|
|
problem_bars = ''
|
|
if level >= LEVEL_RANK['viewer']:
|
|
try:
|
|
import json as _j
|
|
st = _j.load(open(HEALTH_FILE))
|
|
problems = []
|
|
for section in ('configurations', 'logs'):
|
|
for item in st.get(section, []):
|
|
if item.get('status') == 'problem':
|
|
problems.append(e(item.get('detail', item.get('name', ''))))
|
|
for item in st.get('services', []):
|
|
if item.get('status') == 'problem':
|
|
name = item.get('name', '')
|
|
utype = 'timer' if name.endswith('.timer') else 'service' if name.endswith('.service') else 'unit'
|
|
exp_parts, act_parts = [], []
|
|
if not item.get('active_ok'):
|
|
exp_parts.append(item.get('expected_active', 'active'))
|
|
act_parts.append(item.get('active', 'unknown'))
|
|
if not item.get('enabled_ok'):
|
|
exp_parts.append(item.get('expected_enabled', 'enabled'))
|
|
act_parts.append(item.get('enabled', 'unknown'))
|
|
problems.append(e(
|
|
f"The {utype} `{name}` is expected to be "
|
|
f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}."
|
|
))
|
|
has_problems = bool(problems)
|
|
fix_suffix = ''
|
|
fix_uuid = None
|
|
if has_problems:
|
|
if level < LEVEL_RANK['administrator']:
|
|
fix_suffix = 'Please contact an administrator.'
|
|
else:
|
|
fix_uuid, fix_ts = _find_cmd_in_queues('fix problems')
|
|
if _apply_changes_immediately():
|
|
if _is_locked():
|
|
mtime = _lock_mtime()
|
|
fix_suffix = ('Fix is being applied now...' if fix_ts and mtime and fix_ts < mtime
|
|
else 'Fix will be applied on the next run.')
|
|
else:
|
|
timing = _format_timing(_seconds_until_next_run())
|
|
fix_suffix = (f'Fix will be applied {timing}.' if timing
|
|
else 'Fix pending. The processing service is not running.')
|
|
else:
|
|
fix_suffix = ('Fix pending. Click <strong>Apply Now</strong> below to fix.'
|
|
if view_id == 'actions' else
|
|
'Fix pending. Visit the <strong>Actions</strong> page ASAP to apply fix.')
|
|
if problems:
|
|
problems_list = ('<ul style="margin:0.25em 0;padding-left:1.25em">'
|
|
+ ''.join(f'<li>{d}</li>' for d in problems)
|
|
+ '</ul>')
|
|
uuid_attr = (f' data-health-uuid="{e(fix_uuid)}"'
|
|
if fix_uuid and _entry_ts_from_queue(fix_uuid) is not None else '')
|
|
fix_html = (f'<div style="margin-top:0.5em"{uuid_attr}>{fix_suffix}</div>'
|
|
if fix_suffix else '')
|
|
content = ('<div style="width:100%">'
|
|
'<div style="font-weight:600;margin-bottom:0.25em">Health check - problems found:</div>'
|
|
+ problems_list + fix_html
|
|
+ '</div>')
|
|
problem_bars += f'<div class="info-bar info-bar-danger">{content}</div>\n'
|
|
except Exception:
|
|
pass
|
|
|
|
pending_bar = ''
|
|
if has_pending_alert and not problem_bars and view_id != 'actions':
|
|
pending_bar = (
|
|
'<div class="info-bar info-bar-warning">'
|
|
'<span>You have actions pending. Please visit the <strong>Actions</strong> page.</span>'
|
|
'</div>\n'
|
|
)
|
|
|
|
return (
|
|
'<!DOCTYPE html>\n<html lang="en">\n<head>\n'
|
|
' <meta charset="UTF-8"/>\n'
|
|
' <meta name="viewport" content="width=device-width, initial-scale=1.0"/>\n'
|
|
f' <title>{WEB_APP_DISPLAY_NAME}</title>\n'
|
|
f' <style>{css}</style>\n'
|
|
'</head>\n<body>\n'
|
|
f'{titlebar_html}\n'
|
|
f'{navbar_html}\n'
|
|
f'<main class="main-content">\n{pending_bar}{problem_bars}{other_bars}{content_html}\n</main>\n'
|
|
f'{footer_html}\n'
|
|
f'<script>var CONFIG_HASH="{page_hash}";var LAN_IFACE="{lan_iface}";var VPN_VLAN_COUNT={vpn_count};var APPLY_UUID={json.dumps(my_uuid)};</script>\n'
|
|
f'<script>{_inline_js(page_name)}</script>\n'
|
|
'</body>\n</html>'
|
|
)
|
|
|
|
|
|
def build_navbar(active_view, level, tokens, pending_alert=False):
|
|
navbar_data = load_json(NAVBAR_FILE)
|
|
left, right = [], []
|
|
for item in navbar_data.get('items', []):
|
|
req = item.get('client_requirement')
|
|
align = item.get('align', 'left')
|
|
if not passes(req, level):
|
|
continue
|
|
frag = build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=req, pending_alert=pending_alert)
|
|
(right if align == 'right' else left).append(frag)
|
|
|
|
return (
|
|
'<nav class="nav-bar">'
|
|
f'<div class="nav-left">{"".join(left)}</div>'
|
|
f'<div class="nav-right">{"".join(right)}</div>'
|
|
'</nav>'
|
|
)
|
|
|
|
|
|
def build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=None, pending_alert=False):
|
|
req = item.get('client_requirement', inherited_req)
|
|
t = item.get('type', '')
|
|
|
|
if t in ('nav_item', 'nav_action'):
|
|
label = e(item.get('label', ''))
|
|
map_to = item.get('map_to', '')
|
|
action = item.get('action', '')
|
|
is_active = ' active' if map_to and map_to == active_view else ''
|
|
pending = ' nav-item-pending' if pending_alert and map_to == 'actions' else ''
|
|
cls = f'dropdown-item{is_active}' if in_dropdown else f'nav-item{is_active}{pending}'
|
|
if action:
|
|
return (
|
|
f'<form method="post" action="/action/{e(action)}" class="form-inline">'
|
|
f'<button type="submit" class="{cls}">{label}</button></form>'
|
|
)
|
|
if map_to:
|
|
return f'<a href="/{e(map_to)}" class="{cls}">{label}</a>'
|
|
return f'<span class="{cls}">{label}</span>'
|
|
|
|
if t == 'nav_menu':
|
|
raw_label = item.get('label', '')
|
|
if raw_label == '%MENU_LABEL%':
|
|
raw_label = 'Configure' if level >= LEVEL_RANK['administrator'] else 'View'
|
|
label = e(raw_label)
|
|
children = ''
|
|
for child in item.get('items', []):
|
|
child_req = child.get('client_requirement', req)
|
|
if not passes(child_req, level):
|
|
continue
|
|
children += build_nav_item(child, active_view, level, in_dropdown=True, inherited_req=req, pending_alert=pending_alert)
|
|
if not children:
|
|
return ''
|
|
return (
|
|
'<div class="nav-menu">'
|
|
f'<button class="nav-item nav-menu-trigger" aria-haspopup="true">{label}</button>'
|
|
f'<div class="nav-dropdown">{children}</div>'
|
|
'</div>'
|
|
)
|
|
return ''
|
|
|
|
|
|
# Inline JavaScript =================================================
|
|
|
|
def _inline_js(page_name=None):
|
|
big_validate_js = factory.build_big_validate()
|
|
try:
|
|
with open(COMMON_JS_FILE) as f:
|
|
app_js = f.read()
|
|
except Exception:
|
|
app_js = ''
|
|
page_js = ''
|
|
if page_name:
|
|
page_js_path = os.path.join(PAGES_DIR, page_name, 'page.js')
|
|
try:
|
|
with open(page_js_path) as f:
|
|
page_js = f.read()
|
|
except Exception:
|
|
pass
|
|
return big_validate_js + '\n' + app_js + ('\n' + page_js if page_js else '')
|
|
|
|
|
|
# Routes ============================================================
|
|
|
|
@bp.route('/')
|
|
def index():
|
|
return serve_view('overview')
|
|
|
|
@bp.route('/<page_name>')
|
|
def view(page_name):
|
|
return serve_view(page_name)
|
|
|
|
def serve_view(page_name):
|
|
view_def = load_json(os.path.join(PAGES_DIR, page_name, 'content.json'))
|
|
|
|
if view_def is None:
|
|
from flask import abort
|
|
abort(404)
|
|
|
|
view_req = view_def.get('client_requirement')
|
|
level = client_level()
|
|
if not passes(view_req, level):
|
|
return redirect('/overview' if level > 0 else '/accountlogin')
|
|
|
|
tokens = collect_tokens()
|
|
|
|
if page_name == 'radius' and not os.path.exists(f'{CONFIGS_DIR}/.radius-secret'):
|
|
from config_utils import queue_command
|
|
queue_command('gen radius')
|
|
|
|
flash_html = ''
|
|
for category, message in get_flashed_messages(with_categories=True):
|
|
variant = {'error': 'danger', 'warning': 'warning', 'success': 'success'}.get(category, 'info')
|
|
msg_html = message if isinstance(message, Markup) else e(message)
|
|
flash_html += f'<div class="info-bar info-bar-{variant} info-bar-flash"><span>{msg_html}</span></div>'
|
|
|
|
content_html = flash_html + build_items(view_def.get('items', []), tokens, view_req)
|
|
return render_layout(page_name, content_html, tokens, page_name=page_name)
|