import copy, json, subprocess, hashlib, os, uuid import os as _os from datetime import datetime, timezone from flask import session APP_DIR = _os.path.dirname(_os.path.abspath(__file__)) CONFIGS_DIR = '/routlin_location' WWW_DIR = '/www' ACCOUNTS_FILE = f'{APP_DIR}/authorized_accounts.json' CONFIG_FILE = f'{CONFIGS_DIR}/config.json' DASHBOARD_QUEUE = f'{CONFIGS_DIR}/.dashboard-queue' DASHBOARD_DONE = f'{CONFIGS_DIR}/.dashboard-done' DASHBOARD_LAST_RUN = f'{CONFIGS_DIR}/.dashboard-last-run' DASHBOARD_LOCK = f'{CONFIGS_DIR}/.dashboard-lock' DASHBOARD_PENDING = f'{CONFIGS_DIR}/.dashboard-pending' DASHBOARD_DB = f'{CONFIGS_DIR}/.dashboard-snapshots' CREDENTIALS_DB = f'{CONFIGS_DIR}/.client-credentials' HEALTH_FILE = f'{CONFIGS_DIR}/.health' BLOCKLISTS_DIR = f'{CONFIGS_DIR}/blocklists' PRODUCT_NAME = os.environ.get('PRODUCT_NAME', 'routlin') DASHB_TIMER_NAME = f'{PRODUCT_NAME}-dashboard-queue' DDNS_TIMER_NAME = f'{PRODUCT_NAME}-ddns-update' WEB_APP_DISPLAY_NAME = os.environ.get('WEB_APP_DISPLAY_NAME', f'{PRODUCT_NAME.capitalize()} Dashboard') DASHB_INTERVAL_SECS = 30 QUEUE_MAX_LINES = 50 _config_cache = None _config_mtime = None def load_config(): global _config_cache, _config_mtime try: mtime = os.path.getmtime(CONFIG_FILE) if _config_cache is not None and mtime == _config_mtime: return copy.deepcopy(_config_cache) with open(CONFIG_FILE) as f: data = json.load(f) _config_cache = data _config_mtime = mtime return copy.deepcopy(data) except Exception: return {} def save_config(data): with open(CONFIG_FILE, 'w') as f: json.dump(data, f, indent=2) def config_hash(): try: with open(CONFIG_FILE, 'rb') as f: return hashlib.md5(f.read()).hexdigest() except Exception: return '' def verify_config_hash(submitted): if not submitted: return True return submitted == config_hash() def _load_done_set(): try: done = set() for line in open(DASHBOARD_DONE).read().splitlines(): parts = line.split() if parts: done.add(parts[0]) return done except Exception: return set() def _read_pending(done_set): pending = [] try: lines = open(DASHBOARD_QUEUE).read().splitlines() except Exception: return pending for line in lines: try: parts = line.split(None, 2) if len(parts) == 3: entry_uuid, entry_ts, rest = parts cmd_user = rest.rsplit(' (', 1) entry_cmd = cmd_user[0].strip('[]') entry_user = cmd_user[1].rstrip(')') if len(cmd_user) == 2 else '' if entry_uuid not in done_set: pending.append((entry_uuid, int(entry_ts), entry_cmd, entry_user)) except Exception: pass return pending def get_pending_entries(): return _read_pending(_load_done_set()) def _format_timing(secs): if secs is None: return None if secs <= 5: return 'momentarily' if secs < 60: return f'in about {secs} seconds' mins = round(secs / 60) return f'in about {mins} minute{"s" if mins != 1 else ""}' def _trim_if_needed(): try: lines = [l for l in open(DASHBOARD_QUEUE).read().splitlines() if l] if len(lines) <= QUEUE_MAX_LINES: return done_set = _load_done_set() pending = [l for l in lines if l.split()[0] not in done_set] with open(DASHBOARD_QUEUE, 'w') as f: f.write('\n'.join(pending) + ('\n' if pending else '')) open(DASHBOARD_DONE, 'w').close() except Exception: pass def _apply_changes_immediately(): try: return session.get('apply_changes_immediately', False) except Exception: return False def _read_dashboard_pending(): """Return list of (uuid, ts, cmd, user) from .dashboard-pending.""" items = [] try: lines = open(DASHBOARD_PENDING).read().splitlines() except Exception: return items for line in lines: if not line.strip(): continue try: parts = line.split(None, 2) if len(parts) == 3: entry_uuid, entry_ts, rest = parts cmd_user = rest.rsplit(' (', 1) entry_cmd = cmd_user[0].strip('[]') entry_user = cmd_user[1].rstrip(')') if len(cmd_user) == 2 else '' items.append((entry_uuid, int(entry_ts), entry_cmd, entry_user)) except Exception: pass return items def get_dashboard_pending(): return _read_dashboard_pending() def get_dashboard_done(): """Return list of (uuid, applied_ts) from .dashboard-done, newest first.""" items = [] try: lines = open(DASHBOARD_DONE).read().splitlines() except Exception: return items for line in lines: if not line.strip(): continue try: parts = line.split(None, 1) if len(parts) >= 2: items.append((parts[0], int(parts[1]))) elif len(parts) == 1: items.append((parts[0], None)) except Exception: pass items.reverse() return items def get_done_timestamps(): """Return dict of {uuid: applied_ts} from .dashboard-done.""" result = {} try: for line in open(DASHBOARD_DONE).read().splitlines(): if not line.strip(): continue parts = line.split(None, 1) if len(parts) >= 2: result[parts[0]] = int(parts[1]) elif len(parts) == 1: result[parts[0]] = None except Exception: pass return result def load_all_snapshots(): """Return all snapshot dicts from .snapshots/, sorted newest first.""" snaps = [] try: for fname in sorted(os.listdir(SNAPSHOTS_DIR), reverse=True): if not fname.endswith('.json'): continue try: with open(os.path.join(SNAPSHOTS_DIR, fname)) as f: snaps.append(json.load(f)) except Exception: pass except Exception: pass return snaps def flush_pending_to_queue(): """Move all entries from .dashboard-pending to .dashboard-queue and clear pending.""" items = _read_dashboard_pending() if not items: return done_set = _load_done_set() existing_ids = {uu for uu, *_ in _read_pending(done_set)} with open(DASHBOARD_QUEUE, 'a') as f: for entry_uuid, entry_ts, entry_cmd, entry_user in items: if entry_uuid not in existing_ids: f.write(f'{entry_uuid} {entry_ts} [{entry_cmd}] ({entry_user})\n') open(DASHBOARD_PENDING, 'w').close() _trim_if_needed() def _queue_pending_command(cmd, user=None): """Append cmd to .dashboard-pending if not already present for this cmd+user.""" existing = _read_dashboard_pending() current_user = user or session.get('email_address', 'unknown') for entry_uuid, entry_ts, entry_cmd, entry_user in existing: if entry_cmd == cmd and entry_user == current_user: return entry_uuid, entry_ts entry_uuid = str(uuid.uuid4()) entry_ts = int(datetime.now().timestamp()) with open(DASHBOARD_PENDING, 'a') as f: f.write(f'{entry_uuid} {entry_ts} [{cmd}] ({current_user})\n') return entry_uuid, entry_ts def _queue_pending_presigned(cmd, entry_uuid, entry_ts): """Write a pre-generated entry to .dashboard-pending without dedup.""" current_user = session.get('email_address', 'unknown') with open(DASHBOARD_PENDING, 'a') as f: f.write(f'{entry_uuid} {entry_ts} [{cmd}] ({current_user})\n') def _queue_command(cmd, user=None): if not _apply_changes_immediately(): return _queue_pending_command(cmd, user=user) done_set = _load_done_set() pending = _read_pending(done_set) current_user = user or session.get('email_address', 'unknown') for entry_uuid, entry_ts, entry_cmd, entry_user in pending: if entry_cmd == cmd and entry_user == current_user: return entry_uuid, entry_ts entry_uuid = str(uuid.uuid4()) entry_ts = int(datetime.now().timestamp()) with open(DASHBOARD_QUEUE, 'a') as f: f.write(f'{entry_uuid} {entry_ts} [{cmd}] ({current_user})\n') _trim_if_needed() return entry_uuid, entry_ts def _find_cmd_in_queues(cmd): """Return (uuid, ts) of first matching entry in .dashboard-pending or .dashboard-queue, or (None, None).""" for entry_uuid, entry_ts, entry_cmd, entry_user in _read_dashboard_pending(): if entry_cmd == cmd: return entry_uuid, entry_ts done_set = _load_done_set() for entry_uuid, entry_ts, entry_cmd, entry_user in _read_pending(done_set): if entry_cmd == cmd: return entry_uuid, entry_ts return None, None def _entry_ts_from_queue(entry_uuid): try: for line in open(DASHBOARD_QUEUE).read().splitlines(): parts = line.split(None, 2) if len(parts) >= 2 and parts[0] == entry_uuid: return int(parts[1]) except Exception: pass return None def _seconds_until_next_run(): try: last_run = float(open(DASHBOARD_LAST_RUN).read().strip()) elapsed = datetime.now(timezone.utc).timestamp() - last_run return int(max(0, DASHB_INTERVAL_SECS - elapsed)) except Exception: return None def _is_locked(): try: return os.path.getsize(DASHBOARD_LOCK) > 0 except Exception: return False def _lock_mtime(): try: return os.path.getmtime(DASHBOARD_LOCK) except Exception: return None def _timing_status_msg(entry_ts, action_label): """Return a flash message for a command already written to the queue.""" if _is_locked(): mtime = _lock_mtime() if entry_ts is not None and mtime and entry_ts < mtime: return f'{action_label}. Your changes are being applied now...' return f'{action_label}. Your changes will be applied on the next run.' timing = _format_timing(_seconds_until_next_run()) if timing: return f'{action_label}. Your changes will be applied {timing}.' return f'{action_label}. The processing service is not running.' def _build_timing_msg(entry_ts, action_label='Configuration saved'): if not _apply_changes_immediately(): from markupsafe import Markup return Markup(f'{action_label}. Visit the Actions page to apply your changes.') return _timing_status_msg(entry_ts, action_label) def queue_command(cmd, description='', user=None): """Queue a command without generating a flash message. description is ignored (kept for compat).""" return _queue_command(cmd, user=user) def queued_msg(cmd=None, description='', action_label='Configuration saved'): """Queue cmd if given, then return a timing message. description is ignored.""" entry_ts = None if cmd is not None: _entry_uuid, entry_ts = queue_command(cmd) return _build_timing_msg(entry_ts, action_label) # Snapshot system =================================================== import re as _re import sqlite3 as _sqlite3 def _db(): conn = _sqlite3.connect(DASHBOARD_DB) conn.row_factory = _sqlite3.Row conn.execute('PRAGMA journal_mode=WAL') conn.executescript(''' CREATE TABLE IF NOT EXISTS groups ( uuid TEXT PRIMARY KEY, ts INTEGER NOT NULL, cmd TEXT, user TEXT, parent_path TEXT NOT NULL, item_key TEXT, item_value TEXT, reverts_group TEXT, reverted INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS changes ( group_id TEXT NOT NULL REFERENCES groups(uuid), field TEXT NOT NULL, before TEXT, after TEXT, value_type TEXT NOT NULL, PRIMARY KEY (group_id, field) ); CREATE INDEX IF NOT EXISTS idx_changes_group ON changes(group_id); ''') return conn def _py_value_type(val): if val is None: return 'null' if isinstance(val, bool): return 'bool' if isinstance(val, int): return 'int' if isinstance(val, float): return 'float' if isinstance(val, (dict, list)): return 'json' return 'str' def _serialize_value(val): if val is None: return None if isinstance(val, bool): return 'true' if val else 'false' if isinstance(val, (dict, list)): return json.dumps(val, separators=(',', ':')) return str(val) def _deserialize_value(text, value_type): if text is None: return None if value_type == 'int': return int(text) if value_type == 'float': return float(text) if value_type == 'bool': return text == 'true' if value_type in ('json', 'null'): return json.loads(text) return text def diff_fields(before_dict, after_dict): """Return list of (field, before_text, after_text, value_type) for changed fields.""" bd = before_dict or {} ad = after_dict or {} result = [] for key in sorted(set(bd) | set(ad)): bval = bd.get(key) aval = ad.get(key) if bval == aval: continue ref = aval if aval is not None else bval result.append(( key, _serialize_value(bval), _serialize_value(aval), _py_value_type(ref), )) return result _PATH_SEG = _re.compile(r'([^\.\[]+)(?:\[([^\]=]+)=([^\]]+)\])?') def _parse_path(path): """Parse 'vlans[name=trusted].field' into [(field, sel_key, sel_val), ...].""" return [(m.group(1), m.group(2), m.group(3)) for m in _PATH_SEG.finditer(path)] def _nav_get(cfg, path): """Navigate config to the value at path.""" for field, sel_key, sel_val in _parse_path(path): cfg = cfg[field] if sel_key: cfg = next(x for x in cfg if str(x.get(sel_key, '')) == str(sel_val)) return cfg def _nav_parent(cfg, path): """Return (parent_obj, final_key) for setting/deleting the last path segment.""" segs = _parse_path(path) for field, sel_key, sel_val in segs[:-1]: cfg = cfg[field] if sel_key: cfg = next(x for x in cfg if str(x.get(sel_key, '')) == str(sel_val)) return cfg, segs[-1][0] def record_group(cfg, parent_path, item_key, item_value, changes, cmd, reverts_group=None, queue=True): """Insert a group + changes into sqlite, save config, and queue the command. Returns a flash message string. """ group_uuid = str(uuid.uuid4()) entry_ts = int(datetime.now().timestamp()) current_user = session.get('email_address', 'unknown') conn = _db() try: conn.execute( 'INSERT INTO groups ' '(uuid,ts,cmd,user,parent_path,item_key,item_value,reverts_group) ' 'VALUES (?,?,?,?,?,?,?,?)', (group_uuid, entry_ts, cmd, current_user, parent_path, item_key, item_value, reverts_group) ) for field, before, after, value_type in changes: conn.execute( 'INSERT INTO changes (group_id,field,before,after,value_type) ' 'VALUES (?,?,?,?,?)', (group_uuid, field, before, after, value_type) ) conn.commit() finally: conn.close() save_config(cfg) if not queue: with open(DASHBOARD_DONE, 'a') as f: f.write(f'{group_uuid} {entry_ts}\n') return 'Saved.' if _apply_changes_immediately(): with open(DASHBOARD_QUEUE, 'a') as f: f.write(f'{group_uuid} {entry_ts} [{cmd}] ({current_user})\n') _trim_if_needed() else: _queue_pending_presigned(cmd, group_uuid, entry_ts) return _build_timing_msg(entry_ts) def load_all_groups(): """Return list of (group_dict, [change_dicts]) sorted newest first.""" conn = _db() try: gs = conn.execute('SELECT * FROM groups ORDER BY ts DESC').fetchall() result = [] for g in gs: cs = conn.execute( 'SELECT * FROM changes WHERE group_id=? ORDER BY field', (g['uuid'],) ).fetchall() result.append((dict(g), [dict(c) for c in cs])) return result finally: conn.close() def revert_group(group_uuid, force=False): """Revert a change group. Returns (flash_message, success_bool). force=True skips the revert-of-revert guard, used by revert_group_chain.""" conn = _db() try: g = conn.execute('SELECT * FROM groups WHERE uuid=?', (group_uuid,)).fetchone() if not g: return f'Snapshot not found for {group_uuid[:8]}.', False g = dict(g) changes = [dict(c) for c in conn.execute( 'SELECT * FROM changes WHERE group_id=?', (group_uuid,) ).fetchall()] finally: conn.close() if g['reverts_group'] and not force: return 'Cannot revert a revert.', False cfg = load_config() parent_path = g['parent_path'] item_key = g['item_key'] item_value = g['item_value'] all_before_null = all(c['before'] is None for c in changes) all_after_null = all(c['after'] is None for c in changes) if all_before_null: parent_obj, lst_key = _nav_parent(cfg, parent_path) parent_obj[lst_key] = [ x for x in parent_obj[lst_key] if str(x.get(item_key, '')) != str(item_value) ] elif all_after_null: item = {c['field']: _deserialize_value(c['before'], c['value_type']) for c in changes} _nav_get(cfg, parent_path).append(item) else: item_path = f'{parent_path}[{item_key}={item_value}]' if item_key else parent_path for c in changes: parent_obj, field = _nav_parent(cfg, f'{item_path}.{c["field"]}') if c['before'] is None: parent_obj.pop(field, None) else: parent_obj[field] = _deserialize_value(c['before'], c['value_type']) inv = [(c['field'], c['after'], c['before'], c['value_type']) for c in changes] msg = record_group(cfg, parent_path, item_key, item_value, inv, g['cmd'], reverts_group=group_uuid) conn = _db() try: conn.execute('UPDATE groups SET reverted=1 WHERE uuid=?', (group_uuid,)) conn.commit() finally: conn.close() return msg, True def revert_group_chain(group_uuid): """Revert group_uuid and all subsequent groups touching the same item (same parent_path + item_key + item_value), newest first. Returns (error_messages, succeeded_count, failed_count).""" conn = _db() try: g = conn.execute('SELECT * FROM groups WHERE uuid=?', (group_uuid,)).fetchone() if not g: return [f'Snapshot not found for {group_uuid[:8]}.'], 0, 1 g = dict(g) chain = [dict(r) for r in conn.execute( 'SELECT * FROM groups ' 'WHERE parent_path=? AND item_key IS ? AND item_value IS ? AND ts >= ? AND reverted=0 ' 'ORDER BY ts DESC', (g['parent_path'], g['item_key'], g['item_value'], g['ts']) ).fetchall()] finally: conn.close() errors, succeeded, failed = [], 0, 0 for grp in chain: msg, ok = revert_group(grp['uuid'], force=True) if ok: succeeded += 1 else: errors.append(msg) failed += 1 return errors, succeeded, failed # Misc ============================================================== def run_apply(): try: subprocess.run( ['python3', f'{CONFIGS_DIR}/core.py', '--apply'], capture_output=True, timeout=30 ) except Exception: pass def run_merge_blocklists(): try: subprocess.run( ['python3', f'{CONFIGS_DIR}/core.py', '--merge-blocklists'], capture_output=True, timeout=120 ) except Exception: pass # Format helpers ==================================================== def fmt_timestamp(ts): try: return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC') except Exception: return '-' def relative_time(ts1, ts2, short=False): try: diff = abs(int(ts1) - int(ts2)) if diff < 60: return f'{diff}s' if short else f'{diff} second{"s" if diff != 1 else ""}' m = diff // 60 if m < 60: return f'{m}m' if short else f'{m} minute{"s" if m != 1 else ""}' h, rem_m = divmod(m, 60) if h < 24: if short: return f'{h}h {rem_m}m' if rem_m else f'{h}h' return f'{h}h {rem_m}m' if rem_m else f'{h} hour{"s" if h != 1 else ""}' d = h // 24 if d < 365: return f'{d}d' if short else f'{d} day{"s" if d != 1 else ""}' y = d // 365 return f'{y}y' if short else f'{y} year{"s" if y != 1 else ""}' except Exception: return '' def fmt_bytes(n): for unit in ('B', 'KB', 'MB', 'GB'): if n < 1024: return f'{n:.1f} {unit}' n /= 1024 return f'{n:.1f} TB' def resolve_iface(vlan, cfg): if vlan.get('is_vpn'): wg_vlans = [v for v in cfg.get('vlans', []) if v.get('is_vpn')] wg_sorted = sorted(wg_vlans, key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)) idx = next((i for i, v in enumerate(wg_sorted) if v is vlan), 0) return f'wg{idx}' lan = cfg.get('network_interfaces', {}).get('lan_interface', 'eth0') vid = vlan.get('vlan_id') or 1 return lan if vid == 1 else f'{lan}.{vid}' # Config datasources ================================================ def _bl_db_rows(): """Return {blocklist_name: {domain_count, fetched_at}} from domains.db, or {} if unavailable.""" db_path = os.path.join(BLOCKLISTS_DIR, 'domains.db') try: db = _sqlite3.connect(f'file:{db_path}?mode=ro', uri=True) rows = db.execute('SELECT name, domain_count, fetched_at FROM blocklists').fetchall() db.close() return {name: {'domain_count': count, 'fetched_at': fetched_at} for name, count, fetched_at in rows} except Exception: return {} def _bl_db_counts(): return {name: v['domain_count'] for name, v in _bl_db_rows().items()} def config_datasource(name): cfg = load_config() vlans = cfg.get('vlans', []) if name == 'banned_ips': return cfg.get('banned_ips', []) if name == 'host_overrides': return cfg.get('host_overrides', []) if name == 'blocklists': db_counts = _bl_db_counts() used_by = {} for v in vlans: vlan_name = v.get('name', '') for bl_name in v.get('use_blocklists', []): used_by.setdefault(bl_name, []).append(vlan_name) rows = [] for bl in cfg.get('dns_blocking', {}).get('blocklists', []): row = dict(bl) bl_type = bl.get('bl_type', 'community') row['bl_type_label'] = 'Local' if bl_type == 'local' else 'Community' count = db_counts.get(bl.get('name', '')) row['domain_count'] = f'{count:,}' if count is not None else '-' if bl_type == 'local': bl_path = os.path.join(BLOCKLISTS_DIR, bl.get('save_as', '')) try: with open(bl_path) as f: row['local_entries'] = f.read().strip() except Exception: row['local_entries'] = '' row['source_display'] = bl.get('save_as', '') else: row['local_entries'] = '' row['source_display'] = row.get('url', '') vlan_names = used_by.get(bl.get('name', ''), []) row['used_by'] = json.dumps([{'n': n, 'd': ''} for n in vlan_names]) rows.append(row) return rows if name == 'vlans': bl_desc = { b['name']: b.get('description', b['name']) for b in cfg.get('dns_blocking', {}).get('blocklists', []) if 'name' in b } rows = [] for v in sorted(vlans, key=lambda x: x.get('vlan_id') or 0): row = {k: v.get(k) for k in ( 'name', 'subnet', 'subnet_mask', 'radius_default', 'mdns_reflection', 'is_vpn', 'dnsmasq_log_queries' )} row['vlan_id'] = v.get('vlan_id') row['interface'] = resolve_iface(v, cfg) row['use_blocklists'] = json.dumps([ {'n': bl, 'd': bl_desc.get(bl, bl)} for bl in v.get('use_blocklists', []) ]) prefix = v.get('subnet_mask', 24) n_octets = 1 if prefix >= 24 else 2 if prefix >= 16 else 3 if prefix >= 8 else 4 row['server_identity_ips'] = json.dumps([ { 'n': s['ip'], 'd': ' | '.join(filter(None, [s['ip'], s.get('description'), s.get('hostname')])), 'short': '.' + '.'.join(s['ip'].split('.')[-n_octets:]), 'mini': '.' + '.'.join(s['ip'].split('.')[-n_octets:]), } for s in v.get('server_identities', []) if s.get('ip') ]) row['server_identity_descriptions'] = json.dumps([ s.get('description', '') for s in v.get('server_identities', []) if s.get('ip') ]) row['server_identity_hostnames'] = json.dumps([ s.get('hostname', '') for s in v.get('server_identities', []) if s.get('ip') ]) row['server_identity_gateway'] = ( v.get('dhcp_information', {}).get('explicit_overrides', {}).get('gateway', '') ) dns = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('dns_servers', []) row['server_identity_dns_servers'] = '\n'.join(dns) if isinstance(dns, list) else str(dns or '') ntp = v.get('dhcp_information', {}).get('explicit_overrides', {}).get('ntp_servers', []) row['server_identity_ntp_servers'] = '\n'.join(ntp) if isinstance(ntp, list) else str(ntp or '') row['gateway'] = row['server_identity_gateway'] row['dns_servers'] = row['server_identity_dns_servers'] row['ntp_servers'] = row['server_identity_ntp_servers'] row['dns_servers_override'] = 1 if row['server_identity_dns_servers'] else 0 row['ntp_servers_override'] = 1 if row['server_identity_ntp_servers'] else 0 dhi = v.get('dhcp_information', {}) row['dhcp_pool_start'] = dhi.get('dynamic_pool_start', '') row['dhcp_pool_end'] = dhi.get('dynamic_pool_end', '') lt = dhi.get('lease_time', '') if lt and len(lt) > 1 and lt[:-1].isdigit() and lt[-1] in 'mhd': row['dhcp_lease_time'] = lt[:-1] row['dhcp_lease_unit'] = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[lt[-1]] else: row['dhcp_lease_time'] = '' row['dhcp_lease_unit'] = '' row['dhcp_domain'] = dhi.get('domain', '') row['server_identities_json'] = json.dumps(v.get('server_identities', [])) rows.append(row) return rows if name == 'inter_vlan_exceptions': return cfg.get('inter_vlan_exceptions', []) if name == 'port_forwarding': return cfg.get('port_forwarding', []) if name == 'port_wrangling': rows = [] for r in cfg.get('port_wrangling', []): row = dict(r) row['vlan_name'] = r.get('vlan', '-') rows.append(row) return rows if name == 'dhcp_reservations': rows = [] for res in cfg.get('dhcp_reservations', []): row = dict(res) row['vlan_name'] = res.get('vlan', '-') row['ip'] = res.get('ip') or 'dynamic' rows.append(row) return rows if name == 'ddns_providers': from factory import e ddns = load_config().get('ddns', {}) rows = [] for p in ddns.get('providers', []): row = dict(p) ptype = p.get('provider', '').lower() if ptype == 'noip': row['credentials'] = ( '
' f'U: {e(p.get("username", "-"))}
' 'P: ••••••
' ) elif ptype in ('cloudflare', 'duckdns'): tok = p.get('api_token', '') row['credentials'] = f'API Token: {e(tok[:20])}...' if tok else '(not set)' else: row['credentials'] = '-' row['hostnames'] = json.dumps(p.get('hostnames', p.get('subdomains', []))) rows.append(row) return rows if name == 'accounts': try: with open(ACCOUNTS_FILE) as f: data = json.load(f) except Exception: data = {} rows = [] for acct in data.get('accounts', []): row = dict(acct) row['account_status'] = 'active' if acct.get('hashed_password') else 'pending' rows.append(row) return rows if name == 'vpn_peers': rows = [] wg_sorted = sorted( [v for v in vlans if v.get('is_vpn')], key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0) ) for i, vlan in enumerate(wg_sorted): iface = f'wg{i}' vlan_display = f'{iface} (VLAN {vlan.get("vlan_id") or "?"})' for peer in vlan.get('peers', []): row = dict(peer) row['vlan_display'] = vlan_display row['split_tunnel'] = 'yes' if peer.get('split_tunnel') else 'no' row['pubkey_short'] = peer.get('public_key', '')[:20] + '...' if peer.get('public_key') else '-' rows.append(row) return rows return [] def load_datasource(spec): if spec.startswith('config:'): return config_datasource(spec[7:]) return [] def collect_layout_tokens(cfg): import settings as settings net = cfg.get('network_interfaces', {}) return { 'GENERAL_LAN_INTERFACE': str(net.get('lan_interface', '-')), 'VPN_VLAN_COUNT': str(sum(1 for v in cfg.get('vlans', []) if v.get('is_vpn'))), 'PRO_LICENSE_JS': 'true' if settings.is_pro() else 'false', }