Development

This commit is contained in:
Matthew Grotke 2026-05-22 01:09:23 -04:00
parent cc2f57aa83
commit 74166f03bd
11 changed files with 986 additions and 61 deletions

View file

@ -14,6 +14,7 @@ def apply_general():
log_errors_only = 'log_errors_only' in request.form log_errors_only = 'log_errors_only' in request.form
dnsmasq_log_queries = 'dnsmasq_log_queries' in request.form dnsmasq_log_queries = 'dnsmasq_log_queries' in request.form
daily_execute_time = sanitize.time_24h(request.form.get('daily_execute_time_24hr_local', '')) daily_execute_time = sanitize.time_24h(request.form.get('daily_execute_time_24hr_local', ''))
apply_on_save = 'apply_on_save' in request.form
log_max_kb = validate.int_range(log_max_kb_raw, 64, None) log_max_kb = validate.int_range(log_max_kb_raw, 64, None)
if log_max_kb is None: if log_max_kb is None:
@ -30,6 +31,7 @@ def apply_general():
'log_errors_only': log_errors_only, 'log_errors_only': log_errors_only,
'dnsmasq_log_queries': dnsmasq_log_queries, 'dnsmasq_log_queries': dnsmasq_log_queries,
'daily_execute_time_24hr_local': daily_execute_time, 'daily_execute_time_24hr_local': daily_execute_time,
'apply_on_save': apply_on_save,
}) })
errors = validate.validate_config(core) errors = validate.validate_config(core)
if errors: if errors:

View file

@ -0,0 +1,26 @@
from flask import Blueprint, redirect, flash
from auth import require_level
from config_utils import (flush_pending_to_queue, get_dashboard_pending,
_is_locked, _format_timing, _seconds_until_next_run)
bp = Blueprint('action_apply_pending', __name__)
@bp.route('/action/apply_pending', methods=['POST'])
@require_level('administrator')
def apply_pending():
items = get_dashboard_pending()
if not items:
flash('No pending changes to apply.', 'info')
return redirect('/view/view_general')
flush_pending_to_queue()
if _is_locked():
msg = 'Changes queued. They are being applied now.'
else:
timing = _format_timing(_seconds_until_next_run())
if timing:
msg = f'Changes queued. They will be applied {timing}.'
else:
msg = 'Changes queued. The processing service is not running.'
flash(msg, 'success')
return redirect('/view/view_general')

View file

@ -8,6 +8,8 @@ DASHBOARD_QUEUE = f'{CONFIGS_DIR}/.dashboard-queue'
DASHBOARD_DONE = f'{CONFIGS_DIR}/.dashboard-done' DASHBOARD_DONE = f'{CONFIGS_DIR}/.dashboard-done'
DASHBOARD_LAST_RUN = f'{CONFIGS_DIR}/.dashboard-last-run' DASHBOARD_LAST_RUN = f'{CONFIGS_DIR}/.dashboard-last-run'
DASHBOARD_LOCK = f'{CONFIGS_DIR}/.dashboard-lock' DASHBOARD_LOCK = f'{CONFIGS_DIR}/.dashboard-lock'
DASHBOARD_PENDING = f'{CONFIGS_DIR}/.dashboard-pending'
STATUS_FILE = f'{CONFIGS_DIR}/.status'
DASHB_TIMER_NAME = 'routlin-dashboard-queue' DASHB_TIMER_NAME = 'routlin-dashboard-queue'
PRODUCT_DISPLAY_NAME = os.environ.get('PRODUCT_DISPLAY_NAME', 'Routlin Dashboard') PRODUCT_DISPLAY_NAME = os.environ.get('PRODUCT_DISPLAY_NAME', 'Routlin Dashboard')
DASHB_INTERVAL_SECS = 60 DASHB_INTERVAL_SECS = 60
@ -103,7 +105,77 @@ def _trim_if_needed():
pass pass
def _queue_command(cmd): def _apply_on_save():
try:
return load_core().get('general', {}).get('apply_on_save', True)
except Exception:
return True
def _read_dashboard_pending():
"""Return list of (uuid, ts, cmd, user, description) from .dashboard-pending."""
items = []
try:
lines = open(DASHBOARD_PENDING).read().splitlines()
except Exception:
return items
for line in lines:
if not line.strip():
continue
try:
main, _, desc = line.partition(' :: ')
parts = main.split(None, 3)
if len(parts) == 4:
entry_uuid, entry_ts, _dt, rest = parts
cmd_user = rest.rsplit(' (', 1)
entry_cmd = cmd_user[0].strip('[]')
entry_user = cmd_user[1].rstrip(')') if len(cmd_user) == 2 else ''
items.append((entry_uuid, int(entry_ts), entry_cmd, entry_user, desc))
except Exception:
pass
return items
def get_dashboard_pending():
return _read_dashboard_pending()
def flush_pending_to_queue():
"""Move all entries from .dashboard-pending to .dashboard-queue and clear pending."""
items = _read_dashboard_pending()
if not items:
return
done_set = _load_done_set()
existing_ids = {uu for uu, *_ in _read_pending(done_set)}
with open(DASHBOARD_QUEUE, 'a') as f:
for entry_uuid, entry_ts, entry_cmd, entry_user, _desc in items:
if entry_uuid not in existing_ids:
dt_str = datetime.fromtimestamp(entry_ts).strftime('%Y-%m-%dT%H:%M:%S')
f.write(f'{entry_uuid} {entry_ts} {dt_str} [{entry_cmd}] ({entry_user})\n')
open(DASHBOARD_PENDING, 'w').close()
_trim_if_needed()
def _queue_pending_command(cmd, description=''):
"""Append cmd to .dashboard-pending if not already present for this cmd+user."""
existing = _read_dashboard_pending()
current_user = session.get('email_address', 'unknown')
for entry_uuid, entry_ts, entry_cmd, entry_user, _desc in existing:
if entry_cmd == cmd and entry_user == current_user:
return entry_uuid, entry_ts
entry_uuid = str(uuid.uuid4())
now = datetime.now()
entry_ts = int(now.timestamp())
dt_str = now.strftime('%Y-%m-%dT%H:%M:%S')
desc_suffix = f' :: {description}' if description else ''
with open(DASHBOARD_PENDING, 'a') as f:
f.write(f'{entry_uuid} {entry_ts} {dt_str} [{cmd}] ({current_user}){desc_suffix}\n')
return entry_uuid, entry_ts
def _queue_command(cmd, description=''):
if not _apply_on_save():
return _queue_pending_command(cmd, description)
done_set = _load_done_set() done_set = _load_done_set()
pending = _read_pending(done_set) pending = _read_pending(done_set)
current_user = session.get('email_address', 'unknown') current_user = session.get('email_address', 'unknown')
@ -155,17 +227,19 @@ def _lock_mtime():
return None return None
def queue_command(cmd): def queue_command(cmd, description=''):
"""Queue a command without generating a flash message.""" """Queue a command without generating a flash message."""
return _queue_command(cmd) return _queue_command(cmd, description)
def queued_msg(cmd=None): def queued_msg(cmd=None, description=''):
"""Queue cmd if given, then return a timing message. """Queue cmd if given, then return a timing message.
Without cmd, just returns timing (for commands already queued by the caller).""" Without cmd, just returns timing (for commands already queued by the caller)."""
entry_ts = None entry_ts = None
if cmd is not None: if cmd is not None:
_entry_uuid, entry_ts = queue_command(cmd) _entry_uuid, entry_ts = queue_command(cmd, description)
if not _apply_on_save():
return 'Configuration saved. Click Apply Now on the Configuration Changes card to apply.'
if _is_locked(): if _is_locked():
mtime = _lock_mtime() mtime = _lock_mtime()
if entry_ts is not None and mtime and entry_ts < mtime: if entry_ts is not None and mtime and entry_ts < mtime:
@ -178,7 +252,7 @@ def queued_msg(cmd=None):
return 'Changes queued. The processing service is not running.' return 'Changes queued. The processing service is not running.'
parts = cmd.split() parts = cmd.split()
cli_cmd = f'sudo python3 {parts[0]}.py --{parts[1]}' if len(parts) == 2 else cmd cli_cmd = f'sudo python3 {parts[0]}.py --{parts[1]}' if len(parts) == 2 else cmd
install_cmd = f'sudo python3 {parts[0]}.py --install' if len(parts) >= 1 else 'core.py --install' install_cmd = f'sudo python3 install.py'
from markupsafe import Markup from markupsafe import Markup
return Markup(f'Configuration saved. The command processing service is not installed. ' return Markup(f'Configuration saved. The command processing service is not installed. '
f'Run <strong>{install_cmd}</strong> to enable it, ' f'Run <strong>{install_cmd}</strong> to enable it, '

View file

@ -24,6 +24,7 @@ from action_clear_ddns_log import bp as action_clear_ddns_log_bp
from action_apply_ddns_providers import bp as action_apply_ddns_providers_bp from action_apply_ddns_providers import bp as action_apply_ddns_providers_bp
from action_apply_interface import bp as action_apply_interface_bp from action_apply_interface import bp as action_apply_interface_bp
from action_apply_iface_config import bp as action_apply_iface_config_bp from action_apply_iface_config import bp as action_apply_iface_config_bp
from action_apply_pending import bp as action_apply_pending_bp
from api_apply_status import bp as api_apply_status_bp from api_apply_status import bp as api_apply_status_bp
app = Flask(__name__) app = Flask(__name__)
@ -52,6 +53,7 @@ app.register_blueprint(action_clear_ddns_log_bp)
app.register_blueprint(action_apply_ddns_providers_bp) app.register_blueprint(action_apply_ddns_providers_bp)
app.register_blueprint(action_apply_interface_bp) app.register_blueprint(action_apply_interface_bp)
app.register_blueprint(action_apply_iface_config_bp) app.register_blueprint(action_apply_iface_config_bp)
app.register_blueprint(action_apply_pending_bp)
app.register_blueprint(api_apply_status_bp) app.register_blueprint(api_apply_status_bp)
def _seed_initial_account(): def _seed_initial_account():

View file

@ -4,7 +4,7 @@ import json, re, subprocess, os, sys, html as html_mod
import sanitize import sanitize
import validation as validate import validation as validate
from datetime import datetime, timezone from datetime import datetime, timezone
from config_utils import core_hash, get_pending_entries, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, PRODUCT_DISPLAY_NAME from config_utils import core_hash, get_pending_entries, get_dashboard_pending, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, PRODUCT_DISPLAY_NAME
bp = Blueprint('view_page', __name__) bp = Blueprint('view_page', __name__)
@ -529,6 +529,38 @@ def collect_tokens():
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if gen.get('log_errors_only') else 'false' tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if gen.get('log_errors_only') else 'false'
tokens['GENERAL_DNSMASQ_LOG_QUERIES'] = 'true' if gen.get('dnsmasq_log_queries') else 'false' tokens['GENERAL_DNSMASQ_LOG_QUERIES'] = 'true' if gen.get('dnsmasq_log_queries') else 'false'
tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(gen.get('daily_execute_time_24hr_local', '-')) tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(gen.get('daily_execute_time_24hr_local', '-'))
tokens['GENERAL_APPLY_ON_SAVE'] = 'true' if gen.get('apply_on_save', True) else 'false'
pending_items = get_dashboard_pending()
if pending_items:
rows = ''
for _uuid, ts, cmd, user, desc in pending_items:
dt_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M')
label = e(desc) if desc else e(cmd)
rows += (f'<tr><td class="table-cell">{e(dt_str)}</td>'
f'<td class="table-cell">{label}</td>'
f'<td class="table-cell">{e(user)}</td></tr>')
pending_html = (
'<hr class="divider">'
'<h3 style="margin:0 0 0.75rem 0;font-size:0.85rem;font-weight:600;'
'text-transform:uppercase;letter-spacing:0.05em;color:var(--text-muted)">Pending Changes</h3>'
'<table class="data-table" style="margin-bottom:1rem">'
'<thead><tr>'
'<th class="table-header">Time</th>'
'<th class="table-header">Change</th>'
'<th class="table-header">User</th>'
'</tr></thead>'
f'<tbody>{rows}</tbody>'
'</table>'
'<form method="post" action="/action/apply_pending">'
f'<input type="hidden" name="config_hash" value="{e(core_hash())}">'
'<div class="button-row">'
'<button type="submit" class="btn btn-primary">Apply Now</button>'
'</div></form>'
)
else:
pending_html = ''
tokens['PENDING_CHANGES_HTML'] = pending_html
servers = dns.get('upstream_servers', []) servers = dns.get('upstream_servers', [])
tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false' tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
@ -895,6 +927,9 @@ def _render_item(item, tokens, inherited_req=None):
if t == 'table': if t == 'table':
return _render_table(item, tokens, req) return _render_table(item, tokens, req)
if t == 'raw_html':
return Markup(apply_tokens(item.get('html', ''), tokens))
return '' return ''
@ -1364,6 +1399,23 @@ def render_layout(view_id, content_html, tokens):
cls = 'info-bar-warning' cls = 'info-bar-warning'
other_bars += f'<div class="info-bar {cls}" data-apply-uuid="{e(o_uuid)}" data-apply-user="{e(o_user)}">{text}</div>\n' other_bars += f'<div class="info-bar {cls}" data-apply-uuid="{e(o_uuid)}" data-apply-user="{e(o_user)}">{text}</div>\n'
problem_bars = ''
try:
import json as _j
st = _j.load(open(f'{CONFIGS_DIR}/.status'))
for section in ('configurations', 'logs'):
for item in st.get(section, []):
if item.get('status') == 'problem':
sev = item.get('severity', 'error')
cls = 'info-bar-danger' if sev == 'error' else 'info-bar-warning'
text = e(item.get('detail', item.get('name', '')))
tip = item.get('suggestion', '')
if tip:
text += f' <span style="opacity:0.75">— {e(tip)}</span>'
problem_bars += f'<div class="info-bar {cls}">{text}</div>\n'
except Exception:
pass
return (f'<!DOCTYPE html>\n<html lang="en">\n<head>\n' return (f'<!DOCTYPE html>\n<html lang="en">\n<head>\n'
f' <meta charset="UTF-8">\n' f' <meta charset="UTF-8">\n'
f' <meta name="viewport" content="width=device-width, initial-scale=1.0">\n' f' <meta name="viewport" content="width=device-width, initial-scale=1.0">\n'
@ -1372,7 +1424,7 @@ def render_layout(view_id, content_html, tokens):
f'</head>\n<body>\n' f'</head>\n<body>\n'
f'{titlebar_html}\n' f'{titlebar_html}\n'
f'{navbar_html}\n' f'{navbar_html}\n'
f'<main class="main-content">\n{other_bars}{content_html}\n</main>\n' f'<main class="main-content">\n{problem_bars}{other_bars}{content_html}\n</main>\n'
f'{footer_html}\n' f'{footer_html}\n'
f'<script>var CONFIG_HASH="{page_hash}";var LAN_IFACE="{lan_iface}";var VPN_VLAN_COUNT={vpn_count};var EXISTING_VLAN_IDS={existing_ids};var EXISTING_VLAN_NAMES={existing_names};var EXISTING_VLAN_INTERFACES={existing_interfaces};var APPLY_UUID={json.dumps(my_uuid)};</script>\n' f'<script>var CONFIG_HASH="{page_hash}";var LAN_IFACE="{lan_iface}";var VPN_VLAN_COUNT={vpn_count};var EXISTING_VLAN_IDS={existing_ids};var EXISTING_VLAN_NAMES={existing_names};var EXISTING_VLAN_INTERFACES={existing_interfaces};var APPLY_UUID={json.dumps(my_uuid)};</script>\n'
f'<script>{_inline_js()}</script>\n' f'<script>{_inline_js()}</script>\n'

View file

@ -679,6 +679,47 @@
} }
], ],
"client_requirement": "client_is_administrator+" "client_requirement": "client_is_administrator+"
},
{
"type": "card",
"label": "Configuration Changes",
"client_requirement": "client_is_administrator+",
"items": [
{
"type": "form",
"action": "/action/apply_general",
"method": "post",
"items": [
{
"type": "field",
"label": "Apply on Save",
"name": "apply_on_save",
"input_type": "checkbox",
"value": "%GENERAL_APPLY_ON_SAVE%",
"hint": "When enabled, saved changes are queued immediately. When disabled, changes accumulate here until you click Apply Now."
},
{
"type": "button_row",
"items": [
{
"type": "button_primary",
"text": "Save",
"action": "/action/apply_general",
"method": "post"
},
{
"type": "button_cancel",
"text": "Cancel"
}
]
}
]
},
{
"type": "raw_html",
"html": "%PENDING_CHANGES_HTML%"
}
]
} }
] ]
}, },

View file

@ -23,6 +23,7 @@ All configuration lives in two JSON files. Edit these to match your network befo
| `.dashboard-done` | UUIDs of already-processed queue entries; prevents duplicate execution. | | `.dashboard-done` | UUIDs of already-processed queue entries; prevents duplicate execution. |
| `.dashboard-last-run` | Epoch timestamp of the last timer execution. | | `.dashboard-last-run` | Epoch timestamp of the last timer execution. |
| `.dashboard-lock` | PID lock file preventing concurrent timer runs. | | `.dashboard-lock` | PID lock file preventing concurrent timer runs. |
| `.dashboard-pending` | Changes held back when Apply on Save is disabled; flushed to `.dashboard-queue` when Apply Now is clicked. |
| `.dns-metrics` | Cumulative lifetime DNS metrics across all VLAN instances. Created and updated each time `--view-metrics` is run. | | `.dns-metrics` | Cumulative lifetime DNS metrics across all VLAN instances. Created and updated each time `--view-metrics` is run. |
| `.ddns-last-ip-*` | Cached public IP per DDNS provider. Managed by `ddns.py`. | | `.ddns-last-ip-*` | Cached public IP per DDNS provider. Managed by `ddns.py`. |
| `.ddns-last-service` | Tracks IP-check service rotation. Managed by `ddns.py`. | | `.ddns-last-service` | Tracks IP-check service rotation. Managed by `ddns.py`. |

View file

@ -2171,58 +2171,8 @@ def disable_avahi():
def show_status(data): def show_status(data):
import shutil import status as _status
col = shutil.get_terminal_size((80, 24)).columns _status.print_table(_status.run_and_write(data))
def svc_row(unit, expected_active="active"):
r_active = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
r_enabled = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
active = r_active.stdout.strip()
enabled = r_enabled.stdout.strip()
active_sym = "+" if active == "active" else "x"
enabled_sym = "+" if enabled == "enabled" else "x"
active_ok = "(OK) " if active == expected_active else "(BAD)"
enabled_ok = "(OK) " if enabled == "enabled" else "(BAD)"
return active_sym, active, active_ok, enabled_sym, enabled, enabled_ok
units = []
for vlan in data["vlans"]:
iface = derive_interface(vlan, data)
if is_wg(vlan) and not wg_interface_up(iface):
units.append((vlan_service_name(vlan, iface), "(wg0 not up)", "active"))
else:
units.append((vlan_service_name(vlan, iface), None, "active"))
units.append((f"{BLIST_TIMER_NAME}.timer", None, "active"))
units.append((NAT_SERVICE_NAME, None, "inactive")) # oneshot - exits after running
units.append(("freeradius", None, "active"))
units.append(("avahi-daemon", None, "active"))
print(f" {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}")
print(f" {'-'*45} {'-'*18} {'-'*15}")
for unit, note, expected_active in units:
if note:
print(f" {unit:<45} {note}")
else:
active_sym, active, active_ok, enabled_sym, enabled, enabled_ok = svc_row(unit, expected_active)
print(f" {unit:<45} {active_sym} {active:<10} {active_ok} {enabled_sym} {enabled:<10} {enabled_ok}")
# Timer next trigger
r = subprocess.run(
["systemctl", "show", f"{BLIST_TIMER_NAME}.timer", "--property=NextElapseUSecRealtime,NextElapseUSecMonotonic"],
capture_output=True, text=True
)
# Fall back to human-readable 'Trigger' field from status output
r2 = subprocess.run(
["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"],
capture_output=True, text=True
)
for line in r2.stdout.splitlines():
line = line.strip()
if line.startswith("Trigger:"):
trigger = line.split("Trigger:", 1)[1].strip()
if trigger and trigger != "n/a":
print(f"\n Next blocklist update: {trigger}")
break
def show_configs(data): def show_configs(data):
for vlan in data["vlans"]: for vlan in data["vlans"]:
@ -3167,6 +3117,9 @@ def cmd_apply(data, dry_run=False):
print("Done.") print("Done.")
import status as _status
_status.print_table(_status.run_and_write(data))
def cmd_update_blocklists(data): def cmd_update_blocklists(data):
"""--update-blocklists: download and merge blocklists. On success, call """--update-blocklists: download and merge blocklists. On success, call

View file

@ -29,6 +29,7 @@ DASHB_QUEUE_FILE = SCRIPT_DIR / ".dashboard-queue"
DASHB_DONE_FILE = SCRIPT_DIR / ".dashboard-done" DASHB_DONE_FILE = SCRIPT_DIR / ".dashboard-done"
DASHB_LAST_RUN_FILE = SCRIPT_DIR / ".dashboard-last-run" DASHB_LAST_RUN_FILE = SCRIPT_DIR / ".dashboard-last-run"
DASHB_LOCK_FILE = SCRIPT_DIR / ".dashboard-lock" DASHB_LOCK_FILE = SCRIPT_DIR / ".dashboard-lock"
DASHB_PENDING_FILE = SCRIPT_DIR / ".dashboard-pending"
# =================================================================== # ===================================================================
@ -303,7 +304,7 @@ def setup_docker_compose():
def create_dotfiles(): def create_dotfiles():
for f in (DASHB_QUEUE_FILE, DASHB_DONE_FILE, DASHB_LAST_RUN_FILE, DASHB_LOCK_FILE): for f in (DASHB_QUEUE_FILE, DASHB_DONE_FILE, DASHB_LAST_RUN_FILE, DASHB_LOCK_FILE, DASHB_PENDING_FILE):
if not f.exists(): if not f.exists():
f.touch() f.touch()
# chown to the routlin dir owner so the timer can write # chown to the routlin dir owner so the timer can write

760
routlin/status.py Normal file
View file

@ -0,0 +1,760 @@
"""
status.py -- System health checks for Routlin.
Reads core.json, checks services, configuration files, and logs, then writes
.status JSON. Imported by core.py; also runnable standalone.
Public API:
run_and_write(data) -> dict run all checks, write .status, return dict
print_table(status: dict) render the CLI service table from status dict
"""
import hashlib
import ipaddress
import json
import os
import re
import shutil
import socket
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from validation import derive_interface, derive_vlan_id, is_wg
# ===================================================================
# Constants (mirror core.py — no import to avoid circular dependency)
# ===================================================================
PRODUCT_NAME = "routlin"
SCRIPT_DIR = Path(__file__).parent
STATUS_FILE = SCRIPT_DIR / ".status"
CONFIG_FILE = SCRIPT_DIR / "core.json"
BLOCKLIST_DIR = SCRIPT_DIR / "blocklists"
DNSMASQ_CONF_DIR = Path(f"/etc/dnsmasq-{PRODUCT_NAME}")
LEASES_DIR = Path("/var/lib/misc")
NETWORKD_DIR = Path("/etc/systemd/network")
SYSTEMD_DIR = Path("/etc/systemd/system")
WG_DIR = Path("/etc/wireguard")
RESOLV_CONF = Path("/etc/resolv.conf")
AVAHI_CONF_FILE = Path("/etc/avahi/avahi-daemon.conf")
CHRONY_CONF_FILE = Path("/etc/chrony/chrony.conf")
RADIUS_SECRET_FILE = SCRIPT_DIR / ".radius-secret"
RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf")
RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users")
BLIST_TIMER_NAME = f"{PRODUCT_NAME}-dns-blocklist-update"
DASHB_TIMER_NAME = f"{PRODUCT_NAME}-dashboard-queue"
DASHB_QUEUE_FILE = SCRIPT_DIR / ".dashboard-queue"
NAT_SERVICE_NAME = f"{PRODUCT_NAME}-nat"
BLOCKLIST_STALE_SECS = 36 * 3600
DISK_WARN_PCT = 90
DHCP_WARN_PCT = 90
DNS_TIMEOUT_SECS = 2
# ===================================================================
# Small helpers replicated from core.py (no import)
# ===================================================================
def _vlan_service_name(vlan, iface):
if is_wg(vlan):
return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{iface}"
return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}"
def _radius_enabled(data):
return any(
r.get("radius_client") is True
for v in data.get("vlans", [])
for r in v.get("reservations", [])
)
def _avahi_enabled(data):
return any(
v.get("mdns_reflection") is True
for v in data.get("vlans", [])
if not is_wg(v)
)
def _avahi_interfaces(data):
return [
derive_interface(v, data)
for v in data.get("vlans", [])
if v.get("mdns_reflection") is True and not is_wg(v)
]
def _combo_hash(names):
key = ",".join(sorted(names))
return hashlib.sha256(key.encode()).hexdigest()[:8]
def _merged_path(h):
return BLOCKLIST_DIR / f"merged-{h}.conf"
def _lowest_quartet_ip(vlan):
ips = []
for s in vlan.get("server_identities", []):
try:
ips.append(ipaddress.IPv4Address(s["ip"]))
except (KeyError, ValueError):
pass
return str(min(ips, key=lambda ip: ip.packed[-1])) if ips else None
def _gateway_ips(data):
"""Return set of all gateway IPs across all VLANs."""
gws = set()
for vlan in data.get("vlans", []):
ip = _lowest_quartet_ip(vlan)
if ip:
gws.add(ip)
return gws
def _iface_operstate(iface):
"""Read operstate from sysfs. Returns 'up', 'down', 'unknown', or None."""
try:
return Path(f"/sys/class/net/{iface}/operstate").read_text().strip()
except OSError:
return None
def _sysctl_query(unit):
"""Return (active, enabled) strings from systemctl."""
r_a = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
r_e = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
return r_a.stdout.strip(), r_e.stdout.strip()
# ===================================================================
# Result builders
# ===================================================================
def _ok(id_, name, detail=""):
r = {"id": id_, "name": name, "status": "ok"}
if detail:
r["detail"] = detail
return r
def _problem(id_, name, severity, detail, suggestion=""):
r = {"id": id_, "name": name, "status": "problem",
"severity": severity, "detail": detail}
if suggestion:
r["suggestion"] = suggestion
return r
# ===================================================================
# Services checks
# ===================================================================
def check_services(data):
results = []
vlans = data.get("vlans", [])
units = []
for vlan in vlans:
iface = derive_interface(vlan, data)
name = _vlan_service_name(vlan, iface)
units.append({"id": name, "name": name,
"expected_active": "active", "expected_enabled": "enabled"})
units.append({"id": f"{BLIST_TIMER_NAME}.timer",
"name": f"{BLIST_TIMER_NAME}.timer",
"expected_active": "active", "expected_enabled": "enabled"})
units.append({"id": NAT_SERVICE_NAME,
"name": NAT_SERVICE_NAME,
"expected_active": "inactive",
"expected_enabled": "enabled"})
if DASHB_QUEUE_FILE.exists():
units.append({"id": f"{DASHB_TIMER_NAME}.timer",
"name": f"{DASHB_TIMER_NAME}.timer",
"expected_active": "active", "expected_enabled": "enabled"})
exp_fr_active = "active" if _radius_enabled(data) else "inactive"
exp_fr_enabled = "enabled" if _radius_enabled(data) else "disabled"
units.append({"id": "freeradius", "name": "freeradius",
"expected_active": exp_fr_active,
"expected_enabled": exp_fr_enabled})
exp_av_active = "active" if _avahi_enabled(data) else "inactive"
exp_av_enabled = "enabled" if _avahi_enabled(data) else "disabled"
units.append({"id": "avahi-daemon", "name": "avahi-daemon",
"expected_active": exp_av_active,
"expected_enabled": exp_av_enabled})
units.append({"id": "chrony", "name": "chrony",
"expected_active": "active", "expected_enabled": "enabled"})
units.append({"id": "systemd-networkd", "name": "systemd-networkd",
"expected_active": "active", "expected_enabled": "enabled"})
for u in units:
active, enabled = _sysctl_query(u["id"])
exp_active = u["expected_active"]
exp_enabled = u["expected_enabled"]
active_ok = active == exp_active
enabled_ok = enabled == exp_enabled
status = "ok" if (active_ok and enabled_ok) else "problem"
results.append({
"id": u["id"],
"name": u["name"],
"active": active,
"enabled": enabled,
"expected_active": exp_active,
"expected_enabled": exp_enabled,
"active_ok": active_ok,
"enabled_ok": enabled_ok,
"status": status,
})
return results
# ===================================================================
# Configuration checks
# ===================================================================
def check_configurations(data):
results = []
vlans = data.get("vlans", [])
non_wg = [v for v in vlans if not is_wg(v)]
wg_vlans = [v for v in vlans if is_wg(v)]
core_mtime = CONFIG_FILE.stat().st_mtime if CONFIG_FILE.exists() else 0
def file_ok(id_, name, path, severity="error", suggestion=""):
if not path.exists():
return _problem(id_, name, severity,
f"{path} does not exist.",
suggestion or f"Run sudo python3 core.py --apply to create it.")
if path.stat().st_mtime < core_mtime:
return _problem(id_, name, "warning",
f"{path} is older than core.json and may be stale.",
"Run sudo python3 core.py --apply to update it.")
return _ok(id_, name)
# --- nftables tables ---
try:
tables_out = subprocess.run(
["nft", "list", "tables"], capture_output=True, text=True
).stdout
for tbl in ("ip routlin-nat", "ip routlin-filter"):
if tbl in tables_out:
results.append(_ok(f"nft_{tbl.replace(' ', '_')}",
f"nftables table {tbl}"))
else:
results.append(_problem(
f"nft_{tbl.replace(' ', '_')}",
f"nftables table {tbl}",
"error",
f"nftables table '{tbl}' is missing.",
"Run sudo python3 core.py --apply to rebuild firewall rules."))
except Exception:
results.append(_problem("nft_tables", "nftables tables", "error",
"Could not query nftables (nft not available or failed)."))
# --- Docker bridge rules ---
try:
bridges = [
p.parent.name
for p in Path("/sys/class/net").glob("*/bridge")
if _iface_operstate(p.parent.name) == "up"
]
if bridges:
fwd_out = subprocess.run(
["nft", "list", "chain", "ip", "routlin-filter", "forward"],
capture_output=True, text=True
).stdout
missing = [b for b in bridges if b not in fwd_out]
if missing:
results.append(_problem(
"nft_docker_bridges", "nftables Docker bridge rules", "warning",
f"Container bridge(s) {', '.join(missing)} have no nftables forward rules.",
"Run sudo python3 core.py --apply to add the missing rules."))
else:
results.append(_ok("nft_docker_bridges", "nftables Docker bridge rules"))
except Exception:
pass
# --- VLAN sub-interfaces ---
for vlan in non_wg:
iface = derive_interface(vlan, data)
vid = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24))
state = _iface_operstate(iface)
id_ = f"iface_{vlan['name']}"
name = f"interface {iface}"
if state is None:
results.append(_problem(id_, name, "error",
f"Interface {iface} does not exist in /sys/class/net/.",
"Run sudo python3 core.py --apply to configure network interfaces."))
elif state != "up":
results.append(_problem(id_, name, "error",
f"Interface {iface} operstate is '{state}' (expected 'up').",
"Check systemd-networkd: sudo systemctl status systemd-networkd"))
else:
results.append(_ok(id_, name))
# --- WireGuard interfaces ---
for vlan in wg_vlans:
iface = derive_interface(vlan, data)
state = _iface_operstate(iface)
id_ = f"iface_wg_{vlan['name']}"
name = f"WireGuard interface {iface}"
if state is None:
results.append(_problem(id_, name, "error",
f"WireGuard interface {iface} does not exist.",
"Run sudo python3 core.py --apply to bring up WireGuard."))
elif state != "up":
results.append(_problem(id_, name, "error",
f"WireGuard interface {iface} operstate is '{state}'.",
f"Try: sudo wg-quick up {iface}"))
else:
results.append(_ok(id_, name))
# --- Stale WG interfaces when no WG VLANs configured ---
if not wg_vlans:
stale_wg = [
p.name for p in Path("/sys/class/net").iterdir()
if p.name.startswith("wg") and re.match(r"^wg\d+$", p.name)
]
if stale_wg:
results.append(_problem(
"stale_wg_ifaces", "Stale WireGuard interfaces", "warning",
f"WireGuard interface(s) {', '.join(stale_wg)} exist but no VPN VLANs are configured.",
f"Bring them down manually: sudo wg-quick down {stale_wg[0]}"))
# --- dnsmasq config files ---
for vlan in vlans:
path = DNSMASQ_CONF_DIR / f"{vlan['name']}.conf"
results.append(file_ok(f"dnsmasq_conf_{vlan['name']}",
f"dnsmasq config {path.name}", path))
# --- systemd-networkd files ---
for vlan in non_wg:
iface = derive_interface(vlan, data)
vid = derive_vlan_id(vlan.get("subnet", ""), vlan.get("subnet_mask", 24))
net = NETWORKD_DIR / f"10-{PRODUCT_NAME}-{vlan['name']}.network"
results.append(file_ok(f"networkd_net_{vlan['name']}",
f"networkd {net.name}", net))
if vid != 1: # non-physical VLANs have a .netdev too
netdev = NETWORKD_DIR / f"10-{PRODUCT_NAME}-{vlan['name']}.netdev"
results.append(file_ok(f"networkd_netdev_{vlan['name']}",
f"networkd {netdev.name}", netdev))
# --- systemd unit files ---
for path in (SYSTEMD_DIR / f"{NAT_SERVICE_NAME}.service",
SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.timer",
SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.service"):
results.append(file_ok(f"unit_{path.stem}", f"systemd unit {path.name}", path))
# --- WireGuard config and key files ---
for vlan in wg_vlans:
iface = derive_interface(vlan, data)
conf = WG_DIR / f"{iface}.conf"
key = WG_DIR / f"{iface}.key"
pub = SCRIPT_DIR / f".{iface}.pub"
results.append(file_ok(f"wg_conf_{iface}", f"WireGuard {conf.name}", conf))
results.append(file_ok(f"wg_key_{iface}", f"WireGuard {key.name}", key))
results.append(file_ok(f"wg_pubkey_{iface}", f"WireGuard {pub.name}", pub))
# --- Stale WG conf files when no WG VLANs ---
if not wg_vlans and WG_DIR.exists():
stale = [
p for p in WG_DIR.glob("wg*.conf")
if p.read_text().startswith("# Generated by")
]
if stale:
results.append(_problem(
"stale_wg_conf", "Stale WireGuard config files", "warning",
f"{', '.join(p.name for p in stale)} exist but no VPN VLANs are configured.",
"Remove with: sudo rm " + " ".join(str(p) for p in stale)))
# --- RADIUS files and secret check ---
if _radius_enabled(data):
results.append(file_ok("radius_secret_file", ".radius-secret file",
RADIUS_SECRET_FILE, "error"))
results.append(file_ok("radius_clients_conf", "FreeRADIUS clients.conf",
RADIUS_CLIENTS_CONF, "error"))
results.append(file_ok("radius_users_file", "FreeRADIUS users",
RADIUS_USERS_FILE, "error"))
# Secret content match
try:
secret = RADIUS_SECRET_FILE.read_text().strip()
conf_text = RADIUS_CLIENTS_CONF.read_text()
secret_ok = any(
line.strip().split("=", 1)[-1].strip() == secret
for line in conf_text.splitlines()
if "secret" in line and not line.strip().startswith("#")
)
if secret_ok:
results.append(_ok("radius_secret_match", "FreeRADIUS shared secret"))
else:
results.append(_problem(
"radius_secret_match", "FreeRADIUS shared secret", "error",
"clients.conf secret does not match .radius-secret. "
"Access points will reject all authentication requests.",
"Restore .radius-secret from backup, or run sudo python3 core.py --apply "
"then update the shared secret in your AP controller."))
except OSError:
pass # already caught above by file_ok
else:
# RADIUS not enabled — warn if generated config files still exist
if RADIUS_CLIENTS_CONF.exists():
try:
if "# Generated by" in RADIUS_CLIENTS_CONF.read_text():
results.append(_problem(
"radius_conf_orphan", "FreeRADIUS config", "warning",
"FreeRADIUS clients.conf contains routlin-generated content "
"but RADIUS is not enabled.",
"This is harmless if freeradius is stopped. "
"Remove with: sudo rm " + str(RADIUS_CLIENTS_CONF)))
except OSError:
pass
# --- Avahi config ---
if _avahi_enabled(data):
results.append(file_ok("avahi_conf", "avahi-daemon.conf",
AVAHI_CONF_FILE, "warning"))
if AVAHI_CONF_FILE.exists():
expected_ifaces = set(_avahi_interfaces(data))
try:
text = AVAHI_CONF_FILE.read_text()
m = re.search(r"allow-interfaces\s*=\s*(.+)", text)
if m:
actual_ifaces = {i.strip() for i in m.group(1).split(",")}
missing = expected_ifaces - actual_ifaces
extra = actual_ifaces - expected_ifaces
if missing or extra:
results.append(_problem(
"avahi_ifaces", "avahi-daemon interface list", "warning",
f"avahi-daemon.conf interface list does not match config "
f"(missing: {missing or 'none'}, extra: {extra or 'none'}).",
"Run sudo python3 core.py --apply to update."))
else:
results.append(_ok("avahi_ifaces",
"avahi-daemon interface list"))
except OSError:
pass
# --- resolv.conf ---
gateway_ips = _gateway_ips(data)
try:
resolv = RESOLV_CONF.read_text()
ns_ips = {
line.split()[1]
for line in resolv.splitlines()
if line.startswith("nameserver") and len(line.split()) >= 2
}
if ns_ips & gateway_ips:
results.append(_ok("resolv_conf", "/etc/resolv.conf"))
else:
results.append(_problem(
"resolv_conf", "/etc/resolv.conf", "warning",
f"/etc/resolv.conf nameserver(s) {ns_ips} do not include any VLAN gateway. "
f"Expected one of: {gateway_ips}.",
"Run sudo python3 core.py --apply to update /etc/resolv.conf."))
except OSError:
results.append(_problem("resolv_conf", "/etc/resolv.conf", "warning",
"/etc/resolv.conf is not readable.",
"Run sudo python3 core.py --apply."))
# --- chrony.conf ---
if CHRONY_CONF_FILE.exists():
try:
content = CHRONY_CONF_FILE.read_text()
missing_subnets = []
for vlan in non_wg:
try:
network = ipaddress.IPv4Network(
f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
cidr = str(network)
if f"allow {cidr}" not in content and f"allow {vlan['subnet']}" not in content:
missing_subnets.append(cidr)
except Exception:
pass
if missing_subnets:
results.append(_problem(
"chrony_conf", "/etc/chrony/chrony.conf", "warning",
f"chrony.conf is missing allow directives for: {', '.join(missing_subnets)}.",
"Run sudo python3 core.py --apply to update chrony.conf."))
else:
results.append(_ok("chrony_conf", "/etc/chrony/chrony.conf"))
except OSError:
results.append(_problem("chrony_conf", "/etc/chrony/chrony.conf", "warning",
"/etc/chrony/chrony.conf is not readable."))
else:
results.append(_problem("chrony_conf", "/etc/chrony/chrony.conf", "warning",
"/etc/chrony/chrony.conf does not exist.",
"Install chrony: sudo apt-get install chrony"))
# --- Stale WG conf when no WG VLANs (already handled above) ---
# --- DHCP pool utilization ---
for vlan in non_wg:
try:
dhcp = vlan.get("dhcp_information", {})
start = dhcp.get("pool_start", "")
end = dhcp.get("pool_end", "")
if not start or not end:
continue
pool_size = (int(ipaddress.IPv4Address(end))
- int(ipaddress.IPv4Address(start)) + 1)
if pool_size <= 0:
continue
lease_file = LEASES_DIR / f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}.leases"
if not lease_file.exists():
continue
leases = [
l for l in lease_file.read_text().splitlines()
if l.strip() and not l.startswith("#")
]
pct = len(leases) * 100 // pool_size
if pct >= DHCP_WARN_PCT:
results.append(_problem(
f"dhcp_pool_{vlan['name']}",
f"DHCP pool ({vlan['name']})", "warning",
f"DHCP pool for VLAN '{vlan['name']}' is {pct}% full "
f"({len(leases)}/{pool_size} leases).",
"Expand the pool range in core.json or clean up stale leases "
"with: sudo python3 core.py --reset-leases " + vlan['name']))
else:
results.append(_ok(f"dhcp_pool_{vlan['name']}",
f"DHCP pool ({vlan['name']})",
f"{pct}% used ({len(leases)}/{pool_size})"))
except Exception:
pass
# --- Blocklist file freshness ---
blocklists = data.get("blocklists", [])
if blocklists:
combos = {}
for vlan in vlans:
names = vlan.get("use_blocklists", [])
if names:
combos[_combo_hash(names)] = names
now = datetime.now(timezone.utc).timestamp()
for h, names in combos.items():
path = _merged_path(h)
label = ", ".join(names)
if not path.exists():
results.append(_problem(
f"blocklist_{h}", f"blocklist ({label})", "warning",
f"Merged blocklist file for '{label}' does not exist.",
"Run sudo python3 core.py --update-blocklists to download blocklists."))
elif now - path.stat().st_mtime > BLOCKLIST_STALE_SECS:
age_h = int((now - path.stat().st_mtime) / 3600)
results.append(_problem(
f"blocklist_{h}", f"blocklist ({label})", "warning",
f"Merged blocklist for '{label}' is {age_h}h old (threshold 36h).",
"Run sudo python3 core.py --update-blocklists to refresh."))
else:
results.append(_ok(f"blocklist_{h}", f"blocklist ({label})"))
# --- Disk space ---
try:
usage = shutil.disk_usage("/")
pct = usage.used * 100 // usage.total
if pct >= DISK_WARN_PCT:
results.append(_problem(
"disk_space", "Disk space", "warning",
f"Root filesystem is {pct}% full "
f"({usage.used // 1_073_741_824}G of {usage.total // 1_073_741_824}G used).",
"Free up disk space to avoid service disruption."))
else:
results.append(_ok("disk_space", "Disk space",
f"{pct}% used"))
except Exception:
pass
# --- Upstream DNS reachability ---
servers = data.get("upstream_dns", {}).get("upstream_servers", [])
unreachable = []
for srv in servers:
try:
with socket.create_connection((srv, 53), timeout=DNS_TIMEOUT_SECS):
pass
except OSError:
unreachable.append(srv)
if unreachable:
results.append(_problem(
"upstream_dns", "Upstream DNS reachability", "warning",
f"Upstream DNS server(s) unreachable on port 53: {', '.join(unreachable)}.",
"Check WAN connectivity and upstream DNS server addresses in core.json."))
elif servers:
results.append(_ok("upstream_dns", "Upstream DNS reachability"))
return results
# ===================================================================
# Log checks
# ===================================================================
def check_logs(data):
results = []
# --- FreeRADIUS auth failures ---
radius_log = Path("/var/log/freeradius/radius.log")
if radius_log.exists():
try:
now = datetime.now(timezone.utc).timestamp()
cutoff = now - 3600
lines = radius_log.read_text(errors="replace").splitlines()
# Parse lines with timestamps like "Thu May 21 11:53:47 2026 : Info: ..."
recent = []
failure_re = re.compile(r"Shared secret is incorrect")
ts_re = re.compile(
r"(\w+ \w+ +\d+ \d+:\d+:\d+ \d+) : ")
for line in lines[-2000:]: # scan last 2000 lines
m = ts_re.match(line)
if not m:
continue
try:
ts = datetime.strptime(m.group(1), "%a %b %d %H:%M:%S %Y")
ts = ts.replace(tzinfo=timezone.utc)
if ts.timestamp() >= cutoff:
recent.append(line)
except ValueError:
pass
failures = [l for l in recent if failure_re.search(l)]
if failures:
# Extract distinct AP names from "(from client ...)" pattern
ap_re = re.compile(r"\(from client ([^)]+)\)")
aps = sorted({m.group(1) for l in failures
for m in ap_re.finditer(l)})
ap_str = ", ".join(aps) if aps else f"{len(failures)} request(s)"
results.append(_problem(
"freeradius_auth_failures",
"FreeRADIUS auth failures", "error",
f"FreeRADIUS is rejecting requests from {ap_str} with "
f"'Shared secret is incorrect' ({len(failures)} failures in the last hour).",
"Restore .radius-secret from backup and run sudo python3 core.py --apply, "
"or update the shared secret in your AP controller to match .radius-secret."))
else:
results.append(_ok("freeradius_auth_failures",
"FreeRADIUS auth failures"))
# High rejection rate (>50% of recent activity is failures)
if recent and len(failures) > len(recent) * 0.5 and not failures:
results.append(_problem(
"freeradius_high_reject_rate",
"FreeRADIUS rejection rate", "warning",
f"Over half of recent FreeRADIUS activity ({len(failures)}/{len(recent)}) "
f"are auth failures.",
"Investigate FreeRADIUS config and shared secrets."))
elif recent:
results.append(_ok("freeradius_high_reject_rate",
"FreeRADIUS rejection rate"))
except OSError:
pass
# --- dnsmasq errors ---
try:
r = subprocess.run(
["journalctl", f"-u", f"dnsmasq-{PRODUCT_NAME}-*",
"--since", "-1h", "--priority=err", "--no-pager", "-q"],
capture_output=True, text=True, timeout=5
)
err_lines = [l for l in r.stdout.splitlines() if l.strip()]
if err_lines:
results.append(_problem(
"dnsmasq_errors", "dnsmasq errors", "error",
f"{len(err_lines)} dnsmasq error(s) in the last hour: "
f"{err_lines[0][:120]}{'...' if len(err_lines) > 1 else ''}",
"Check dnsmasq logs: sudo journalctl -u 'dnsmasq-routlin-*' --since -1h"))
else:
results.append(_ok("dnsmasq_errors", "dnsmasq errors"))
except Exception:
pass
return results
# ===================================================================
# Next blocklist update
# ===================================================================
def _next_blocklist_update():
try:
r = subprocess.run(
["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"],
capture_output=True, text=True, timeout=5
)
for line in r.stdout.splitlines():
line = line.strip()
if line.startswith("Trigger:"):
trigger = line.split("Trigger:", 1)[1].strip()
if trigger and trigger != "n/a":
return trigger
except Exception:
pass
return None
# ===================================================================
# Public API
# ===================================================================
def run_and_write(data):
"""Run all checks, write .status atomically, return the status dict."""
status = {
"checked_at": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"services": check_services(data),
"configurations": check_configurations(data),
"logs": check_logs(data),
"next_blocklist_update": _next_blocklist_update(),
}
tmp = STATUS_FILE.with_suffix(".tmp")
tmp.write_text(json.dumps(status, indent=2))
tmp.replace(STATUS_FILE)
return status
def print_table(status):
"""Print the service status table and any problems to stdout."""
col = shutil.get_terminal_size((80, 24)).columns
services = status.get("services", [])
print(f"\n {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}")
print(f" {'-'*45} {'-'*18} {'-'*15}")
for svc in services:
active = svc.get("active", "unknown")
enabled = svc.get("enabled", "unknown")
a_ok = svc.get("active_ok", True)
e_ok = svc.get("enabled_ok", True)
a_sym = "+" if active == "active" else "x"
e_sym = "+" if enabled == "enabled" else "x"
a_status = "(OK) " if a_ok else "(BAD)"
e_status = "(OK) " if e_ok else "(BAD)"
print(f" {svc['name']:<45} "
f"{a_sym} {active:<10} {a_status} "
f"{e_sym} {enabled:<10} {e_status}")
trigger = status.get("next_blocklist_update")
if trigger:
print(f"\n Next blocklist update: {trigger}")
problems = [
item
for section in ("configurations", "logs")
for item in status.get(section, [])
if item.get("status") == "problem"
]
if problems:
print(f"\n Problems {'=' * (col - 12)}")
for p in problems:
sev = p.get("severity", "error")
tag = f"[{sev}]"
detail = p.get("detail", p.get("name", ""))
print(f" {tag:<10} {detail}")
tip = p.get("suggestion", "")
if tip:
print(f" {'':10} -> {tip}")
print()
# ===================================================================
# Standalone entry point
# ===================================================================
if __name__ == "__main__":
try:
with open(CONFIG_FILE) as f:
data = json.load(f)
except Exception as ex:
print(f"Error loading {CONFIG_FILE}: {ex}", file=sys.stderr)
sys.exit(1)
status = run_and_write(data)
print_table(status)

View file

@ -701,6 +701,19 @@ def validate_config(data):
errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). " errors.append(f"Multiple VLANs have radius_default: true ({', '.join(defaults)}). "
f"Only one VLAN may be the RADIUS default.") f"Only one VLAN may be the RADIUS default.")
# -- RADIUS requires multiple VLANs ----------------------------------------
non_wg_vlans = [v for v in data.get("vlans", []) if not is_wg(v)]
has_radius_clients = any(
r.get("radius_client")
for v in non_wg_vlans
for r in v.get("reservations", [])
)
if has_radius_clients and len(non_wg_vlans) < 2:
errors.append(
"RADIUS clients are configured but only one non-VPN VLAN exists. "
"Dynamic VLAN assignment requires at least two VLANs."
)
# -- host_overrides validation --------------------------------------------- # -- host_overrides validation ---------------------------------------------
all_vlan_nets = list(vlan_networks.values()) all_vlan_nets = list(vlan_networks.values())
for idx, entry in enumerate(data.get("host_overrides", [])): for idx, entry in enumerate(data.get("host_overrides", [])):