""" health.py -- System health checks for Routlin. Reads config.json, checks services, configuration files, and logs, then writes .health JSON. Imported by core.py; also runnable standalone. Public API: run_and_write(data) -> (bool, dict) run all checks, write .health, return (all_healthy, status) print_table(status: dict) render the CLI service table from status dict """ import hashlib import ipaddress import json import re import shutil import socket import subprocess import sys from datetime import datetime, timezone from pathlib import Path import mod_shared as shared import mod_validation as validation # =================================================================== # Constants # =================================================================== HEALTH_FILE = shared.SCRIPT_DIR / ".health" CONFIG_FILE = shared.SCRIPT_DIR / "config.json" BLOCKLIST_DIR = shared.SCRIPT_DIR / "blocklists" NETWORKD_DIR = Path("/etc/systemd/network") WG_DIR = Path("/etc/wireguard") RESOLV_CONF = Path("/etc/resolv.conf") AVAHI_CONF_FILE = Path("/etc/avahi/avahi-daemon.conf") CHRONY_CONF_FILE = Path("/etc/chrony/chrony.conf") RADIUS_SECRET_FILE = shared.SCRIPT_DIR / ".radius-secret" RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf") RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users") BLIST_TIMER_NAME = f"{shared.PRODUCT_NAME}-dns-blocklist-update" DASHB_TIMER_NAME = f"{shared.PRODUCT_NAME}-dashboard-queue" HEALTH_TIMER_NAME = f"{shared.PRODUCT_NAME}-health-check" MAINT_TIMER_NAME = f"{shared.PRODUCT_NAME}-maintenance" DASHB_QUEUE_FILE = shared.SCRIPT_DIR / ".dashboard-queue" NAT_SERVICE_NAME = f"{shared.PRODUCT_NAME}-nat" BLOCKLIST_STALE_SECS = 36 * 3600 DISK_WARN_PCT = 90 DHCP_WARN_PCT = 90 DNS_TIMEOUT_SECS = 2 # =================================================================== # Small helpers # =================================================================== def radius_enabled(data): return any( r.get("radius_client") is True for r in data.get("dhcp_reservations", []) ) def avahi_enabled(data): return any( v.get("mdns_reflection") is True for v in data.get("vlans", []) if not validation.is_wg(v) ) def _avahi_interfaces(data): return [ validation.derive_interface(v, data) for v in data.get("vlans", []) if v.get("mdns_reflection") is True and not validation.is_wg(v) ] def _vlan_hosts_file(vlan): return shared.DNSMASQ_CONF_DIR / f"for-{vlan['name']}.hosts" def _gateway_ips(data): """Return set of all gateway IPs across all VLANs.""" gws = set() for vlan in data.get("vlans", []): ip = shared.lowest_quartet_ip(vlan) if ip: gws.add(ip) return gws def iface_operstate(iface): """Read operstate from sysfs. Returns 'up', 'down', 'unknown', or None.""" try: return Path(f"/sys/class/net/{iface}/operstate").read_text().strip() except OSError: return None def _sysctl_query(unit): """Return (active, enabled) strings from systemctl.""" r_a = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True) r_e = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True) enabled = r_e.stdout.strip() or "not-found" return r_a.stdout.strip(), enabled # =================================================================== # Result builders # =================================================================== def ok(id_, name, detail=""): r = {"id": id_, "name": name, "status": "ok"} if detail: r["detail"] = detail return r def problem(id_, name, severity, detail, suggestion=""): r = {"id": id_, "name": name, "status": "problem", "severity": severity, "detail": detail} if suggestion: r["suggestion"] = suggestion return r # =================================================================== # Services checks # =================================================================== def check_services(data): results = [] vlans = data.get("vlans", []) units = [] for vlan in vlans: iface = validation.derive_interface(vlan, data) name = shared.vlan_service_name(vlan, iface) units.append({"id": name, "name": name, "expected_active": "active", "expected_enabled": "enabled", "severity": "error"}) units.append({"id": f"{BLIST_TIMER_NAME}.timer", "name": f"{BLIST_TIMER_NAME}.timer", "expected_active": "active", "expected_enabled": "enabled", "severity": "warning"}) units.append({"id": NAT_SERVICE_NAME, "name": NAT_SERVICE_NAME, "expected_active": "inactive", "expected_enabled": "enabled", "severity": "error"}) units.append({"id": f"{HEALTH_TIMER_NAME}.timer", "name": f"{HEALTH_TIMER_NAME}.timer", "expected_active": "active", "expected_enabled": "enabled", "severity": "warning"}) if DASHB_QUEUE_FILE.exists(): units.append({"id": f"{DASHB_TIMER_NAME}.timer", "name": f"{DASHB_TIMER_NAME}.timer", "expected_active": "active", "expected_enabled": "enabled", "severity": "error"}) has_ddns = any(p.get("enabled") for p in data.get("ddns", {}).get("providers", [])) exp_ddns_active = "active" if has_ddns else "inactive" exp_ddns_enabled = "enabled" if has_ddns else "not-found" units.append({"id": f"{MAINT_TIMER_NAME}.timer", "name": f"{MAINT_TIMER_NAME}.timer", "expected_active": exp_ddns_active, "expected_enabled": exp_ddns_enabled, "severity": "warning"}) exp_fr_active = "active" if radius_enabled(data) else "inactive" exp_fr_enabled = "enabled" if radius_enabled(data) else "disabled" units.append({"id": "freeradius", "name": "freeradius", "expected_active": exp_fr_active, "expected_enabled": exp_fr_enabled, "severity": "error"}) exp_av_active = "active" if avahi_enabled(data) else "inactive" exp_av_enabled = "enabled" if avahi_enabled(data) else "disabled" units.append({"id": "avahi-daemon", "name": "avahi-daemon", "expected_active": exp_av_active, "expected_enabled": exp_av_enabled, "severity": "warning"}) units.append({"id": "chrony", "name": "chrony", "expected_active": "active", "expected_enabled": "enabled", "severity": "warning"}) units.append({"id": "systemd-networkd", "name": "systemd-networkd", "expected_active": "active", "expected_enabled": "enabled", "severity": "error"}) for u in units: active, enabled = _sysctl_query(u["id"]) exp_active = u["expected_active"] exp_enabled = u["expected_enabled"] active_ok = active == exp_active enabled_ok = enabled == exp_enabled svc_status = "ok" if (active_ok and enabled_ok) else "problem" results.append({ "id": u["id"], "name": u["name"], "active": active, "enabled": enabled, "expected_active": exp_active, "expected_enabled": exp_enabled, "active_ok": active_ok, "enabled_ok": enabled_ok, "severity": u.get("severity", "error"), "status": svc_status, }) return results # =================================================================== # Configuration checks # =================================================================== def check_configurations(data): results = [] vlans = data.get("vlans", []) non_wg = [v for v in vlans if not validation.is_wg(v)] wg_vlans = [v for v in vlans if validation.is_wg(v)] def file_ok(id_, name, path, severity="error", suggestion=""): try: exists = path.exists() except PermissionError: return problem(id_, name, "warning", f"{path}: permission denied - run with sudo for accurate status.") if not exists: return problem(id_, name, severity, f"{path} does not exist.", suggestion or f"Run `sudo python3 core.py --apply` to create it.") return ok(id_, name) # --- nftables tables --- try: tables_out = subprocess.run( ["nft", "list", "tables"], capture_output=True, text=True ).stdout for tbl in ("ip routlin-nat", "ip routlin-filter"): if tbl in tables_out: results.append(ok(f"nft_{tbl.replace(' ', '_')}", f"nftables table {tbl}")) else: results.append(problem( f"nft_{tbl.replace(' ', '_')}", f"nftables table {tbl}", "error", f"nftables table '{tbl}' is missing.", "Run `sudo python3 core.py --apply` to rebuild firewall rules.")) except Exception: results.append(problem("nft_tables", "nftables tables", "error", "Could not query nftables (nft not available or failed).")) # --- Docker bridge rules --- try: bridges = [ p.parent.name for p in Path("/sys/class/net").glob("*/bridge") if iface_operstate(p.parent.name) == "up" ] if bridges: fwd_out = subprocess.run( ["nft", "list", "chain", "ip", "routlin-filter", "forward"], capture_output=True, text=True ).stdout missing = [b for b in bridges if b not in fwd_out] if missing: results.append(problem( "nft_docker_bridges", "nftables Docker bridge rules", "warning", f"Container bridge(s) {', '.join(missing)} have no nftables forward rules.", "Run `sudo python3 core.py --apply` to add the missing rules.")) else: results.append(ok("nft_docker_bridges", "nftables Docker bridge rules")) except Exception: pass # --- VLAN sub-interfaces --- for vlan in non_wg: iface = validation.derive_interface(vlan, data) state = iface_operstate(iface) id_ = f"iface_{vlan['name']}" name = f"interface {iface}" if state is None: results.append(problem(id_, name, "error", f"Interface {iface} does not exist in /sys/class/net/.", "Run `sudo python3 core.py --apply` to configure network interfaces.")) elif state != "up": results.append(problem(id_, name, "error", f"Interface {iface} operstate is '{state}' (expected 'up').", "Check systemd-networkd: `sudo systemctl status systemd-networkd`")) else: results.append(ok(id_, name)) # --- WireGuard interfaces --- for vlan in wg_vlans: iface = validation.derive_interface(vlan, data) state = iface_operstate(iface) id_ = f"iface_wg_{vlan['name']}" name = f"WireGuard interface {iface}" if state is None: results.append(problem(id_, name, "error", f"WireGuard interface {iface} does not exist.", "Run `sudo python3 core.py --apply` to bring up WireGuard.")) elif state in ("up", "unknown"): # WireGuard interfaces normally report 'unknown' results.append(ok(id_, name)) else: results.append(problem(id_, name, "error", f"WireGuard interface {iface} operstate is '{state}'.", f"Try: sudo wg-quick up {iface}")) # --- Stale WG interfaces when no WG VLANs configured --- if not wg_vlans: stale_wg = [ p.name for p in Path("/sys/class/net").iterdir() if p.name.startswith("wg") and re.match(r"^wg\d+$", p.name) ] if stale_wg: results.append(problem( "stale_wg_ifaces", "Stale WireGuard interfaces", "warning", f"WireGuard interface(s) {', '.join(stale_wg)} exist but no VPN VLANs are configured.", f"Bring them down manually: sudo wg-quick down {stale_wg[0]}")) # --- dnsmasq config files --- for vlan in vlans: path = shared.DNSMASQ_CONF_DIR / f"{vlan['name']}.conf" results.append(file_ok(f"dnsmasq_conf_{vlan['name']}", f"dnsmasq config {path.name}", path)) # --- systemd-networkd files --- for vlan in non_wg: iface = validation.derive_interface(vlan, data) vid = vlan.get("vlan_id") net = NETWORKD_DIR / f"10-{shared.PRODUCT_NAME}-{vlan['name']}.network" results.append(file_ok(f"networkd_net_{vlan['name']}", f"networkd {net.name}", net)) if vid != 1: # non-physical VLANs have a .netdev too netdev = NETWORKD_DIR / f"10-{shared.PRODUCT_NAME}-{vlan['name']}.netdev" results.append(file_ok(f"networkd_netdev_{vlan['name']}", f"networkd {netdev.name}", netdev)) # --- systemd unit files --- for path in (shared.SYSTEMD_DIR / f"{NAT_SERVICE_NAME}.service", shared.SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.timer", shared.SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.service"): results.append(file_ok(f"unit_{path.stem}", f"systemd unit {path.name}", path)) # --- WireGuard config and key files --- for vlan in wg_vlans: iface = validation.derive_interface(vlan, data) conf = WG_DIR / f"{iface}.conf" key = WG_DIR / f"{iface}.key" pub = shared.SCRIPT_DIR / f".{iface}.pub" results.append(file_ok(f"wg_conf_{iface}", f"WireGuard {conf.name}", conf)) results.append(file_ok(f"wg_key_{iface}", f"WireGuard {key.name}", key)) results.append(file_ok(f"wg_pubkey_{iface}", f"WireGuard {pub.name}", pub)) # --- Stale WG conf files when no WG VLANs --- if not wg_vlans and WG_DIR.exists(): stale = [ p for p in WG_DIR.glob("wg*.conf") if p.read_text().startswith("# Generated by") ] if stale: results.append(problem( "stale_wg_conf", "Stale WireGuard config files", "warning", f"{', '.join(p.name for p in stale)} exist but no VPN VLANs are configured.", "Remove with: sudo rm " + " ".join(str(p) for p in stale))) # --- RADIUS files and secret check --- if radius_enabled(data): results.append(file_ok("radius_secret_file", ".radius-secret file", RADIUS_SECRET_FILE, "error")) results.append(file_ok("radius_clients_conf", "FreeRADIUS clients.conf", RADIUS_CLIENTS_CONF, "error")) results.append(file_ok("radius_users_file", "FreeRADIUS users", RADIUS_USERS_FILE, "error")) # Secret content match try: secret = RADIUS_SECRET_FILE.read_text().strip() conf_text = RADIUS_CLIENTS_CONF.read_text() secret_ok = any( line.strip().split("=", 1)[-1].strip() == secret for line in conf_text.splitlines() if "secret" in line and not line.strip().startswith("#") ) if secret_ok: results.append(ok("radius_secret_match", "FreeRADIUS shared secret")) else: results.append(problem( "radius_secret_match", "FreeRADIUS shared secret", "error", "clients.conf secret does not match .radius-secret. " "Access points will reject all authentication requests.", "Restore .radius-secret from backup, or run `sudo python3 core.py --apply` " "then update the shared secret in your AP controller.")) except OSError: pass # already caught above by file_ok else: # RADIUS not enabled - warn if generated config files still exist if RADIUS_CLIENTS_CONF.exists(): try: if "# Generated by" in RADIUS_CLIENTS_CONF.read_text(): results.append(problem( "radius_conf_orphan", "FreeRADIUS config", "warning", "FreeRADIUS clients.conf contains routlin-generated content " "but RADIUS is not enabled.", "This is harmless if freeradius is stopped. " "Remove with: sudo rm " + str(RADIUS_CLIENTS_CONF))) except OSError: pass # --- Avahi config --- if avahi_enabled(data): results.append(file_ok("avahi_conf", "avahi-daemon.conf", AVAHI_CONF_FILE, "warning")) if AVAHI_CONF_FILE.exists(): expected_ifaces = set(_avahi_interfaces(data)) try: text = AVAHI_CONF_FILE.read_text() m = re.search(r"allow-interfaces\s*=\s*(.+)", text) if m: actual_ifaces = {i.strip() for i in m.group(1).split(",")} missing = expected_ifaces - actual_ifaces extra = actual_ifaces - expected_ifaces if missing or extra: results.append(problem( "avahi_ifaces", "avahi-daemon interface list", "warning", f"avahi-daemon.conf interface list does not match config " f"(missing: {missing or 'none'}, extra: {extra or 'none'}).", "Run `sudo python3 core.py --apply` to update.")) else: results.append(ok("avahi_ifaces", "avahi-daemon interface list")) except OSError: pass # --- resolv.conf --- gateway_ips = _gateway_ips(data) try: resolv = RESOLV_CONF.read_text() ns_ips = { line.split()[1] for line in resolv.splitlines() if line.startswith("nameserver") and len(line.split()) >= 2 } if ns_ips & gateway_ips: results.append(ok("resolv_conf", "/etc/resolv.conf")) else: results.append(problem( "resolv_conf", "/etc/resolv.conf", "warning", f"/etc/resolv.conf nameserver(s) {ns_ips} do not include any VLAN gateway. " f"Expected one of: {gateway_ips}.", "Run `sudo python3 core.py --apply` to update /etc/resolv.conf.")) except OSError: results.append(problem("resolv_conf", "/etc/resolv.conf", "warning", "/etc/resolv.conf is not readable.", "Run `sudo python3 core.py --apply`.")) # --- chrony.conf --- if CHRONY_CONF_FILE.exists(): try: content = CHRONY_CONF_FILE.read_text() missing_subnets = [] for vlan in non_wg: try: network = ipaddress.IPv4Network( f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False) cidr = str(network) if f"allow {cidr}" not in content and f"allow {vlan['subnet']}" not in content: missing_subnets.append(cidr) except Exception: pass if missing_subnets: results.append(problem( "chrony_conf", "/etc/chrony/chrony.conf", "warning", f"chrony.conf is missing allow directives for: {', '.join(missing_subnets)}.", "Run `sudo python3 core.py --apply` to update chrony.conf.")) else: results.append(ok("chrony_conf", "/etc/chrony/chrony.conf")) except OSError: results.append(problem("chrony_conf", "/etc/chrony/chrony.conf", "warning", "/etc/chrony/chrony.conf is not readable.")) else: results.append(problem("chrony_conf", "/etc/chrony/chrony.conf", "warning", "/etc/chrony/chrony.conf does not exist.", "Install chrony: sudo apt-get install chrony")) # --- DHCP pool utilization --- for vlan in non_wg: try: dhcp = vlan.get("dhcp_information", {}) start = dhcp.get("pool_start", "") end = dhcp.get("pool_end", "") if not start or not end: continue pool_size = (int(ipaddress.IPv4Address(end)) - int(ipaddress.IPv4Address(start)) + 1) if pool_size <= 0: continue lease_file = shared.LEASES_DIR / f"dnsmasq-{shared.PRODUCT_NAME}-{vlan['name']}.leases" if not lease_file.exists(): continue leases = [ l for l in lease_file.read_text().splitlines() if l.strip() and not l.startswith("#") ] pct = len(leases) * 100 // pool_size if pct >= DHCP_WARN_PCT: results.append(problem( f"dhcp_pool_{vlan['name']}", f"DHCP pool ({vlan['name']})", "warning", f"DHCP pool for VLAN '{vlan['name']}' is {pct}% full " f"({len(leases)}/{pool_size} leases).", "Expand the pool range in config.json or clean up stale leases " f"with: `sudo python3 core.py --reset-leases {vlan['name']}`")) else: results.append(ok(f"dhcp_pool_{vlan['name']}", f"DHCP pool ({vlan['name']})", f"{pct}% used ({len(leases)}/{pool_size})")) except Exception: pass # --- Blocklist file freshness --- now = datetime.now(timezone.utc).timestamp() for vlan in vlans: names = vlan.get("use_blocklists", []) if not names: continue vlan_name = vlan["name"] path = _vlan_hosts_file(vlan) label = ", ".join(sorted(names)) if not path.exists(): results.append(problem( f"blocklist_{vlan_name}", f"blocklist ({vlan_name})", "warning", f"Blocklist hosts file for '{vlan_name}' does not exist.", "Run `sudo python3 dl_blocklists.py && sudo python3 core.py --merge-blocklists`.")) elif now - path.stat().st_mtime > BLOCKLIST_STALE_SECS: age_h = int((now - path.stat().st_mtime) / 3600) results.append(problem( f"blocklist_{vlan_name}", f"blocklist ({vlan_name})", "warning", f"Blocklist hosts file for '{vlan_name}' is {age_h}h old (threshold 36h).", "Run `sudo python3 dl_blocklists.py && sudo python3 core.py --merge-blocklists`.")) else: results.append(ok(f"blocklist_{vlan_name}", f"blocklist ({vlan_name})")) # --- Disk space --- try: usage = shutil.disk_usage("/") pct = usage.used * 100 // usage.total if pct >= DISK_WARN_PCT: results.append(problem( "disk_space", "Disk space", "warning", f"Root filesystem is {pct}% full " f"({usage.used // 1_073_741_824}G of {usage.total // 1_073_741_824}G used).", "Free up disk space to avoid service disruption.")) else: results.append(ok("disk_space", "Disk space", f"{pct}% used")) except Exception: pass # --- Upstream DNS reachability --- servers = data.get("upstream_dns", {}).get("upstream_servers", []) unreachable = [] for srv in servers: try: with socket.create_connection((srv, 53), timeout=DNS_TIMEOUT_SECS): pass except OSError: unreachable.append(srv) if unreachable: results.append(problem( "upstream_dns", "Upstream DNS reachability", "warning", f"Upstream DNS server(s) unreachable on port 53: {', '.join(unreachable)}.", "Check WAN connectivity and upstream DNS server addresses in config.json.")) elif servers: results.append(ok("upstream_dns", "Upstream DNS reachability")) return results # =================================================================== # Log checks # =================================================================== def check_logs(data): results = [] # --- FreeRADIUS auth failures --- radius_log = Path("/var/log/freeradius/radius.log") if radius_log.exists(): try: now = datetime.now(timezone.utc).timestamp() cutoff = now - 3600 lines = radius_log.read_text(errors="replace").splitlines() # Parse lines with timestamps like "Thu May 21 11:53:47 2026 : Info: ..." recent = [] failure_re = re.compile(r"Shared secret is incorrect") ts_re = re.compile( r"(\w+ \w+ +\d+ \d+:\d+:\d+ \d+) : ") for line in lines[-2000:]: # scan last 2000 lines m = ts_re.match(line) if not m: continue try: ts = datetime.strptime(m.group(1), "%a %b %d %H:%M:%S %Y") ts = ts.replace(tzinfo=timezone.utc) if ts.timestamp() >= cutoff: recent.append(line) except ValueError: pass failures = [l for l in recent if failure_re.search(l)] if failures: # Extract distinct AP names from "(from client ...)" pattern ap_re = re.compile(r"\(from client ([^)]+)\)") aps = sorted({m.group(1) for l in failures for m in ap_re.finditer(l)}) ap_str = ", ".join(aps) if aps else f"{len(failures)} request(s)" results.append(problem( "freeradius_auth_failures", "FreeRADIUS auth failures", "error", f"FreeRADIUS is rejecting requests from {ap_str} with " f"'Shared secret is incorrect' ({len(failures)} failures in the last hour).", "Restore .radius-secret from backup and run `sudo python3 core.py --apply`, " "or update the shared secret in your AP controller to match .radius-secret.")) else: results.append(ok("freeradius_auth_failures", "FreeRADIUS auth failures")) # High rejection rate (>50% of recent activity is failures) if recent and len(failures) > len(recent) * 0.5 and not failures: results.append(problem( "freeradius_high_reject_rate", "FreeRADIUS rejection rate", "warning", f"Over half of recent FreeRADIUS activity ({len(failures)}/{len(recent)}) " f"are auth failures.", "Investigate FreeRADIUS config and shared secrets.")) elif recent: results.append(ok("freeradius_high_reject_rate", "FreeRADIUS rejection rate")) except OSError: pass # --- dnsmasq errors --- try: r = subprocess.run( ["journalctl", f"-u", f"dnsmasq-{shared.PRODUCT_NAME}-*", "--since", "-1h", "--priority=err", "--no-pager", "-q"], capture_output=True, text=True, timeout=5 ) err_lines = [l for l in r.stdout.splitlines() if l.strip()] if err_lines: results.append(problem( "dnsmasq_errors", "dnsmasq errors", "error", f"{len(err_lines)} dnsmasq error(s) in the last hour: " f"{err_lines[0][:120]}{'...' if len(err_lines) > 1 else ''}", "Check dnsmasq logs: `sudo journalctl -u 'dnsmasq-routlin-*' --since -1h`")) else: results.append(ok("dnsmasq_errors", "dnsmasq errors")) except Exception: pass return results # =================================================================== # Next blocklist update # =================================================================== def _next_blocklist_update(): """Return the next scheduled trigger time for the blocklist timer as a string, or None if unavailable.""" try: r = subprocess.run( ["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"], capture_output=True, text=True, timeout=5 ) for line in r.stdout.splitlines(): line = line.strip() if line.startswith("Trigger:"): trigger = line.split("Trigger:", 1)[1].strip() if trigger and trigger != "n/a": return trigger except Exception: pass return None # =================================================================== # Public API # =================================================================== def run_and_write(data): """Run all checks, write .health atomically, return (all_healthy, status_dict).""" status = { "checked_at": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), "services": check_services(data), "configurations": check_configurations(data), "logs": check_logs(data), "next_blocklist_update": _next_blocklist_update(), } tmp = HEALTH_FILE.with_suffix(".tmp") tmp.write_text(json.dumps(status, indent=2)) tmp.replace(HEALTH_FILE) healthy = all( item.get('status') != 'problem' for section in ('services', 'configurations', 'logs') for item in status.get(section, []) ) return healthy, status def print_table(status): """Print the service status table and any problems to stdout.""" col = shutil.get_terminal_size((80, 24)).columns services = status.get("services", []) print(f"\n {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}") print(f" {'-'*45} {'-'*18} {'-'*15}") for svc in services: active = svc.get("active", "unknown") enabled = svc.get("enabled", "unknown") a_ok = svc.get("active_ok", True) e_ok = svc.get("enabled_ok", True) a_sym = "+" if active == "active" else "x" e_sym = "+" if enabled == "enabled" else "x" a_status = "(OK) " if a_ok else "(BAD)" e_status = "(OK) " if e_ok else "(BAD)" print(f" {svc['name']:<45} " f"{a_sym} {active:<10} {a_status} " f"{e_sym} {enabled:<10} {e_status}") trigger = status.get("next_blocklist_update") if trigger: print(f"\n Next blocklist update: {trigger}") svc_problems = [] for svc in status.get("services", []): if svc.get("status") == "problem": name = svc["name"] utype = "timer" if name.endswith(".timer") else "service" if name.endswith(".service") else "unit" exp_parts, act_parts, fix_parts = [], [], [] if not svc.get("active_ok"): exp_parts.append(svc.get("expected_active", "active")) act_parts.append(svc.get("active", "unknown")) fix_parts.append("activate") if not svc.get("enabled_ok"): exp_parts.append(svc.get("expected_enabled", "enabled")) act_parts.append(svc.get("enabled", "unknown")) fix_parts.append("enable") detail = (f"The {utype} `{name}` is expected to be " f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}.") suggestion = f"Run `sudo python3 core.py --apply` to {' and '.join(reversed(fix_parts))} it." svc_problems.append({"severity": svc.get("severity", "error"), "detail": detail, "suggestion": suggestion}) problems = svc_problems + [ item for section in ("configurations", "logs") for item in status.get(section, []) if item.get("status") == "problem" ] if problems: print(f"\n Problems {'=' * (col - 12)}") for p in problems: sev = p.get("severity", "error") tag = f"[{sev}]" detail = p.get("detail", p.get("name", "")) print(f" {tag:<10} {detail}") tip = p.get("suggestion", "") if tip: print(f" {'':10} -> {tip}") print() # =================================================================== # Standalone entry point # =================================================================== if __name__ == "__main__": try: with open(CONFIG_FILE) as f: data = json.load(f) except Exception as ex: print(f"Error loading {CONFIG_FILE}: {ex}", file=sys.stderr) sys.exit(1) _, status = run_and_write(data) print_table(status)