linuxrouter/routlin/health.py
2026-06-09 11:00:37 -04:00

781 lines
33 KiB
Python

"""
health.py -- System health checks for Routlin.

Reads config.json, checks services, configuration files, and logs, then writes
the .health JSON file. Imported by core.py; also runnable standalone.

Public API:
    run_and_write(data) -> (bool, dict)
        Run all checks, write .health, return (all_healthy, status).
    print_table(status: dict)
        Render the CLI service table from the status dict.
"""
import hashlib
import ipaddress
import json
import re
import shutil
import socket
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
import mod_shared as shared
import mod_validation as validation
# ===================================================================
# Constants
# ===================================================================
# Files kept next to the scripts (shared.SCRIPT_DIR).
# HEALTH_FILE is the JSON report written by run_and_write(); CONFIG_FILE is
# what the standalone entry point loads; BLOCKLIST_DIR holds the downloaded
# blocklists whose age is checked.
HEALTH_FILE = shared.SCRIPT_DIR / ".health"
CONFIG_FILE = shared.SCRIPT_DIR / "config.json"
BLOCKLIST_DIR = shared.SCRIPT_DIR / "blocklists"
# System locations inspected by check_configurations().
NETWORKD_DIR = Path("/etc/systemd/network")
WG_DIR = Path("/etc/wireguard")
RESOLV_CONF = Path("/etc/resolv.conf")
AVAHI_CONF_FILE = Path("/etc/avahi/avahi-daemon.conf")
CHRONY_CONF_FILE = Path("/etc/chrony/chrony.conf")
# RADIUS: the local secret file is compared against the secret found in
# FreeRADIUS' clients.conf.
RADIUS_SECRET_FILE = shared.SCRIPT_DIR / ".radius-secret"
RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf")
RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users")
# systemd unit base names; callers append ".timer" / ".service" as needed.
BLIST_TIMER_NAME = f"{shared.PRODUCT_NAME}-dns-blocklist-update"
DASHB_TIMER_NAME = f"{shared.PRODUCT_NAME}-dashboard-queue"
HEALTH_TIMER_NAME = f"{shared.PRODUCT_NAME}-health-check"
MAINT_TIMER_NAME = f"{shared.PRODUCT_NAME}-maintenance"
# The dashboard timer is only checked when this file exists.
DASHB_QUEUE_FILE = shared.SCRIPT_DIR / ".dashboard-queue"
NAT_SERVICE_NAME = f"{shared.PRODUCT_NAME}-nat"
# Thresholds.
BLOCKLIST_STALE_SECS = 36 * 3600  # blocklist files older than this (36 h) are reported
DISK_WARN_PCT = 90  # root filesystem usage (percent) at which a warning is raised
DHCP_WARN_PCT = 90  # DHCP pool usage (percent) at which a warning is raised
DNS_TIMEOUT_SECS = 2  # connect timeout for the upstream DNS port-53 probe
# ===================================================================
# Small helpers
# ===================================================================
def radius_enabled(data):
    """Return True when at least one DHCP reservation is a RADIUS client.

    Only a literal ``True`` in ``radius_client`` counts; other truthy values
    are ignored. A missing ``dhcp_reservations`` key is treated as empty.
    """
    for reservation in data.get("dhcp_reservations", []):
        if reservation.get("radius_client") is True:
            return True
    return False
def avahi_enabled(data):
    """Return True when any non-WireGuard VLAN has ``mdns_reflection`` set to True.

    VLANs that ``validation.is_wg`` identifies as WireGuard are skipped before
    the flag is looked at. Only a literal ``True`` counts.
    """
    for vlan in data.get("vlans", []):
        if validation.is_wg(vlan):
            continue
        if vlan.get("mdns_reflection") is True:
            return True
    return False
def _avahi_interfaces(data):
return [
validation.derive_interface(v, data)
for v in data.get("vlans", [])
if v.get("mdns_reflection") is True and not validation.is_wg(v)
]
def _vlan_hosts_file(vlan):
    """Return ``for-<vlan name>.hosts`` inside ``shared.DNSMASQ_CONF_DIR``."""
    filename = "for-{}.hosts".format(vlan["name"])
    return shared.DNSMASQ_CONF_DIR / filename
def _gateway_ips(data):
"""Return set of all gateway IPs across all VLANs."""
gws = set()
for vlan in data.get("vlans", []):
ip = shared.lowest_quartet_ip(vlan)
if ip:
gws.add(ip)
return gws
def iface_operstate(iface):
    """Read an interface's operstate from sysfs.

    Returns the stripped content of ``/sys/class/net/<iface>/operstate``
    (e.g. 'up', 'down', 'unknown'), or None when the file cannot be read.
    """
    sysfs_node = Path(f"/sys/class/net/{iface}/operstate")
    try:
        raw = sysfs_node.read_text()
    except OSError:
        return None
    return raw.strip()
def _sysctl_query(unit):
    """Ask systemctl for a unit's state and return ``(active, enabled)`` strings.

    Both values are the stripped stdout of ``systemctl is-active`` and
    ``systemctl is-enabled``; an empty ``is-enabled`` answer is reported as
    "not-found".
    """
    def ask(verb):
        proc = subprocess.run(["systemctl", verb, unit], capture_output=True, text=True)
        return proc.stdout.strip()

    active = ask("is-active")
    enabled = ask("is-enabled") or "not-found"
    return active, enabled
# ===================================================================
# Result builders
# ===================================================================
def ok(id_, name, detail=""):
    """Build a passing check result.

    The dict always has ``id``, ``name`` and ``status`` ("ok"); ``detail`` is
    added only when a non-empty value is given.
    """
    result = {"id": id_, "name": name, "status": "ok"}
    return {**result, "detail": detail} if detail else result
def problem(id_, name, severity, detail, suggestion=""):
    """Build a failing check result.

    The dict carries ``id``, ``name``, ``status`` ("problem"), ``severity``
    and ``detail``; ``suggestion`` is added only when non-empty.
    """
    entry = dict(id=id_, name=name, status="problem",
                 severity=severity, detail=detail)
    if not suggestion:
        return entry
    entry["suggestion"] = suggestion
    return entry
# ===================================================================
# Services checks
# ===================================================================
def check_services(data):
    """Compare the systemd state of every relevant unit with what config implies.

    Builds an ordered list of expectations (per-VLAN services, timers, NAT,
    freeradius, avahi-daemon, chrony, systemd-networkd), queries systemctl for
    each one, and returns one result dict per unit containing the actual and
    expected ``active`` / ``enabled`` states, the two ``*_ok`` flags, the
    severity, and an overall ``status`` of "ok" or "problem".
    """
    def expect(unit, active, enabled, severity):
        # One expectation record; id and display name are the same string.
        return {"id": unit, "name": unit,
                "expected_active": active, "expected_enabled": enabled,
                "severity": severity}

    expectations = []

    # One service per configured VLAN.
    for vlan in data.get("vlans", []):
        iface = validation.derive_interface(vlan, data)
        expectations.append(
            expect(shared.vlan_service_name(vlan, iface), "active", "enabled", "error"))

    expectations.append(
        expect(f"{BLIST_TIMER_NAME}.timer", "active", "enabled", "warning"))
    # The NAT unit is expected to be enabled yet inactive
    # (presumably a oneshot unit -- confirm against the unit file).
    expectations.append(
        expect(NAT_SERVICE_NAME, "inactive", "enabled", "error"))
    expectations.append(
        expect(f"{HEALTH_TIMER_NAME}.timer", "active", "enabled", "warning"))

    # The dashboard timer is only expected once the queue file exists.
    if DASHB_QUEUE_FILE.exists():
        expectations.append(
            expect(f"{DASHB_TIMER_NAME}.timer", "active", "enabled", "error"))

    # Maintenance timer expectations follow whether any DDNS provider is enabled.
    providers = data.get("ddns", {}).get("providers", [])
    if any(p.get("enabled") for p in providers):
        maint_state = ("active", "enabled")
    else:
        maint_state = ("inactive", "not-found")
    expectations.append(
        expect(f"{MAINT_TIMER_NAME}.timer", maint_state[0], maint_state[1], "warning"))

    # Optional daemons: running and enabled only when the feature is in use.
    wants_radius = radius_enabled(data)
    expectations.append(
        expect("freeradius",
               "active" if wants_radius else "inactive",
               "enabled" if wants_radius else "disabled",
               "error"))
    wants_avahi = avahi_enabled(data)
    expectations.append(
        expect("avahi-daemon",
               "active" if wants_avahi else "inactive",
               "enabled" if wants_avahi else "disabled",
               "warning"))

    expectations.append(expect("chrony", "active", "enabled", "warning"))
    expectations.append(expect("systemd-networkd", "active", "enabled", "error"))

    report = []
    for spec in expectations:
        active, enabled = _sysctl_query(spec["id"])
        active_ok = active == spec["expected_active"]
        enabled_ok = enabled == spec["expected_enabled"]
        report.append({
            "id": spec["id"],
            "name": spec["name"],
            "active": active,
            "enabled": enabled,
            "expected_active": spec["expected_active"],
            "expected_enabled": spec["expected_enabled"],
            "active_ok": active_ok,
            "enabled_ok": enabled_ok,
            "severity": spec["severity"],
            "status": "problem" if not (active_ok and enabled_ok) else "ok",
        })
    return report
# ===================================================================
# Configuration checks
# ===================================================================
def check_configurations(data):
    """Check generated configuration and system state against the loaded config.

    Runs a fixed sequence of checks: nftables tables, container bridge forward
    rules, VLAN and WireGuard interfaces, dnsmasq / systemd-networkd / systemd
    unit / WireGuard files, RADIUS and Avahi configuration, /etc/resolv.conf,
    chrony.conf, DHCP pool utilisation, blocklist freshness, root disk usage
    and upstream DNS reachability.

    Args:
        data: Parsed config.json contents (a dict).

    Returns:
        A list of result dicts built by ``ok()`` and ``problem()``.

    Filesystem probes are guarded so that an unreadable path (for example when
    the check runs without root) is reported or skipped rather than aborting
    the whole health run.
    """
    results = []
    vlans = data.get("vlans", [])
    non_wg = [v for v in vlans if not validation.is_wg(v)]
    wg_vlans = [v for v in vlans if validation.is_wg(v)]

    def file_ok(id_, name, path, severity="error", suggestion=""):
        # Existence check that turns a permission error into a warning result.
        try:
            exists = path.exists()
        except PermissionError:
            return problem(id_, name, "warning",
                           f"{path}: permission denied - run with sudo for accurate status.")
        if not exists:
            return problem(id_, name, severity,
                           f"{path} does not exist.",
                           suggestion or "Run `sudo python3 core.py --apply` to create it.")
        return ok(id_, name)

    def has_generated_header(path):
        # True when the file starts with the "# Generated by" marker; files
        # that cannot be read or decoded are treated as not generated.
        try:
            return path.read_text().startswith("# Generated by")
        except (OSError, UnicodeDecodeError):
            return False

    # --- nftables tables ---
    try:
        tables_out = subprocess.run(
            ["nft", "list", "tables"], capture_output=True, text=True
        ).stdout
        for tbl in ("ip routlin-nat", "ip routlin-filter"):
            if tbl in tables_out:
                results.append(ok(f"nft_{tbl.replace(' ', '_')}",
                                  f"nftables table {tbl}"))
            else:
                results.append(problem(
                    f"nft_{tbl.replace(' ', '_')}",
                    f"nftables table {tbl}",
                    "error",
                    f"nftables table '{tbl}' is missing.",
                    "Run `sudo python3 core.py --apply` to rebuild firewall rules."))
    except Exception:
        results.append(problem("nft_tables", "nftables tables", "error",
                               "Could not query nftables (nft not available or failed)."))

    # --- Docker bridge rules ---
    # Best effort: any failure here is deliberately ignored.
    try:
        bridges = [
            p.parent.name
            for p in Path("/sys/class/net").glob("*/bridge")
            if iface_operstate(p.parent.name) == "up"
        ]
        if bridges:
            fwd_out = subprocess.run(
                ["nft", "list", "chain", "ip", "routlin-filter", "forward"],
                capture_output=True, text=True
            ).stdout
            missing = [b for b in bridges if b not in fwd_out]
            if missing:
                results.append(problem(
                    "nft_docker_bridges", "nftables Docker bridge rules", "warning",
                    f"Container bridge(s) {', '.join(missing)} have no nftables forward rules.",
                    "Run `sudo python3 core.py --apply` to add the missing rules."))
            else:
                results.append(ok("nft_docker_bridges", "nftables Docker bridge rules"))
    except Exception:
        pass

    # --- VLAN sub-interfaces ---
    for vlan in non_wg:
        iface = validation.derive_interface(vlan, data)
        state = iface_operstate(iface)
        id_ = f"iface_{vlan['name']}"
        name = f"interface {iface}"
        if state is None:
            results.append(problem(id_, name, "error",
                                   f"Interface {iface} does not exist in /sys/class/net/.",
                                   "Run `sudo python3 core.py --apply` to configure network interfaces."))
        elif state != "up":
            results.append(problem(id_, name, "error",
                                   f"Interface {iface} operstate is '{state}' (expected 'up').",
                                   "Check systemd-networkd: `sudo systemctl status systemd-networkd`"))
        else:
            results.append(ok(id_, name))

    # --- WireGuard interfaces ---
    for vlan in wg_vlans:
        iface = validation.derive_interface(vlan, data)
        state = iface_operstate(iface)
        id_ = f"iface_wg_{vlan['name']}"
        name = f"WireGuard interface {iface}"
        if state is None:
            results.append(problem(id_, name, "error",
                                   f"WireGuard interface {iface} does not exist.",
                                   "Run `sudo python3 core.py --apply` to bring up WireGuard."))
        elif state in ("up", "unknown"):  # WireGuard interfaces normally report 'unknown'
            results.append(ok(id_, name))
        else:
            results.append(problem(id_, name, "error",
                                   f"WireGuard interface {iface} operstate is '{state}'.",
                                   f"Try: sudo wg-quick up {iface}"))

    # --- Stale WG interfaces when no WG VLANs configured ---
    if not wg_vlans:
        try:
            stale_wg = [
                p.name for p in Path("/sys/class/net").iterdir()
                if re.match(r"^wg\d+$", p.name)
            ]
        except OSError:
            # sysfs not listable: nothing can be said about stale interfaces.
            stale_wg = []
        if stale_wg:
            results.append(problem(
                "stale_wg_ifaces", "Stale WireGuard interfaces", "warning",
                f"WireGuard interface(s) {', '.join(stale_wg)} exist but no VPN VLANs are configured.",
                f"Bring them down manually: sudo wg-quick down {stale_wg[0]}"))

    # --- dnsmasq config files ---
    for vlan in vlans:
        path = shared.DNSMASQ_CONF_DIR / f"{vlan['name']}.conf"
        results.append(file_ok(f"dnsmasq_conf_{vlan['name']}",
                               f"dnsmasq config {path.name}", path))

    # --- systemd-networkd files ---
    for vlan in non_wg:
        vid = vlan.get("vlan_id")
        net = NETWORKD_DIR / f"10-{shared.PRODUCT_NAME}-{vlan['name']}.network"
        results.append(file_ok(f"networkd_net_{vlan['name']}",
                               f"networkd {net.name}", net))
        if vid != 1:  # non-physical VLANs have a .netdev too
            netdev = NETWORKD_DIR / f"10-{shared.PRODUCT_NAME}-{vlan['name']}.netdev"
            results.append(file_ok(f"networkd_netdev_{vlan['name']}",
                                   f"networkd {netdev.name}", netdev))

    # --- systemd unit files ---
    for path in (shared.SYSTEMD_DIR / f"{NAT_SERVICE_NAME}.service",
                 shared.SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.timer",
                 shared.SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.service"):
        results.append(file_ok(f"unit_{path.stem}", f"systemd unit {path.name}", path))

    # --- WireGuard config and key files ---
    for vlan in wg_vlans:
        iface = validation.derive_interface(vlan, data)
        conf = WG_DIR / f"{iface}.conf"
        key = WG_DIR / f"{iface}.key"
        pub = shared.SCRIPT_DIR / f".{iface}.pub"
        results.append(file_ok(f"wg_conf_{iface}", f"WireGuard {conf.name}", conf))
        results.append(file_ok(f"wg_key_{iface}", f"WireGuard {key.name}", key))
        results.append(file_ok(f"wg_pubkey_{iface}", f"WireGuard {pub.name}", pub))

    # --- Stale WG conf files when no WG VLANs ---
    if not wg_vlans:
        try:
            if WG_DIR.exists():
                stale = [p for p in WG_DIR.glob("wg*.conf") if has_generated_header(p)]
            else:
                stale = []
        except OSError:
            # Directory not accessible (e.g. not running as root): skip quietly.
            stale = []
        if stale:
            results.append(problem(
                "stale_wg_conf", "Stale WireGuard config files", "warning",
                f"{', '.join(p.name for p in stale)} exist but no VPN VLANs are configured.",
                "Remove with: sudo rm " + " ".join(str(p) for p in stale)))

    # --- RADIUS files and secret check ---
    if radius_enabled(data):
        results.append(file_ok("radius_secret_file", ".radius-secret file",
                               RADIUS_SECRET_FILE, "error"))
        results.append(file_ok("radius_clients_conf", "FreeRADIUS clients.conf",
                               RADIUS_CLIENTS_CONF, "error"))
        results.append(file_ok("radius_users_file", "FreeRADIUS users",
                               RADIUS_USERS_FILE, "error"))
        # Secret content match
        try:
            secret = RADIUS_SECRET_FILE.read_text().strip()
            conf_text = RADIUS_CLIENTS_CONF.read_text()
            secret_ok = any(
                line.strip().split("=", 1)[-1].strip() == secret
                for line in conf_text.splitlines()
                if "secret" in line and not line.strip().startswith("#")
            )
            if secret_ok:
                results.append(ok("radius_secret_match", "FreeRADIUS shared secret"))
            else:
                results.append(problem(
                    "radius_secret_match", "FreeRADIUS shared secret", "error",
                    "clients.conf secret does not match .radius-secret. "
                    "Access points will reject all authentication requests.",
                    "Restore .radius-secret from backup, or run `sudo python3 core.py --apply` "
                    "then update the shared secret in your AP controller."))
        except OSError:
            pass  # already caught above by file_ok
    else:
        # RADIUS not enabled - warn if generated config files still exist.
        # exists() sits inside the guard too: it can raise when the
        # FreeRADIUS directory is not accessible to the current user.
        try:
            orphaned = (RADIUS_CLIENTS_CONF.exists()
                        and "# Generated by" in RADIUS_CLIENTS_CONF.read_text())
        except OSError:
            orphaned = False
        if orphaned:
            results.append(problem(
                "radius_conf_orphan", "FreeRADIUS config", "warning",
                "FreeRADIUS clients.conf contains routlin-generated content "
                "but RADIUS is not enabled.",
                "This is harmless if freeradius is stopped. "
                "Remove with: sudo rm " + str(RADIUS_CLIENTS_CONF)))

    # --- Avahi config ---
    if avahi_enabled(data):
        results.append(file_ok("avahi_conf", "avahi-daemon.conf",
                               AVAHI_CONF_FILE, "warning"))
        if AVAHI_CONF_FILE.exists():
            expected_ifaces = set(_avahi_interfaces(data))
            try:
                text = AVAHI_CONF_FILE.read_text()
                m = re.search(r"allow-interfaces\s*=\s*(.+)", text)
                if m:
                    actual_ifaces = {i.strip() for i in m.group(1).split(",")}
                    missing = expected_ifaces - actual_ifaces
                    extra = actual_ifaces - expected_ifaces
                    if missing or extra:
                        results.append(problem(
                            "avahi_ifaces", "avahi-daemon interface list", "warning",
                            f"avahi-daemon.conf interface list does not match config "
                            f"(missing: {missing or 'none'}, extra: {extra or 'none'}).",
                            "Run `sudo python3 core.py --apply` to update."))
                    else:
                        results.append(ok("avahi_ifaces",
                                          "avahi-daemon interface list"))
            except OSError:
                pass

    # --- resolv.conf ---
    gateway_ips = _gateway_ips(data)
    try:
        resolv = RESOLV_CONF.read_text()
        ns_ips = {
            line.split()[1]
            for line in resolv.splitlines()
            if line.startswith("nameserver") and len(line.split()) >= 2
        }
        if ns_ips & gateway_ips:
            results.append(ok("resolv_conf", "/etc/resolv.conf"))
        else:
            results.append(problem(
                "resolv_conf", "/etc/resolv.conf", "warning",
                f"/etc/resolv.conf nameserver(s) {ns_ips} do not include any VLAN gateway. "
                f"Expected one of: {gateway_ips}.",
                "Run `sudo python3 core.py --apply` to update /etc/resolv.conf."))
    except OSError:
        results.append(problem("resolv_conf", "/etc/resolv.conf", "warning",
                               "/etc/resolv.conf is not readable.",
                               "Run `sudo python3 core.py --apply`."))

    # --- chrony.conf ---
    if CHRONY_CONF_FILE.exists():
        try:
            content = CHRONY_CONF_FILE.read_text()
            missing_subnets = []
            for vlan in non_wg:
                # VLANs with a missing or unparsable subnet are skipped.
                try:
                    network = ipaddress.IPv4Network(
                        f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
                    cidr = str(network)
                    if f"allow {cidr}" not in content and f"allow {vlan['subnet']}" not in content:
                        missing_subnets.append(cidr)
                except Exception:
                    pass
            if missing_subnets:
                results.append(problem(
                    "chrony_conf", "/etc/chrony/chrony.conf", "warning",
                    f"chrony.conf is missing allow directives for: {', '.join(missing_subnets)}.",
                    "Run `sudo python3 core.py --apply` to update chrony.conf."))
            else:
                results.append(ok("chrony_conf", "/etc/chrony/chrony.conf"))
        except OSError:
            results.append(problem("chrony_conf", "/etc/chrony/chrony.conf", "warning",
                                   "/etc/chrony/chrony.conf is not readable."))
    else:
        results.append(problem("chrony_conf", "/etc/chrony/chrony.conf", "warning",
                               "/etc/chrony/chrony.conf does not exist.",
                               "Install chrony: sudo apt-get install chrony"))

    # --- DHCP pool utilization ---
    # Best effort per VLAN: malformed pool data or unreadable lease files are skipped.
    for vlan in non_wg:
        try:
            dhcp = vlan.get("dhcp_information", {})
            start = dhcp.get("pool_start", "")
            end = dhcp.get("pool_end", "")
            if not start or not end:
                continue
            pool_size = (int(ipaddress.IPv4Address(end))
                         - int(ipaddress.IPv4Address(start)) + 1)
            if pool_size <= 0:
                continue
            lease_file = shared.LEASES_DIR / f"dnsmasq-{shared.PRODUCT_NAME}-{vlan['name']}.leases"
            if not lease_file.exists():
                continue
            leases = [
                l for l in lease_file.read_text().splitlines()
                if l.strip() and not l.startswith("#")
            ]
            pct = len(leases) * 100 // pool_size
            if pct >= DHCP_WARN_PCT:
                results.append(problem(
                    f"dhcp_pool_{vlan['name']}",
                    f"DHCP pool ({vlan['name']})", "warning",
                    f"DHCP pool for VLAN '{vlan['name']}' is {pct}% full "
                    f"({len(leases)}/{pool_size} leases).",
                    "Expand the pool range in config.json or clean up stale leases "
                    f"with: `sudo python3 core.py --reset-leases {vlan['name']}`"))
            else:
                results.append(ok(f"dhcp_pool_{vlan['name']}",
                                  f"DHCP pool ({vlan['name']})",
                                  f"{pct}% used ({len(leases)}/{pool_size})"))
        except Exception:
            pass

    # --- Blocklist file freshness ---
    now = datetime.now(timezone.utc).timestamp()
    bl_library = {bl["name"]: bl for bl in data.get("dns_blocking", {}).get("blocklists", [])}
    needed = set()
    for vlan in vlans:
        needed.update(vlan.get("use_blocklists", []))
    for name in sorted(needed):
        bl = bl_library.get(name)
        if not bl or bl.get("bl_type") == "local":
            continue
        save_as = bl.get("save_as", "")
        path = BLOCKLIST_DIR / save_as if save_as else None
        if not path or not path.exists():
            results.append(problem(
                f"blocklist_{name}", f"blocklist ({name})", "warning",
                f"Blocklist file for '{name}' has not been downloaded.",
                "Run `sudo python3 dl_blocklists.py`."))
            continue
        age_secs = now - path.stat().st_mtime
        if age_secs > BLOCKLIST_STALE_SECS:
            age_h = int(age_secs / 3600)
            results.append(problem(
                f"blocklist_{name}", f"blocklist ({name})", "warning",
                f"Blocklist '{name}' is {age_h}h old "
                f"(threshold {BLOCKLIST_STALE_SECS // 3600}h).",
                "Run `sudo python3 dl_blocklists.py`."))
        else:
            results.append(ok(f"blocklist_{name}", f"blocklist ({name})"))

    # --- Disk space ---
    try:
        usage = shutil.disk_usage("/")
        pct = usage.used * 100 // usage.total
        if pct >= DISK_WARN_PCT:
            results.append(problem(
                "disk_space", "Disk space", "warning",
                f"Root filesystem is {pct}% full "
                f"({usage.used // 1_073_741_824}G of {usage.total // 1_073_741_824}G used).",
                "Free up disk space to avoid service disruption."))
        else:
            results.append(ok("disk_space", "Disk space",
                              f"{pct}% used"))
    except Exception:
        pass

    # --- Upstream DNS reachability ---
    # A server counts as reachable when a TCP connection to port 53 succeeds.
    servers = data.get("upstream_dns", {}).get("upstream_servers", [])
    unreachable = []
    for srv in servers:
        try:
            with socket.create_connection((srv, 53), timeout=DNS_TIMEOUT_SECS):
                pass
        except OSError:
            unreachable.append(srv)
    if unreachable:
        results.append(problem(
            "upstream_dns", "Upstream DNS reachability", "warning",
            f"Upstream DNS server(s) unreachable on port 53: {', '.join(unreachable)}.",
            "Check WAN connectivity and upstream DNS server addresses in config.json."))
    elif servers:
        results.append(ok("upstream_dns", "Upstream DNS reachability"))
    return results
# ===================================================================
# Log checks
# ===================================================================
def check_logs(data):
    """Scan recent logs for FreeRADIUS shared-secret failures and dnsmasq errors.

    Args:
        data: Parsed config.json contents. Not used by the current checks; kept
            so all ``check_*`` functions share one signature.

    Returns:
        A list of result dicts built by ``ok()`` and ``problem()``. The
        FreeRADIUS results are only produced when radius.log exists and is
        readable; the dnsmasq result is skipped if journalctl cannot be run.
    """
    results = []

    # --- FreeRADIUS auth failures ---
    radius_log = Path("/var/log/freeradius/radius.log")
    try:
        # exists() is inside the guard as well: the log directory may not be
        # accessible to the current user.
        lines = (radius_log.read_text(errors="replace").splitlines()
                 if radius_log.exists() else None)
    except OSError:
        lines = None

    if lines is not None:
        cutoff = datetime.now(timezone.utc).timestamp() - 3600
        # Parse lines with timestamps like "Thu May 21 11:53:47 2026 : Info: ..."
        failure_re = re.compile(r"Shared secret is incorrect")
        ts_re = re.compile(
            r"(\w+ \w+ +\d+ \d+:\d+:\d+ \d+) : ")
        recent = []
        for line in lines[-2000:]:  # scan last 2000 lines
            m = ts_re.match(line)
            if not m:
                continue
            try:
                ts = datetime.strptime(m.group(1), "%a %b %d %H:%M:%S %Y")
                # The stamp carries no zone. A naive datetime's timestamp()
                # is interpreted in the system's local time, which is what a
                # ctime-style log line uses.
                # NOTE(review): assumes radius.log is written in local time -- confirm.
                is_recent = ts.timestamp() >= cutoff
            except (ValueError, OverflowError, OSError):
                continue
            if is_recent:
                recent.append(line)

        failures = [l for l in recent if failure_re.search(l)]
        if failures:
            # Extract distinct AP names from "(from client ...)" pattern
            ap_re = re.compile(r"\(from client ([^)]+)\)")
            aps = sorted({m.group(1) for l in failures
                          for m in ap_re.finditer(l)})
            ap_str = ", ".join(aps) if aps else f"{len(failures)} request(s)"
            results.append(problem(
                "freeradius_auth_failures",
                "FreeRADIUS auth failures", "error",
                f"FreeRADIUS is rejecting requests from {ap_str} with "
                f"'Shared secret is incorrect' ({len(failures)} failures in the last hour).",
                "Restore .radius-secret from backup and run `sudo python3 core.py --apply`, "
                "or update the shared secret in your AP controller to match .radius-secret."))
        else:
            results.append(ok("freeradius_auth_failures",
                              "FreeRADIUS auth failures"))

        # High rejection rate (>50% of recent activity is failures).
        if recent and len(failures) > len(recent) * 0.5:
            results.append(problem(
                "freeradius_high_reject_rate",
                "FreeRADIUS rejection rate", "warning",
                f"Over half of recent FreeRADIUS activity ({len(failures)}/{len(recent)}) "
                f"are auth failures.",
                "Investigate FreeRADIUS config and shared secrets."))
        elif recent:
            results.append(ok("freeradius_high_reject_rate",
                              "FreeRADIUS rejection rate"))

    # --- dnsmasq errors ---
    # Best effort: if journalctl is missing, slow, or fails, no result is added.
    try:
        r = subprocess.run(
            ["journalctl", "-u", f"dnsmasq-{shared.PRODUCT_NAME}-*",
             "--since", "-1h", "--priority=err", "--no-pager", "-q"],
            capture_output=True, text=True, timeout=5
        )
        err_lines = [l for l in r.stdout.splitlines() if l.strip()]
        if err_lines:
            results.append(problem(
                "dnsmasq_errors", "dnsmasq errors", "error",
                f"{len(err_lines)} dnsmasq error(s) in the last hour: "
                f"{err_lines[0][:120]}{'...' if len(err_lines) > 1 else ''}",
                "Check dnsmasq logs: `sudo journalctl -u 'dnsmasq-routlin-*' --since -1h`"))
        else:
            results.append(ok("dnsmasq_errors", "dnsmasq errors"))
    except Exception:
        pass
    return results
# ===================================================================
# Next blocklist update
# ===================================================================
def _next_blocklist_update():
    """Return the blocklist timer's next trigger time as text, or None.

    Looks for the first usable ``Trigger:`` line in the output of
    ``systemctl status <timer>``; an empty value or "n/a" does not count.
    Any failure while running or parsing the command yields None.
    """
    marker = "Trigger:"
    try:
        proc = subprocess.run(
            ["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"],
            capture_output=True, text=True, timeout=5
        )
        for raw in proc.stdout.splitlines():
            stripped = raw.strip()
            if not stripped.startswith(marker):
                continue
            when = stripped[len(marker):].strip()
            if when and when != "n/a":
                return when
    except Exception:
        pass
    return None
# ===================================================================
# Public API
# ===================================================================
def run_and_write(data):
    """Run every check, persist the report, and return ``(all_healthy, status)``.

    The report is written to a temporary file next to ``HEALTH_FILE`` and then
    moved over it. ``all_healthy`` is False as soon as any item in the
    services, configurations or logs sections has status "problem".
    """
    checked_at = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    services = check_services(data)
    configurations = check_configurations(data)
    logs = check_logs(data)
    status = {
        "checked_at": checked_at,
        "services": services,
        "configurations": configurations,
        "logs": logs,
        "next_blocklist_update": _next_blocklist_update(),
    }

    # Write-then-rename so readers never see a half-written report.
    scratch = HEALTH_FILE.with_suffix(".tmp")
    scratch.write_text(json.dumps(status, indent=2))
    scratch.replace(HEALTH_FILE)

    healthy = True
    for section in ("services", "configurations", "logs"):
        if any(item.get("status") == "problem" for item in status.get(section, [])):
            healthy = False
            break
    return healthy, status
def print_table(status):
    """Print the service status table and any problems to stdout.

    Args:
        status: The dict produced by ``run_and_write`` (sections ``services``,
            ``configurations``, ``logs`` and ``next_blocklist_update``).

    For every service marked as a problem a sentence is built from its
    expected and actual states. The suggested action is derived from the
    *expected* state, so a unit that should be stopped or disabled is not
    described as needing to be activated or enabled.
    """
    col = shutil.get_terminal_size((80, 24)).columns
    services = status.get("services", [])
    print(f"\n {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}")
    print(f" {'-'*45} {'-'*18} {'-'*15}")
    for svc in services:
        active = svc.get("active", "unknown")
        enabled = svc.get("enabled", "unknown")
        a_ok = svc.get("active_ok", True)
        e_ok = svc.get("enabled_ok", True)
        # The symbol shows the actual state; (OK)/(BAD) shows whether that
        # state matches the expectation.
        a_sym = "+" if active == "active" else "x"
        e_sym = "+" if enabled == "enabled" else "x"
        a_status = "(OK) " if a_ok else "(BAD)"
        e_status = "(OK) " if e_ok else "(BAD)"
        print(f" {svc['name']:<45} "
              f"{a_sym} {active:<10} {a_status} "
              f"{e_sym} {enabled:<10} {e_status}")

    trigger = status.get("next_blocklist_update")
    if trigger:
        print(f"\n Next blocklist update: {trigger}")

    svc_problems = []
    for svc in services:
        if svc.get("status") != "problem":
            continue
        name = svc["name"]
        utype = "timer" if name.endswith(".timer") else "service" if name.endswith(".service") else "unit"
        exp_parts, act_parts, fix_parts = [], [], []
        if not svc.get("active_ok"):
            expected = svc.get("expected_active", "active")
            exp_parts.append(expected)
            act_parts.append(svc.get("active", "unknown"))
            fix_parts.append("activate" if expected == "active" else "deactivate")
        if not svc.get("enabled_ok"):
            expected = svc.get("expected_enabled", "enabled")
            exp_parts.append(expected)
            act_parts.append(svc.get("enabled", "unknown"))
            fix_parts.append("enable" if expected == "enabled" else "disable")
        detail = (f"The {utype} `{name}` is expected to be "
                  f"{' and '.join(exp_parts)} but is {' and '.join(act_parts)}.")
        suggestion = f"Run `sudo python3 core.py --apply` to {' and '.join(reversed(fix_parts))} it."
        svc_problems.append({"severity": svc.get("severity", "error"),
                             "detail": detail, "suggestion": suggestion})

    problems = svc_problems + [
        item
        for section in ("configurations", "logs")
        for item in status.get(section, [])
        if item.get("status") == "problem"
    ]
    if problems:
        print(f"\n Problems {'=' * (col - 12)}")
        for p in problems:
            sev = p.get("severity", "error")
            tag = f"[{sev}]"
            detail = p.get("detail", p.get("name", ""))
            print(f" {tag:<10} {detail}")
            tip = p.get("suggestion", "")
            if tip:
                print(f" {'':10} -> {tip}")
    print()
# ===================================================================
# Standalone entry point
# ===================================================================
if __name__ == "__main__":
    # Standalone run: load config.json, run all checks, print the table.
    try:
        with open(CONFIG_FILE) as handle:
            config = json.load(handle)
    except Exception as err:
        print(f"Error loading {CONFIG_FILE}: {err}", file=sys.stderr)
        sys.exit(1)
    _healthy, report = run_and_write(config)
    print_table(report)