'
+ )
+ elif ptype in ('cloudflare', 'duckdns'):
+ tok = p.get('api_token', '')
+ row['credentials'] = f'API Token: {e(tok[:20])}...' if tok else '(not set)'
+ else:
+ row['credentials'] = '-'
+ row['hostnames'] = json.dumps(p.get('hostnames', p.get('subdomains', [])))
+ rows.append(row)
+ return rows
+
+ if name == 'accounts':
+ try:
+ with open(ACCOUNTS_FILE) as f:
+ data = json.load(f)
+ except Exception:
+ data = {}
+ rows = []
+ for acct in data.get('accounts', []):
+ row = dict(acct)
+ row['account_status'] = 'active' if acct.get('hashed_password') else 'pending'
+ rows.append(row)
+ return rows
+
+ if name == 'vpn_peers':
+ rows = []
+ wg_sorted = sorted(
+ [v for v in vlans if v.get('is_vpn')],
+ key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
+ )
+ for i, vlan in enumerate(wg_sorted):
+ iface = f'wg{i}'
+ vlan_display = f'{iface} (VLAN {vlan.get("vlan_id") or "?"})'
+ for peer in vlan.get('peers', []):
+ row = dict(peer)
+ row['vlan_display'] = vlan_display
+ row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no'
+ row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-'
+ rows.append(row)
+ return rows
+
+ return []
+
+
+def load_datasource(spec):
+ if spec.startswith('config:'):
+ return config_datasource(spec[7:])
+ return []
+
+
+def collect_layout_tokens(cfg):
+ net = cfg.get('network_interfaces', {})
+ return {
+ 'GENERAL_LAN_INTERFACE': str(net.get('lan_interface', '-')),
+ 'VPN_VLAN_COUNT': str(sum(1 for v in cfg.get('vlans', []) if v.get('is_vpn'))),
+ }
diff --git a/docker/routlin-dash/app/factory.py b/docker/routlin-dash/app/factory.py
index cfb905c..f127862 100644
--- a/docker/routlin-dash/app/factory.py
+++ b/docker/routlin-dash/app/factory.py
@@ -1,15 +1,24 @@
-# factory.py: JSON content-type renderer
-# Converts content.json item trees into HTML strings.
-# Pure type processing: no data loading, no routing, no layout.
+# factory.py: HTML renderer and shared utilities
+# Builds HTML from content.json item trees. Pure rendering - no data fetching.
from flask import session
from markupsafe import Markup
-import json, re, sys, html as html_mod
-from config_utils import config_hash
+import json, re, sys, html as html_mod, os, subprocess
+from config_utils import (
+ config_hash, load_config, CONFIGS_DIR, DATA_DIR, WWW_DIR, APP_DIR,
+ ACCOUNTS_FILE, HEALTH_FILE, BLOCKLISTS_DIR,
+ fmt_timestamp, relative_time, fmt_bytes, resolve_iface,
+ WEB_APP_DISPLAY_NAME,
+)
+from config_utils import (
+ get_pending_entries, get_dashboard_pending, _find_cmd_in_queues,
+ _apply_changes_immediately, _seconds_until_next_run, _format_timing,
+ _is_locked, _lock_mtime, _entry_ts_from_queue,
+)
-# Injected by view_page at startup ====================================
-# view_page sets this after defining load_datasource so that build_table
-# can load row data without creating a circular import.
-load_datasource = None
+PAGES_DIR = os.path.join(APP_DIR, 'pages')
+NAVBAR_FILE = os.path.join(APP_DIR, 'navbar.json')
+CSS_FILE = os.path.join(DATA_DIR, 'styles.css')
+COMMON_JS_FILE = os.path.join(DATA_DIR, 'common.js')
# Constants ===========================================================
@@ -34,6 +43,60 @@ VALIDATION_FLAGS = {
'VALIDATION_IPV4_CIDR': 1 << 13,
}
+# File / shell helpers ================================================
+
+def load_json(path):
+ try:
+ with open(path) as f:
+ return json.load(f)
+ except Exception as ex:
+ print(f'[factory] ERROR loading {path}: {ex}', file=sys.stderr)
+ return {}
+
+def load_ddns():
+ return load_config().get('ddns', {})
+
+def load_accounts():
+ return load_json(ACCOUNTS_FILE)
+
+def run(cmd):
+ try:
+ r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
+ return r.stdout.strip()
+ except Exception:
+ return ''
+
+def load_css():
+ try:
+ with open(CSS_FILE) as f:
+ return f.read()
+ except Exception:
+ return ''
+
+def load_icon(name):
+ try:
+ with open(f'{WWW_DIR}/icons/{name}.svg') as f:
+ return f.read().strip()
+ except Exception:
+ return ''
+
+def inline_js(page_name=None):
+ big_validate_js = build_big_validate()
+ try:
+ with open(COMMON_JS_FILE) as f:
+ app_js = f.read()
+ except Exception:
+ app_js = ''
+ page_js = ''
+ if page_name:
+ page_js_path = os.path.join(PAGES_DIR, page_name, 'page.js')
+ try:
+ with open(page_js_path) as f:
+ page_js = f.read()
+ except Exception:
+ pass
+ return big_validate_js + '\n' + app_js + ('\n' + page_js if page_js else '')
+
# Utilities ===========================================================
def e(text):
@@ -191,6 +254,16 @@ def get_worker_id(datasource):
return datasource[len(prefix):]
return ''
+def table_token_key(spec):
+ return 'TABLE_' + re.sub(r'[^A-Z0-9]', '_', spec.upper())
+
+def iter_table_items(items):
+ for item in items:
+ if item.get('type') == 'table':
+ yield item
+ for sub in (item.get('items') or [], (item.get('toolbar') or {}).get('items') or []):
+ yield from iter_table_items(sub)
+
# Access control ======================================================
def client_level():
@@ -215,17 +288,17 @@ def passes(req, level):
# Snapshot helpers ====================================================
-def _flatten_json(val, prefix):
+def flatten_json(val, prefix):
"""Recursively flatten a parsed JSON value into [(path, leaf_str)] pairs."""
if isinstance(val, dict):
out = []
for k, v in val.items():
- out.extend(_flatten_json(v, f'{prefix}.{k}'))
+ out.extend(flatten_json(v, f'{prefix}.{k}'))
return out
if isinstance(val, list):
out = []
for i, v in enumerate(val):
- out.extend(_flatten_json(v, f'{prefix}[{i}]'))
+ out.extend(flatten_json(v, f'{prefix}[{i}]'))
return out
if val is None:
return [(prefix, None)]
@@ -260,8 +333,8 @@ def snap_expand_row(changes, colspan):
bval = json.loads(before_text) if before_text is not None else None
aval = json.loads(after_text) if after_text is not None else None
if isinstance(bval, (dict, list)) or isinstance(aval, (dict, list)):
- bflat = dict(_flatten_json(bval, field)) if isinstance(bval, (dict, list)) else {}
- aflat = dict(_flatten_json(aval, field)) if isinstance(aval, (dict, list)) else {}
+ bflat = dict(flatten_json(bval, field)) if isinstance(bval, (dict, list)) else {}
+ aflat = dict(flatten_json(aval, field)) if isinstance(aval, (dict, list)) else {}
if bflat or aflat:
seen = set()
for k in list(aflat) + list(bflat):
@@ -1007,10 +1080,9 @@ def build_table_cell(value, render_fn, col_class='', field='', row_idx=None,
# Table renderer ======================================================
-def build_table(item, tokens, inherited_req=None):
+def build_table(item, tokens, rows, inherited_req=None):
level = client_level()
columns = item.get('columns', [])
- rows = load_datasource(item.get('datasource', ''))
empty = e(item.get('empty_message', 'No data.'))
row_actions = item.get('row_actions', [])
hash_val = config_hash()
@@ -1515,9 +1587,217 @@ def build_item(item, tokens, inherited_req=None):
return f'
{inner}
'
if t == 'table':
- return build_table(item, tokens, req)
+ ds = item.get('datasource', '')
+ key = table_token_key(ds)
+ if key in tokens:
+ return Markup(tokens[key])
+ return build_table(item, tokens, [], req)
if t == 'raw_html':
return Markup(apply_tokens(item.get('html', ''), tokens))
return ''
+
+
+# Layout renderer =====================================================
+
+def render_layout(view_id, content_html, tokens, page_name=None):
+ css = load_css()
+ level = client_level()
+ has_pending_alert = not _apply_changes_immediately() and bool(get_dashboard_pending())
+ titlebar_html = f'
{WEB_APP_DISPLAY_NAME}
'
+ navbar_html = build_navbar(view_id, level, tokens, pending_alert=has_pending_alert)
+ footer_html = f''
+
+ page_hash = config_hash()
+ lan_iface = e(tokens.get('GENERAL_LAN_INTERFACE', ''))
+ vpn_count = tokens.get('VPN_VLAN_COUNT', '0')
+ current_user = session.get('email_address', '')
+ pending = get_pending_entries()
+ my_uuid = next((u for u, t, c, usr in pending if usr == current_user and c != 'fix problems'), None)
+
+ secs = _seconds_until_next_run()
+ locked = _is_locked()
+ lock_mtime = _lock_mtime()
+ other_bars = ''
+ seen_other_users = set()
+ for o_uuid, o_ts, o_cmd, o_user in pending:
+ if o_user == current_user:
+ continue
+ if o_user in seen_other_users:
+ continue
+ seen_other_users.add(o_user)
+ display_user = 'Another user' if o_user in ('unknown', '') else e(o_user)
+ if locked and lock_mtime and o_ts < lock_mtime:
+ text = f'{display_user}\'s changes are being applied now...'
+ cls = 'info-bar-warning info-bar-running'
+ else:
+ timing = _format_timing(secs)
+ text = (
+ f'{display_user} has pending changes which will be applied {timing}.'
+ if timing else
+ f'{display_user} has pending changes. The processing service is not running.'
+ )
+ cls = 'info-bar-warning'
+ other_bars += f'
{text}
\n'
+
+ problem_bars = ''
+ if level >= LEVEL_RANK['viewer']:
+ try:
+ st = json.load(open(HEALTH_FILE))
+ problems = []
+ for section in ('configurations', 'logs'):
+ for item in st.get(section, []):
+ if item.get('status') == 'problem':
+ problems.append(e(item.get('detail', item.get('name', ''))))
+ for item in st.get('services', []):
+ if item.get('status') == 'problem':
+ name = item.get('name', '')
+ utype = 'timer' if name.endswith('.timer') else 'service' if name.endswith('.service') else 'unit'
+ exp_parts, act_parts = [], []
+ if not item.get('active_ok'):
+ exp_parts.append(item.get('expected_active', 'active'))
+ act_parts.append(item.get('active', 'unknown'))
+ if not item.get('enabled_ok'):
+ exp_parts.append(item.get('expected_enabled', 'enabled'))
+ act_parts.append(item.get('enabled', 'unknown'))
+ problems.append(e(
+ f"The {utype} `{name}` is expected to be "
+ f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}."
+ ))
+ has_problems = bool(problems)
+ fix_suffix = ''
+ fix_uuid = None
+ if has_problems:
+ if level < LEVEL_RANK['administrator']:
+ fix_suffix = 'Please contact an administrator.'
+ else:
+ fix_uuid, fix_ts = _find_cmd_in_queues('fix problems')
+ if _apply_changes_immediately():
+ if _is_locked():
+ mtime = _lock_mtime()
+ fix_suffix = (
+ 'Fix is being applied now...'
+ if fix_ts and mtime and fix_ts < mtime
+ else 'Fix will be applied on the next run.'
+ )
+ else:
+ timing = _format_timing(_seconds_until_next_run())
+ fix_suffix = (
+ f'Fix will be applied {timing}.'
+ if timing else
+ 'Fix pending. The processing service is not running.'
+ )
+ else:
+ fix_suffix = (
+ 'Fix pending. Click Apply Now below to fix.'
+ if view_id == 'actions' else
+ 'Fix pending. Visit the Actions page ASAP to apply fix.'
+ )
+ if problems:
+ problems_list = (
+ '
'
+ + ''.join(f'
{d}
' for d in problems)
+ + '
'
+ )
+ uuid_attr = (
+ f' data-health-uuid="{e(fix_uuid)}"'
+ if fix_uuid and _entry_ts_from_queue(fix_uuid) is not None else ''
+ )
+ fix_html = (
+ f'
{fix_suffix}
'
+ if fix_suffix else ''
+ )
+ content = (
+ '
'
+ '
Health check - problems found:
'
+ + problems_list + fix_html
+ + '
'
+ )
+ problem_bars += f'
{content}
\n'
+ except Exception:
+ pass
+
+ pending_bar = ''
+ if has_pending_alert and not problem_bars and view_id != 'actions':
+ pending_bar = (
+ '
'
+ 'You have actions pending. Please visit the Actions page.'
+ '
\n'
+ )
+
+ return (
+ '\n\n\n'
+ ' \n'
+ ' \n'
+ f' {WEB_APP_DISPLAY_NAME}\n'
+ f' \n'
+ '\n\n'
+ f'{titlebar_html}\n'
+ f'{navbar_html}\n'
+ f'\n{pending_bar}{problem_bars}{other_bars}{content_html}\n\n'
+ f'{footer_html}\n'
+ f'\n'
+ f'\n'
+ '\n'
+ )
+
+
+def build_navbar(active_view, level, tokens, pending_alert=False):
+ navbar_data = load_json(NAVBAR_FILE)
+ left, right = [], []
+ for item in navbar_data.get('items', []):
+ req = item.get('client_requirement')
+ align = item.get('align', 'left')
+ if not passes(req, level):
+ continue
+ frag = build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=req, pending_alert=pending_alert)
+ (right if align == 'right' else left).append(frag)
+ return (
+ ''
+ )
+
+
+def build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=None, pending_alert=False):
+ req = item.get('client_requirement', inherited_req)
+ t = item.get('type', '')
+
+ if t in ('nav_item', 'nav_action'):
+ label = e(item.get('label', ''))
+ map_to = item.get('map_to', '')
+ action = item.get('action', '')
+ is_active = ' active' if map_to and map_to == active_view else ''
+ pending = ' nav-item-pending' if pending_alert and map_to == 'actions' else ''
+ cls = f'dropdown-item{is_active}' if in_dropdown else f'nav-item{is_active}{pending}'
+ if action:
+ return (
+ f''
+ )
+ if map_to:
+ return f'{label}'
+ return f'{label}'
+
+ if t == 'nav_menu':
+ raw_label = item.get('label', '')
+ if raw_label == '%MENU_LABEL%':
+ raw_label = 'Configure' if level >= LEVEL_RANK['administrator'] else 'View'
+ label = e(raw_label)
+ children = ''
+ for child in item.get('items', []):
+ child_req = child.get('client_requirement', req)
+ if not passes(child_req, level):
+ continue
+ children += build_nav_item(child, active_view, level, in_dropdown=True, inherited_req=req, pending_alert=pending_alert)
+ if not children:
+ return ''
+ return (
+ '
'
+ f''
+ f'
{children}
'
+ '
'
+ )
+ return ''
diff --git a/docker/routlin-dash/app/main.py b/docker/routlin-dash/app/main.py
index 57b7469..8db0b9c 100644
--- a/docker/routlin-dash/app/main.py
+++ b/docker/routlin-dash/app/main.py
@@ -1,7 +1,14 @@
-import os, json, sys
-from flask import Flask
-from config_utils import ACCOUNTS_FILE
-from view_common import bp as view_page_bp
+import os, json, sys, importlib.util as _importlib_util
+from flask import Flask, Blueprint, session, redirect, get_flashed_messages
+from markupsafe import Markup
+from config_utils import (
+ ACCOUNTS_FILE, APP_DIR, CONFIGS_DIR, HEALTH_FILE,
+ load_config, queue_command, _find_cmd_in_queues,
+)
+from factory import (
+ LEVEL_RANK, PAGES_DIR, e, client_level, passes, build_items,
+ load_json, render_layout,
+)
from pages.actions.action import bp as actions_bp
from pages.bannedips.action import bp as bannedips_bp
from pages.ddns.action import bp as ddns_bp
@@ -27,7 +34,82 @@ from api_apply_health import bp as api_apply_health_bp
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', os.urandom(24))
-app.register_blueprint(view_page_bp)
+
+# View blueprint ======================================================
+
+bp = Blueprint('view_page', __name__)
+
+page_view_cache = {}
+
+def load_page_view(page_name):
+ if page_name not in page_view_cache:
+ path = os.path.join(PAGES_DIR, page_name, 'view.py')
+ if not os.path.exists(path):
+ page_view_cache[page_name] = None
+ else:
+ spec = _importlib_util.spec_from_file_location(f'page_view_{page_name}', path)
+ mod = _importlib_util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ page_view_cache[page_name] = mod
+ return page_view_cache[page_name]
+
+
+@bp.route('/')
+def index():
+ return serve_view('overview')
+
+@bp.route('/')
+def view(page_name):
+ return serve_view(page_name)
+
+def serve_view(page_name):
+ view_def = load_json(os.path.join(PAGES_DIR, page_name, 'content.json'))
+ if not view_def:
+ from flask import abort
+ abort(404)
+
+ view_req = view_def.get('client_requirement')
+ level = client_level()
+ if not passes(view_req, level):
+ return redirect('/overview' if level > 0 else '/accountlogin')
+
+ cfg = load_config()
+
+ if level >= LEVEL_RANK['administrator']:
+ try:
+ st = json.load(open(HEALTH_FILE))
+ has_problems = any(
+ item.get('status') == 'problem'
+ for section in ('configurations', 'logs', 'services')
+ for item in st.get(section, [])
+ )
+ if has_problems:
+ fix_uuid, _ = _find_cmd_in_queues('fix problems')
+ if fix_uuid is None:
+ queue_command('fix problems', user=session.get('email_address', ''))
+ except Exception:
+ pass
+
+ page_view = load_page_view(page_name)
+ tokens = {}
+ if page_view and hasattr(page_view, 'collect_tokens'):
+ tokens.update(page_view.collect_tokens(cfg))
+
+ if page_name == 'radius' and not os.path.exists(f'{CONFIGS_DIR}/.radius-secret'):
+ queue_command('gen radius')
+
+ flash_html = ''
+ for category, message in get_flashed_messages(with_categories=True):
+ variant = {'error': 'danger', 'warning': 'warning', 'success': 'success'}.get(category, 'info')
+ msg_html = message if isinstance(message, Markup) else e(message)
+ flash_html += f'
'
- )
- elif ptype in ('cloudflare', 'duckdns'):
- tok = p.get('api_token', '')
- row['credentials'] = f'API Token: {e(tok[:20])}...' if tok else '(not set)'
- else:
- row['credentials'] = '-'
- row['hostnames'] = json.dumps(p.get('hostnames', p.get('subdomains', [])))
- rows.append(row)
- return rows
-
- if name == 'accounts':
- rows = []
- for acct in load_accounts().get('accounts', []):
- row = dict(acct)
- row['account_status'] = 'active' if acct.get('hashed_password') else 'pending'
- rows.append(row)
- return rows
-
- if name == 'vpn_peers':
- rows = []
- wg_sorted = sorted(
- [v for v in vlans if v.get('is_vpn')],
- key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
- )
- for i, vlan in enumerate(wg_sorted):
- iface = f'wg{i}'
- vlan_display = f'{iface} (VLAN {vlan.get("vlan_id") or "?"})'
- for peer in vlan.get('peers', []):
- row = dict(peer)
- row['vlan_display'] = vlan_display
- row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no'
- row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-'
- rows.append(row)
- return rows
-
- return []
-
-
-def load_datasource(spec):
- if spec.startswith('live:'):
- name = spec[5:]
- if name == 'dhcp_leases': return live_dhcp_leases()
- if name == 'vpn_sessions': return live_vpn_sessions()
- return []
- if spec.startswith('config:'):
- return config_datasource(spec[7:])
- return []
-
-factory.load_datasource = load_datasource
-
-
-# Shared IP/DDNS helpers ============================================
-
-def _read_cached_ip():
- """Return (ip, mtime) from the most recent .ddns-last-ip-* file, or ('', None)."""
- try:
- best_ip, best_mtime = '', 0.0
- for fname in os.listdir(CONFIGS_DIR):
- if fname.startswith('.ddns-last-ip-'):
- path = f'{CONFIGS_DIR}/{fname}'
- mtime = os.path.getmtime(path)
- if mtime > best_mtime:
- ip = open(path).read().strip()
- if ip:
- best_ip, best_mtime = ip, mtime
- return best_ip, (best_mtime if best_ip else None)
- except Exception:
- return '', None
-
-def public_ip_info(ddns_cfg):
- """Return (ip_str, domains_sub, last_obtained_str) for stat cards."""
- enabled_p = [p for p in ddns_cfg.get('providers', []) if p.get('enabled', True)]
- all_hosts = []
- for p in enabled_p:
- all_hosts.extend(p.get('hostnames', p.get('subdomains', [])))
- domains_sub = ', '.join(all_hosts)
- ip, mtime = _read_cached_ip()
- last_obtained = f'Obtained: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago' if mtime else ''
- if ip:
- return ip, domains_sub, last_obtained
- return 'Offline', domains_sub, ''
-
-def ddns_last_checked():
- try:
- mtime = os.path.getmtime(f'{CONFIGS_DIR}/.ddns-last-service')
- return f'Last checked: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago'
- except OSError:
- return 'Last checked: ---'
-
-
-# Layout tokens =====================================================
-
-def collect_layout_tokens(cfg):
- vlans = cfg.get('vlans', [])
- net = cfg.get('network_interfaces', {})
- return {
- 'GENERAL_LAN_INTERFACE': str(net.get('lan_interface', '-')),
- 'VPN_VLAN_COUNT': str(sum(1 for v in vlans if v.get('is_vpn'))),
- }
-
-
-# Layout renderer ===================================================
-
-def render_layout(view_id, content_html, tokens, page_name=None):
- css = _load_css()
- level = client_level()
- has_pending_alert = not _apply_changes_immediately() and bool(get_dashboard_pending())
- titlebar_html = f'
{WEB_APP_DISPLAY_NAME}
'
- navbar_html = build_navbar(view_id, level, tokens, pending_alert=has_pending_alert)
- footer_html = f''
-
- page_hash = config_hash()
- lan_iface = e(tokens.get('GENERAL_LAN_INTERFACE', ''))
- vpn_count = tokens.get('VPN_VLAN_COUNT', '0')
- current_user = session.get('email_address', '')
- pending = get_pending_entries()
- my_uuid = next((u for u, t, c, usr in pending if usr == current_user and c != 'fix problems'), None)
-
- secs = _seconds_until_next_run()
- locked = _is_locked()
- lock_mtime = _lock_mtime()
- other_bars = ''
- seen_other_users = set()
- for o_uuid, o_ts, o_cmd, o_user in pending:
- if o_user == current_user:
- continue
- if o_user in seen_other_users:
- continue
- seen_other_users.add(o_user)
- display_user = 'Another user' if o_user in ('unknown', '') else e(o_user)
- if locked and lock_mtime and o_ts < lock_mtime:
- text = f'{display_user}\'s changes are being applied now...'
- cls = 'info-bar-warning info-bar-running'
- else:
- timing = _format_timing(secs)
- text = (
- f'{display_user} has pending changes which will be applied {timing}.'
- if timing else
- f'{display_user} has pending changes. The processing service is not running.'
- )
- cls = 'info-bar-warning'
- other_bars += f'
{text}
\n'
-
- problem_bars = ''
- if level >= LEVEL_RANK['viewer']:
- try:
- st = json.load(open(HEALTH_FILE))
- problems = []
- for section in ('configurations', 'logs'):
- for item in st.get(section, []):
- if item.get('status') == 'problem':
- problems.append(e(item.get('detail', item.get('name', ''))))
- for item in st.get('services', []):
- if item.get('status') == 'problem':
- name = item.get('name', '')
- utype = 'timer' if name.endswith('.timer') else 'service' if name.endswith('.service') else 'unit'
- exp_parts, act_parts = [], []
- if not item.get('active_ok'):
- exp_parts.append(item.get('expected_active', 'active'))
- act_parts.append(item.get('active', 'unknown'))
- if not item.get('enabled_ok'):
- exp_parts.append(item.get('expected_enabled', 'enabled'))
- act_parts.append(item.get('enabled', 'unknown'))
- problems.append(e(
- f"The {utype} `{name}` is expected to be "
- f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}."
- ))
- has_problems = bool(problems)
- fix_suffix = ''
- fix_uuid = None
- if has_problems:
- if level < LEVEL_RANK['administrator']:
- fix_suffix = 'Please contact an administrator.'
- else:
- fix_uuid, fix_ts = _find_cmd_in_queues('fix problems')
- if _apply_changes_immediately():
- if _is_locked():
- mtime = _lock_mtime()
- fix_suffix = (
- 'Fix is being applied now...'
- if fix_ts and mtime and fix_ts < mtime
- else 'Fix will be applied on the next run.'
- )
- else:
- timing = _format_timing(_seconds_until_next_run())
- fix_suffix = (
- f'Fix will be applied {timing}.'
- if timing else
- 'Fix pending. The processing service is not running.'
- )
- else:
- fix_suffix = (
- 'Fix pending. Click Apply Now below to fix.'
- if view_id == 'actions' else
- 'Fix pending. Visit the Actions page ASAP to apply fix.'
- )
- if problems:
- problems_list = (
- '
'
- + ''.join(f'
{d}
' for d in problems)
- + '
'
- )
- uuid_attr = (
- f' data-health-uuid="{e(fix_uuid)}"'
- if fix_uuid and _entry_ts_from_queue(fix_uuid) is not None else ''
- )
- fix_html = (
- f'
{fix_suffix}
'
- if fix_suffix else ''
- )
- content = (
- '
'
- '
Health check - problems found:
'
- + problems_list + fix_html
- + '
'
- )
- problem_bars += f'
{content}
\n'
- except Exception:
- pass
-
- pending_bar = ''
- if has_pending_alert and not problem_bars and view_id != 'actions':
- pending_bar = (
- '
'
- 'You have actions pending. Please visit the Actions page.'
- '
\n'
- )
-
- return (
- '\n\n\n'
- ' \n'
- ' \n'
- f' {WEB_APP_DISPLAY_NAME}\n'
- f' \n'
- '\n\n'
- f'{titlebar_html}\n'
- f'{navbar_html}\n'
- f'\n{pending_bar}{problem_bars}{other_bars}{content_html}\n\n'
- f'{footer_html}\n'
- f'\n'
- f'\n'
- '\n'
- )
-
-
-def build_navbar(active_view, level, tokens, pending_alert=False):
- navbar_data = load_json(NAVBAR_FILE)
- left, right = [], []
- for item in navbar_data.get('items', []):
- req = item.get('client_requirement')
- align = item.get('align', 'left')
- if not passes(req, level):
- continue
- frag = build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=req, pending_alert=pending_alert)
- (right if align == 'right' else left).append(frag)
- return (
- ''
- )
-
-
-def build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=None, pending_alert=False):
- req = item.get('client_requirement', inherited_req)
- t = item.get('type', '')
-
- if t in ('nav_item', 'nav_action'):
- label = e(item.get('label', ''))
- map_to = item.get('map_to', '')
- action = item.get('action', '')
- is_active = ' active' if map_to and map_to == active_view else ''
- pending = ' nav-item-pending' if pending_alert and map_to == 'actions' else ''
- cls = f'dropdown-item{is_active}' if in_dropdown else f'nav-item{is_active}{pending}'
- if action:
- return (
- f''
- )
- if map_to:
- return f'{label}'
- return f'{label}'
-
- if t == 'nav_menu':
- raw_label = item.get('label', '')
- if raw_label == '%MENU_LABEL%':
- raw_label = 'Configure' if level >= LEVEL_RANK['administrator'] else 'View'
- label = e(raw_label)
- children = ''
- for child in item.get('items', []):
- child_req = child.get('client_requirement', req)
- if not passes(child_req, level):
- continue
- children += build_nav_item(child, active_view, level, in_dropdown=True, inherited_req=req, pending_alert=pending_alert)
- if not children:
- return ''
- return (
- '
'
- f''
- f'
{children}
'
- '
'
- )
- return ''
-
-
-# Inline JavaScript =================================================
-
-def _inline_js(page_name=None):
- big_validate_js = factory.build_big_validate()
- try:
- with open(COMMON_JS_FILE) as f:
- app_js = f.read()
- except Exception:
- app_js = ''
- page_js = ''
- if page_name:
- page_js_path = os.path.join(PAGES_DIR, page_name, 'page.js')
- try:
- with open(page_js_path) as f:
- page_js = f.read()
- except Exception:
- pass
- return big_validate_js + '\n' + app_js + ('\n' + page_js if page_js else '')
-
-
-# Dynamic page view loader ==========================================
-
-_page_view_cache = {}
-
-def _load_page_view(page_name):
- if page_name not in _page_view_cache:
- path = os.path.join(PAGES_DIR, page_name, 'view.py')
- if not os.path.exists(path):
- _page_view_cache[page_name] = None
- else:
- spec = _importlib_util.spec_from_file_location(f'page_view_{page_name}', path)
- mod = _importlib_util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- _page_view_cache[page_name] = mod
- return _page_view_cache[page_name]
-
-
-# Routes ============================================================
-
-@bp.route('/')
-def index():
- return serve_view('overview')
-
-@bp.route('/')
-def view(page_name):
- return serve_view(page_name)
-
-def serve_view(page_name):
- view_def = load_json(os.path.join(PAGES_DIR, page_name, 'content.json'))
- if not view_def:
- from flask import abort
- abort(404)
-
- view_req = view_def.get('client_requirement')
- level = client_level()
- if not passes(view_req, level):
- return redirect('/overview' if level > 0 else '/accountlogin')
-
- cfg = load_config()
- tokens = collect_layout_tokens(cfg)
-
- # Auto-queue health fix for every administrator page load
- if level >= LEVEL_RANK['administrator']:
- try:
- st = json.load(open(HEALTH_FILE))
- has_problems = any(
- item.get('status') == 'problem'
- for section in ('configurations', 'logs', 'services')
- for item in st.get(section, [])
- )
- if has_problems:
- fix_uuid, _ = _find_cmd_in_queues('fix problems')
- if fix_uuid is None:
- queue_command('fix problems', user=session.get('email_address', ''))
- except Exception:
- pass
-
- page_view = _load_page_view(page_name)
- if page_view and hasattr(page_view, 'collect_tokens'):
- tokens.update(page_view.collect_tokens(cfg))
-
- if page_name == 'radius' and not os.path.exists(f'{CONFIGS_DIR}/.radius-secret'):
- queue_command('gen radius')
-
- flash_html = ''
- for category, message in get_flashed_messages(with_categories=True):
- variant = {'error': 'danger', 'warning': 'warning', 'success': 'success'}.get(category, 'info')
- msg_html = message if isinstance(message, Markup) else e(message)
- flash_html += f'
{msg_html}
'
-
- content_html = flash_html + build_items(view_def.get('items', []), tokens, view_req)
- return render_layout(page_name, content_html, tokens, page_name=page_name)
diff --git a/docker/routlin-dash/app/view_page.py b/docker/routlin-dash/app/view_page.py
deleted file mode 100644
index 505eaea..0000000
--- a/docker/routlin-dash/app/view_page.py
+++ /dev/null
@@ -1,1296 +0,0 @@
-from flask import Blueprint, session, redirect, get_flashed_messages
-from markupsafe import Markup
-import json, re, subprocess, os, sys, glob
-import sanitize
-import validation as validate
-from datetime import datetime, timezone
-from config_utils import config_hash, get_pending_entries, get_dashboard_pending, get_dashboard_done, load_all_groups, revert_group, get_done_timestamps, queue_command, _find_cmd_in_queues, _entry_ts_from_queue, _apply_changes_immediately, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, WEB_APP_DISPLAY_NAME, CONFIGS_DIR, DATA_DIR, WWW_DIR, ACCOUNTS_FILE, APP_DIR
-import factory
-from factory import LEVEL_RANK, e, client_level, passes, build_items, build_snap_val, snap_expand_row
-PAGES_DIR = os.path.join(APP_DIR, 'pages')
-NAVBAR_FILE = os.path.join(APP_DIR, 'navbar.json')
-CSS_FILE = os.path.join(DATA_DIR, 'styles.css')
-COMMON_JS_FILE = os.path.join(DATA_DIR, 'common.js')
-BLOCKLISTS_DIR = os.path.join(CONFIGS_DIR, 'blocklists')
-HEALTH_FILE = os.path.join(CONFIGS_DIR, '.health')
-
-bp = Blueprint('view_page', __name__)
-
-try:
- import manuf as _manuf_mod
- _mac_parser = _manuf_mod.MacParser()
-except Exception:
- _mac_parser = None
-
-try:
- from mac_vendor_lookup import MacLookup as _MacLookup
- _mac_lookup = _MacLookup()
-except Exception:
- _mac_lookup = None
-
-def _get_vendor(mac):
- short, long = '', ''
- if _mac_parser:
- try:
- short = _mac_parser.get_manuf(mac) or ''
- except Exception:
- pass
- if _mac_lookup:
- try:
- long = _mac_lookup.lookup(mac) or ''
- except Exception:
- pass
- return (short, long)
-
-def _vendor_cell(vendor):
- short, long = vendor
- display = short if short else (long[:8] if long else '')
- if not display:
- return '-'
- if long:
- return f'{e(display)}'
- return e(display)
-
-
-# File loaders ======================================================
-
-def load_json(path):
- try:
- with open(path) as f:
- return json.load(f)
- except Exception as ex:
- print(f'[view_page] ERROR loading {path}: {ex}', file=sys.stderr)
- return {}
-
-def load_config(): return load_json(f'{CONFIGS_DIR}/config.json')
-def load_ddns(): return load_config().get('ddns', {})
-def load_accounts(): return load_json(ACCOUNTS_FILE)
-
-def _load_css():
- try:
- with open(CSS_FILE) as f:
- return f.read()
- except Exception:
- return ''
-
-
-def _load_icon(name):
- try:
- with open(f'{WWW_DIR}/icons/{name}.svg') as f:
- return f.read().strip()
- except Exception:
- return ''
-
-
-
-
-# Shell helper ======================================================
-
-def run(cmd):
- try:
- r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
- return r.stdout.strip()
- except Exception:
- return ''
-
-
-def get_system_interfaces():
- _EXCLUDE_PREFIXES = ('lo', 'wg', 'docker', 'br-', 'veth',
- 'tun', 'tap', 'ppp', 'virbr',
- 'podman', 'vnet', 'macvtap', 'fc-')
- try:
- return sorted(
- n for n in os.listdir('/sys/class/net')
- if not n.startswith(_EXCLUDE_PREFIXES)
- and os.path.exists(f'/sys/class/net/{n}/device')
- )
- except Exception:
- return []
-
-
-def iface_info(iface):
- base = f'/sys/class/net/{iface}'
- def _rd(path):
- try:
- with open(f'{base}/{path}') as f:
- return f.read().strip()
- except Exception:
- return None
- wireless = os.path.isdir(f'{base}/wireless')
- state = (_rd('operstate') or 'unknown').upper()
- if state == 'UNKNOWN':
- state = 'UP'
- carrier_raw = _rd('carrier')
- carrier = (carrier_raw == '1') if carrier_raw is not None else None
- speed_raw = _rd('speed')
- try:
- mbps = int(speed_raw)
- if mbps <= 0:
- speed = None
- elif mbps >= 1000 and mbps % 1000 == 0:
- speed = f'{mbps // 1000} Gbps'
- else:
- speed = f'{mbps} Mbps'
- except (TypeError, ValueError):
- speed = None
- mac = _rd('address')
- perm_mac = _rd('perm_address')
- if perm_mac and perm_mac == '00:00:00:00:00:00':
- perm_mac = None
- # DEBUG
- # if not perm_mac: perm_mac = 'de:ad:be:ef:f0:0d'
- def _int(val):
- try: return int(val) if val else None
- except ValueError: return None
- return {
- 'name': iface,
- 'wireless': wireless,
- 'state': state,
- 'carrier': carrier,
- 'speed': speed,
- 'mtu': _rd('mtu'),
- 'min_mtu': _int(_rd('min_mtu')),
- 'max_mtu': _int(_rd('max_mtu')),
- 'mac': mac,
- 'perm_mac': perm_mac,
- }
-
-
-def iface_status(iface):
- """Return link state for iface by reading /sys/class/net//operstate.
- Returns INVALID if the interface does not exist, otherwise UP/DOWN/UNKNOWN/etc."""
- if not iface:
- return 'INVALID'
- safe = re.sub(r'[^A-Za-z0-9._-]', '', iface)
- if not safe:
- return 'INVALID'
- try:
- with open(f'/sys/class/net/{safe}/operstate') as f:
- state = f.read().strip().upper()
- return state if state else 'UP'
- except OSError:
- return 'INVALID'
-
-
-def resolve_iface(vlan, cfg):
- """Compute interface name from is_vpn + stored vlan_id + general.lan_interface."""
- if vlan.get('is_vpn'):
- wg_vlans = [v for v in cfg.get('vlans', []) if v.get('is_vpn')]
- wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0))
- idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0)
- return f'wg{idx}'
- lan = cfg.get('network_interfaces', {}).get('lan_interface', 'eth0')
- vid = vlan.get('vlan_id') or 1
- return lan if vid == 1 else f'{lan}.{vid}'
-
-
-# Live data loaders =================================================
-
-def _parse_lease_secs(s):
- s = str(s).strip().lower()
- try:
- if s.endswith('h'): return int(s[:-1]) * 3600
- if s.endswith('m'): return int(s[:-1]) * 60
- if s.endswith('d'): return int(s[:-1]) * 86400
- except ValueError:
- pass
- return None
-
-
-def live_dhcp_leases():
- rows = []
- now = int(datetime.now(tz=timezone.utc).timestamp())
- cfg = load_config()
- vlans = cfg.get('vlans', [])
- vlan_lease_secs = {
- v['name']: _parse_lease_secs(v.get('dhcp_information', {}).get('lease_time', ''))
- for v in vlans if v.get('name')
- }
- mac_to_res = {
- r['mac'].lower(): r['hostname']
- for r in cfg.get('dhcp_reservations', [])
- if r.get('mac') and r.get('hostname')
- }
- for leases_file in glob.glob('/var/lib/misc/dnsmasq-routlin-*.leases'):
- stem = os.path.basename(leases_file)
- vlan_name = stem[len('dnsmasq-routlin-'):-len('.leases')]
- lease_secs = vlan_lease_secs.get(vlan_name)
- try:
- with open(leases_file) as f:
- for line in f:
- parts = line.strip().split()
- if len(parts) < 4:
- continue
- expiry = int(parts[0])
- if expiry < now:
- continue
- obtained_ts = (expiry - lease_secs) if lease_secs else None
- renews_ts = (expiry - lease_secs // 2) if lease_secs else None
- if obtained_ts is None:
- last_active = '-'
- elif obtained_ts <= now:
- last_active = relative_time(obtained_ts, now, short=True) + ' ago'
- elif renews_ts and renews_ts > now:
- last_active = 'ETA ' + relative_time(renews_ts, now, short=True)
- else:
- last_active = 'ETA soon'
- mac_norm = parts[1].lower()
- device_h = parts[3] if parts[3] != '*' else None
- res_h = mac_to_res.get(mac_norm)
- if res_h and device_h and device_h.lower() != res_h.lower():
- hostname_html = f'{e(res_h)} ({e(device_h)})'
- elif res_h:
- hostname_html = f'{e(res_h)}'
- elif device_h:
- hostname_html = e(device_h)
- else:
- hostname_html = '-'
- rows.append({
- 'hostname': hostname_html,
- 'ip_address': parts[2],
- 'mac_address': parts[1],
- 'vendor': _vendor_cell(_get_vendor(parts[1])),
- 'vlan_name': vlan_name,
- 'last_active': last_active,
- 'renews': 'in ' + relative_time(renews_ts or expiry, now, short=True),
- })
- except Exception:
- pass
- return rows
-
-def _vlan_name_for_ip(ip):
- import ipaddress
- for vlan in load_config().get('vlans', []):
- subnet = vlan.get('subnet', '')
- mask = vlan.get('subnet_mask', 24)
- if not subnet:
- continue
- try:
- if ipaddress.ip_address(ip) in ipaddress.ip_network(f'{subnet}/{mask}', strict=False):
- return vlan.get('name', '-')
- except Exception:
- pass
- return '-'
-
-def fmt_timestamp(ts):
- try:
- return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
- except Exception:
- return '-'
-
-def relative_time(ts1, ts2, short=False):
- try:
- diff = abs(int(ts1) - int(ts2))
- if diff < 60:
- return f'{diff}s' if short else f'{diff} second{"s" if diff != 1 else ""}'
- m = diff // 60
- if m < 60:
- return f'{m}m' if short else f'{m} minute{"s" if m != 1 else ""}'
- h, rem_m = divmod(m, 60)
- if h < 24:
- if short:
- return f'{h}h {rem_m}m' if rem_m else f'{h}h'
- return f'{h}h {rem_m}m' if rem_m else f'{h} hour{"s" if h != 1 else ""}'
- d = h // 24
- if d < 365:
- return f'{d}d' if short else f'{d} day{"s" if d != 1 else ""}'
- y = d // 365
- return f'{y}y' if short else f'{y} year{"s" if y != 1 else ""}'
- except Exception:
- return ''
-
-def live_vpn_sessions():
- rows = []
- out = run('wg show all dump 2>/dev/null')
- for line in out.splitlines():
- parts = line.split('\t')
- if len(parts) == 9:
- interface, _pubkey, _psk, endpoint, allowed_ips, last_hs, rx, tx, _ka = parts
- rows.append({
- 'peer_name': _pubkey[:16] + '...',
- 'interface': interface,
- 'tunnel_ip': allowed_ips.split(',')[0].split('/')[0] if allowed_ips else '-',
- 'endpoint': endpoint if endpoint != '(none)' else '-',
- 'last_handshake': fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
- 'rx_bytes': fmt_bytes(int(rx)) if rx.isdigit() else '-',
- 'tx_bytes': fmt_bytes(int(tx)) if tx.isdigit() else '-',
- })
- return rows
-
-def fmt_bytes(n):
- for unit in ('B', 'KB', 'MB', 'GB'):
- if n < 1024:
- return f'{n:.1f} {unit}'
- n /= 1024
- return f'{n:.1f} TB'
-
-
-# Config data loaders ===============================================
-
-def config_datasource(name):
- cfg = load_config()
- vlans = cfg.get('vlans', [])
-
- if name == 'interfaces':
- gen = cfg.get('network_interfaces', {})
- wan = gen.get('wan_interface', '')
- lan = gen.get('lan_interface', '')
- return [
- {'iface_type': 'WAN', 'interface': wan, 'status': iface_status(wan)},
- {'iface_type': 'LAN', 'interface': lan, 'status': iface_status(lan)},
- ]
-
- if name == 'banned_ips':
- return cfg.get('banned_ips', [])
-
- if name == 'host_overrides':
- return cfg.get('host_overrides', [])
-
- if name == 'blocklists':
- rows = []
- for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
- row = dict(bl)
- bl_path = os.path.join(BLOCKLISTS_DIR, bl.get("save_as", ""))
- try:
- with open(bl_path) as f:
- row['domain_count'] = str(sum(1 for _ in f))
- row['last_updated'] = fmt_timestamp(int(os.path.getmtime(bl_path)))
- except Exception:
- row['domain_count'] = '-'
- row['last_updated'] = '-'
- rows.append(row)
- return rows
-
- if name == 'vlans':
- bl_desc = {b['name']: b.get('description', b['name']) for b in cfg.get('dns_blocking', {}).get('blocklists', []) if 'name' in b}
- rows = []
- for v in sorted(vlans, key=lambda x: x.get('vlan_id') or 0):
- row = {k: v.get(k) for k in ('name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn', 'dnsmasq_log_queries')}
- row['vlan_id'] = v.get('vlan_id')
- row['interface'] = resolve_iface(v, cfg)
- row['use_blocklists'] = json.dumps([
- {'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', [])
- ])
- _prefix = v.get('subnet_mask', 24)
- _n_octets = 1 if _prefix >= 24 else 2 if _prefix >= 16 else 3 if _prefix >= 8 else 4
- row['server_identity_ips'] = json.dumps([
- {
- 'n': s['ip'],
- 'd': ' | '.join(filter(None, [s['ip'], s.get('description'), s.get('hostname')])),
- 'short': '.' + '.'.join(s['ip'].split('.')[-_n_octets:]),
- 'mini': '.' + '.'.join(s['ip'].split('.')[-_n_octets:]),
- }
- for s in v.get('server_identities', []) if s.get('ip')
- ])
- row['server_identity_descriptions'] = json.dumps([
- s.get('description', '') for s in v.get('server_identities', []) if s.get('ip')
- ])
- row['server_identity_hostnames'] = json.dumps([
- s.get('hostname', '') for s in v.get('server_identities', []) if s.get('ip')
- ])
- row['server_identity_gateway'] = (
- v.get('dhcp_information', {}).get('explicit_overrides', {}).get('gateway', '')
- )
- _dns = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('dns_servers', [])
- row['server_identity_dns_servers'] = '\n'.join(_dns) if isinstance(_dns, list) else str(_dns or '')
- _ntp = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('ntp_servers', [])
- row['server_identity_ntp_servers'] = '\n'.join(_ntp) if isinstance(_ntp, list) else str(_ntp or '')
- row['gateway'] = row['server_identity_gateway']
- row['dns_servers'] = row['server_identity_dns_servers']
- row['ntp_servers'] = row['server_identity_ntp_servers']
- row['dns_servers_override'] = 1 if row['server_identity_dns_servers'] else 0
- row['ntp_servers_override'] = 1 if row['server_identity_ntp_servers'] else 0
- _dhi = v.get('dhcp_information', {})
- row['dhcp_pool_start'] = _dhi.get('dynamic_pool_start', '')
- row['dhcp_pool_end'] = _dhi.get('dynamic_pool_end', '')
- _lt = _dhi.get('lease_time', '')
- if _lt and len(_lt) > 1 and _lt[:-1].isdigit() and _lt[-1] in 'mhd':
- row['dhcp_lease_time'] = _lt[:-1]
- row['dhcp_lease_unit'] = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[_lt[-1]]
- else:
- row['dhcp_lease_time'] = ''
- row['dhcp_lease_unit'] = ''
- row['dhcp_domain'] = _dhi.get('domain', '')
- row['server_identities_json'] = json.dumps(v.get('server_identities', []))
- rows.append(row)
- return rows
-
- if name == 'inter_vlan_exceptions':
- return cfg.get('inter_vlan_exceptions', [])
-
- if name == 'port_forwarding':
- return cfg.get('port_forwarding', [])
-
- if name == 'port_wrangling':
- rows = []
- for r in cfg.get('port_wrangling', []):
- row = dict(r)
- row['vlan_name'] = r.get('vlan', '-')
- rows.append(row)
- return rows
-
- if name == 'dhcp_reservations':
- rows = []
- for res in cfg.get('dhcp_reservations', []):
- row = dict(res)
- row['vlan_name'] = res.get('vlan', '-')
- rows.append(row)
- return rows
-
- if name == 'ddns_providers':
- ddns = load_ddns()
- rows = []
- for p in ddns.get('providers', []):
- row = dict(p)
- ptype = p.get('provider', '').lower()
- if ptype == 'noip':
- row['credentials'] = (
- '
'
-
- tokens['CHANGE_HISTORY_HTML'] = history_html
- tokens['NO_HISTORY'] = 'true' if not all_groups else ''
-
- servers = dns.get('upstream_servers', [])
- tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
- tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
- tokens['DNS_UPSTREAM_SERVERS_JSON'] = json.dumps(servers)
- tokens['OVERVIEW_UPSTREAM_SERVERS'] = ', '.join(servers) or '-'
-
- non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
- vlan_names = [v.get('name', '') for v in vlans]
- tokens['OVERVIEW_VLAN_NAMES'] = ', '.join(vlan_names) or '-'
- tokens['STAT_VLAN_COUNT'] = str(len(non_vpn_vlans))
- tokens['STAT_LEASE_COUNT'] = str(len(live_dhcp_leases()))
- filter_opts = '' + ''.join(
- f'' for n in vlan_names
- )
- tokens['VLAN_FILTER_OPTIONS'] = filter_opts
- tokens['VLAN_NAMES_AS_OPTIONS'] = json.dumps([{'value': n, 'label': n} for n in vlan_names])
- tokens['VPN_VLAN_COUNT'] = str(sum(1 for v in vlans if v.get('is_vpn')))
- _res_ips_by_vlan = {}
- _res_hosts_by_vlan = {}
- for _v in vlans:
- _vn = _v.get('name', '')
- if not _vn:
- continue
- _vlan_res = [r for r in cfg.get('dhcp_reservations', []) if r.get('vlan') == _vn]
- _res_ips_by_vlan[_vn] = [r['ip'] for r in _vlan_res if r.get('ip') and r['ip'] != 'dynamic']
- _res_hosts_by_vlan[_vn] = [r['hostname'] for r in _vlan_res if r.get('hostname')]
- tokens['RESERVATION_IPS_BY_VLAN_JSON'] = json.dumps(_res_ips_by_vlan)
- tokens['RESERVATION_HOSTNAMES_BY_VLAN_JSON'] = json.dumps(_res_hosts_by_vlan)
- tokens['VLAN_SUBNET_INFO_JSON'] = json.dumps({
- v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
- for v in vlans if v.get('name') and v.get('subnet')
- })
- tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
- tokens['EXISTING_VLAN_NAMES_JSON'] = json.dumps([v.get('name') for v in vlans])
- _dv = next((v for v in vlans if v.get('radius_default')), None)
- tokens['RADIUS_DEFAULT_VLAN'] = f'"{_dv["name"]}" (VLAN {_dv["vlan_id"]})' if _dv else 'none set'
- try:
- tokens['RADIUS_SECRET'] = open(f'{CONFIGS_DIR}/.radius-secret').read().strip()
- except OSError:
- tokens['RADIUS_SECRET'] = '(Generation is pending - visit Actions to apply generation command)'
- _fr = cfg.get('radius', {})
- _fr_opts = _fr.get('options', {})
- _fr_gen = _fr.get('general', {})
- tokens['RADIUS_MAC_FORMAT'] = _fr_opts.get('mac_format', 'aabbccddeeff')
- tokens['RADIUS_APPLY_TO'] = _fr_opts.get('apply_to', 'all')
- tokens['RADIUS_LOGGING'] = 'true' if _fr_gen.get('logging', False) else ''
- tokens['RADIUS_GEN_LOG_MAX_KB'] = str(_fr_gen.get('log_max_kb', 1024))
- tokens['RADIUS_LOG_TAIL'], tokens['RADIUS_LOG_SUMMARY'] = _radius_log_tail()
- tokens['STAT_BANNED_IP_COUNT'] = str(sum(1 for b in cfg.get('banned_ips', []) if b.get('enabled', True)))
- tokens['STAT_BLOCKLIST_COUNT'] = str(len(cfg.get('dns_blocking', {}).get('blocklists', [])))
- tokens['BLOCKLIST_STATS_HTML'] = _blocklist_stats_html(cfg)
-
- ddns = load_ddns()
- ddns_gen = ddns.get('general', {})
- tokens['DDNS_TIMER_INTERVAL'] = ddns_gen.get('timer_interval', '-')
- _interval_secs = _parse_interval_to_seconds(ddns_gen.get('timer_interval', '')) or 600
- tokens['DDNS_TIMER_INTERVAL_MINS'] = str(_interval_secs // 60)
- tokens['DDNS_GEN_LOG_MAX_KB'] = str(ddns_gen.get('log_max_kb', 1024))
- tokens['DDNS_GEN_LOG_ERRORS_ONLY'] = 'true' if ddns_gen.get('log_errors_only') else 'false'
- enabled_p = [p for p in ddns.get('providers', []) if p.get('enabled', True)]
- tokens['STAT_DDNS_PROVIDER_COUNT'] = str(len(enabled_p))
- _ip_check = ddns.get('ip_check_services', [])
- _http_svc = [s['url'] for s in _ip_check if s.get('type') == 'http']
- _dig_svc = [s['url'] for s in _ip_check if s.get('type') == 'dig']
- tokens['STAT_IP_CHECK_TOTAL'] = str(len(_ip_check))
- tokens['STAT_IP_CHECK_SUB'] = f'{len(_http_svc)} http and {len(_dig_svc)} dig'
- tokens['IP_CHECK_HTTP_JSON'] = json.dumps(_http_svc)
- tokens['IP_CHECK_DIG_JSON'] = json.dumps(_dig_svc)
- _ddns_labels = {'noip': 'No-IP', 'cloudflare': 'Cloudflare', 'duckdns': 'DuckDNS'}
- tokens['DDNS_PROVIDER_OPTIONS'] = json.dumps([
- {'value': p, 'label': _ddns_labels.get(p, p.title())}
- for p in validate.VALID_DDNS_PROVIDERS
- ])
-
- wg_vlans_list = sorted(
- [v for v in vlans if v.get('is_vpn')],
- key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
- )
- tokens['VPN_VLAN_OPTIONS'] = json.dumps([
- {'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id") or "?"})'}
- for i, v in enumerate(wg_vlans_list)
- ])
- wg_vlan = wg_vlans_list[0] if wg_vlans_list else {}
- vpn = wg_vlan.get('vpn_information', {})
- overrides = vpn.get('explicit_overrides', {})
- tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
- tokens['VPN_SERVER_ENDPOINT'] = str(vpn.get('server_endpoint', ''))
- tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
- tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_servers', ''))
- tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
-
- # Compute gateway from server_identities (lowest last-octet), fallback to first subnet host
- try:
- import ipaddress as _ipaddress
- ident_ips = [s['ip'] for s in wg_vlan.get('server_identities', []) if s.get('ip')]
- if ident_ips:
- default_gw = str(min((_ipaddress.IPv4Address(ip) for ip in ident_ips),
- key=lambda x: x.packed[-1]))
- else:
- wg_net = _ipaddress.IPv4Network(
- f"{wg_vlan['subnet']}/{wg_vlan['subnet_mask']}", strict=False)
- default_gw = str(next(wg_net.hosts()))
- tokens['VPN_GATEWAY'] = overrides.get('gateway') or default_gw
- except Exception:
- tokens['VPN_GATEWAY'] = ''
-
- ip_str, sub_str, next_interval, last_obtained = _public_ip_info(ddns)
- tokens['STAT_PUBLIC_IP'] = ip_str
- tokens['STAT_DDNS_HOSTNAME'] = sub_str
- tokens['STAT_DDNS_NEXT_INTERVAL'] = next_interval
- tokens['STAT_PUBLIC_IP_LAST_OBTAINED'] = last_obtained
- tokens['STAT_PUBLIC_IP_LAST_CHECKED'] = _ddns_last_checked()
- tokens['DDNS_LOG_TAIL'], tokens['DDNS_LOG_SUMMARY'] = _ddns_log_tail()
- tokens['STAT_UPTIME'] = run('uptime -p') or '-'
- tokens['STAT_NFTABLES_STATUS'] = 'Active' if run('nft list tables 2>/dev/null').strip() else 'Inactive'
-
- dns_stats = get_dnsmasq_stats()
- tokens['DNS_STAT_QUERIES'] = dns_stats['queries']
- tokens['DNS_STAT_HITS'] = dns_stats['hits']
- tokens['DNS_STAT_HIT_RATE'] = dns_stats['hit_rate']
- tokens['DNS_STAT_FORWARDED'] = dns_stats['forwarded']
- tokens['DNS_STAT_AUTH'] = dns_stats['auth']
- tokens['DNS_STAT_TCP_PEAK'] = dns_stats['tcp_peak']
- tokens['STAT_BLOCKED_TODAY'] = _count_blocked_today()
- tokens['STAT_BLOCKED_DOMAINS'] = _count_blocked_domains()
- tokens['STAT_BL_LAST_UPDATE'] = _bl_last_update()
- tokens['PREF_EMAIL'] = session.get('email_address', '')
- tokens['PREF_TIMEZONE'] = session.get('timezone', '')
-
- blank = [{'value': '', 'label': '-- Select timezone --'}]
- tokens['TIMEZONE_OPTIONS'] = json.dumps(
- blank + [{'value': tz, 'label': tz} for tz in sanitize.VALID_TIMEZONES]
- )
- tokens['PROTOCOL_OPTIONS'] = json.dumps([
- {'value': 'tcp', 'label': 'TCP'},
- {'value': 'udp', 'label': 'UDP'},
- {'value': 'both', 'label': 'TCP/UDP'},
- ])
- tokens['BLOCKLIST_FORMAT_OPTIONS'] = json.dumps([
- {'value': 'hosts', 'label': 'hosts (hosts file format)'},
- {'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
- ])
- tokens['BLOCKLIST_NAME_OPTIONS'] = json.dumps([
- {'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
- for bl in cfg.get('dns_blocking', {}).get('blocklists', [])
- ])
- tokens['ACCOUNT_LEVEL_OPTIONS'] = json.dumps([
- {'value': 'viewer', 'label': 'Viewer (read-only access to live data)'},
- {'value': 'administrator', 'label': 'Administrator (can modify configuration)'},
- {'value': 'manager', 'label': 'Manager (full access including account management)'},
- ])
-
- return tokens
-
-
-# Layout renderer ===================================================
-
-def render_layout(view_id, content_html, tokens, page_name=None):
- css = _load_css()
- level = client_level()
- has_pending_alert = not _apply_changes_immediately() and bool(get_dashboard_pending())
- titlebar_html = f'
{WEB_APP_DISPLAY_NAME}
'
- navbar_html = build_navbar(view_id, level, tokens, pending_alert=has_pending_alert)
- footer_html = f''
-
- page_hash = config_hash()
- lan_iface = e(tokens.get('GENERAL_LAN_INTERFACE', ''))
- vpn_count = tokens.get('VPN_VLAN_COUNT', '0')
- current_user = session.get('email_address', '')
- pending = get_pending_entries()
- my_uuid = next((u for u, t, c, usr in pending if usr == current_user and c != 'fix problems'), None)
-
- secs = _seconds_until_next_run()
- locked = _is_locked()
- lock_mtime = _lock_mtime()
- other_bars = ''
- seen_other_users = set()
- for o_uuid, o_ts, o_cmd, o_user in pending:
- if o_user == current_user:
- continue
- if o_user in seen_other_users:
- continue
- seen_other_users.add(o_user)
- _display_user = 'Another user' if o_user in ('unknown', '') else e(o_user)
- if locked and lock_mtime and o_ts < lock_mtime:
- text = f'{_display_user}\'s changes are being applied now...'
- cls = 'info-bar-warning info-bar-running'
- else:
- timing = _format_timing(secs)
- text = f'{_display_user} has pending changes which will be applied {timing}.' if timing else f'{_display_user} has pending changes. The processing service is not running.'
- cls = 'info-bar-warning'
- other_bars += f'
{text}
\n'
-
- problem_bars = ''
- if level >= LEVEL_RANK['viewer']:
- try:
- import json as _j
- st = _j.load(open(HEALTH_FILE))
- problems = []
- for section in ('configurations', 'logs'):
- for item in st.get(section, []):
- if item.get('status') == 'problem':
- problems.append(e(item.get('detail', item.get('name', ''))))
- for item in st.get('services', []):
- if item.get('status') == 'problem':
- name = item.get('name', '')
- utype = 'timer' if name.endswith('.timer') else 'service' if name.endswith('.service') else 'unit'
- exp_parts, act_parts = [], []
- if not item.get('active_ok'):
- exp_parts.append(item.get('expected_active', 'active'))
- act_parts.append(item.get('active', 'unknown'))
- if not item.get('enabled_ok'):
- exp_parts.append(item.get('expected_enabled', 'enabled'))
- act_parts.append(item.get('enabled', 'unknown'))
- problems.append(e(
- f"The {utype} `{name}` is expected to be "
- f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}."
- ))
- has_problems = bool(problems)
- fix_suffix = ''
- fix_uuid = None
- if has_problems:
- if level < LEVEL_RANK['administrator']:
- fix_suffix = 'Please contact an administrator.'
- else:
- fix_uuid, fix_ts = _find_cmd_in_queues('fix problems')
- if _apply_changes_immediately():
- if _is_locked():
- mtime = _lock_mtime()
- fix_suffix = ('Fix is being applied now...' if fix_ts and mtime and fix_ts < mtime
- else 'Fix will be applied on the next run.')
- else:
- timing = _format_timing(_seconds_until_next_run())
- fix_suffix = (f'Fix will be applied {timing}.' if timing
- else 'Fix pending. The processing service is not running.')
- else:
- fix_suffix = ('Fix pending. Click Apply Now below to fix.'
- if view_id == 'actions' else
- 'Fix pending. Visit the Actions page ASAP to apply fix.')
- if problems:
- problems_list = ('
'
- + ''.join(f'
{d}
' for d in problems)
- + '
')
- uuid_attr = (f' data-health-uuid="{e(fix_uuid)}"'
- if fix_uuid and _entry_ts_from_queue(fix_uuid) is not None else '')
- fix_html = (f'
{fix_suffix}
'
- if fix_suffix else '')
- content = ('
'
- '
Health check - problems found:
'
- + problems_list + fix_html
- + '
')
- problem_bars += f'
{content}
\n'
- except Exception:
- pass
-
- pending_bar = ''
- if has_pending_alert and not problem_bars and view_id != 'actions':
- pending_bar = (
- '
'
- 'You have actions pending. Please visit the Actions page.'
- '
\n'
- )
-
- return (
- '\n\n\n'
- ' \n'
- ' \n'
- f' {WEB_APP_DISPLAY_NAME}\n'
- f' \n'
- '\n\n'
- f'{titlebar_html}\n'
- f'{navbar_html}\n'
- f'\n{pending_bar}{problem_bars}{other_bars}{content_html}\n\n'
- f'{footer_html}\n'
- f'\n'
- f'\n'
- '\n'
- )
-
-
-def build_navbar(active_view, level, tokens, pending_alert=False):
- navbar_data = load_json(NAVBAR_FILE)
- left, right = [], []
- for item in navbar_data.get('items', []):
- req = item.get('client_requirement')
- align = item.get('align', 'left')
- if not passes(req, level):
- continue
- frag = build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=req, pending_alert=pending_alert)
- (right if align == 'right' else left).append(frag)
-
- return (
- ''
- )
-
-
-def build_nav_item(item, active_view, level, in_dropdown=False, inherited_req=None, pending_alert=False):
- req = item.get('client_requirement', inherited_req)
- t = item.get('type', '')
-
- if t in ('nav_item', 'nav_action'):
- label = e(item.get('label', ''))
- map_to = item.get('map_to', '')
- action = item.get('action', '')
- is_active = ' active' if map_to and map_to == active_view else ''
- pending = ' nav-item-pending' if pending_alert and map_to == 'actions' else ''
- cls = f'dropdown-item{is_active}' if in_dropdown else f'nav-item{is_active}{pending}'
- if action:
- return (
- f''
- )
- if map_to:
- return f'{label}'
- return f'{label}'
-
- if t == 'nav_menu':
- raw_label = item.get('label', '')
- if raw_label == '%MENU_LABEL%':
- raw_label = 'Configure' if level >= LEVEL_RANK['administrator'] else 'View'
- label = e(raw_label)
- children = ''
- for child in item.get('items', []):
- child_req = child.get('client_requirement', req)
- if not passes(child_req, level):
- continue
- children += build_nav_item(child, active_view, level, in_dropdown=True, inherited_req=req, pending_alert=pending_alert)
- if not children:
- return ''
- return (
- '