linuxrouter/routlin/core.py
2026-06-09 15:51:02 -04:00

946 lines
39 KiB
Python

#!/usr/bin/env python3
"""
core.py -- Apply config.json to systemd-networkd, per-VLAN dnsmasq instances, and nftables.
Each VLAN defined in config.json gets its own dnsmasq instance that handles
both DHCP and DNS for that VLAN. WireGuard VLANs get a DNS-only instance
(no DHCP, since peers have statically assigned IPs).
Each instance binds exclusively to its VLAN gateway IP on port 53, so
instances do not conflict with each other or with the system dnsmasq.service,
which is stopped and disabled on --apply.
Blocklists are downloaded, parsed into unique domain sets, and merged per
unique blocklist combination (identified by a stable SHA256 hash). Each
VLAN's dnsmasq instance loads the merged file for its specific combination,
giving true per-VLAN DNS filtering. Blocked domains and all their subdomains
return NXDOMAIN via dnsmasq's local=/ syntax.
nftables rules are applied atomically into dedicated tables (routlin-nat,
routlin-filter) that do not touch Docker-managed tables. A systemd boot
service (core-nat.service) re-applies the rules on every boot.
File layout:
blocklists/
<save_as> -- raw downloaded blocklist files
merged-<hash>.conf -- merged file per unique blocklist combo
/etc/dnsmasq-routlin/
<name>.conf -- per-VLAN dnsmasq config
/etc/systemd/system/
dnsmasq-routlin-<name>.service -- per-VLAN dnsmasq service unit
routlin-dns-blocklist-update.timer -- daily blocklist refresh timer
routlin-dns-blocklist-update.service -- timer service unit
routlin-nat.service -- boot service to re-apply nftables rules
/var/lib/misc/
dnsmasq-routlin-<name>.leases -- per-VLAN DHCP lease files
.dns-metrics -- cumulative lifetime DNS metrics
Validation:
gateway -- Must exactly match one of the server_identities IPs.
dns_server -- Must be a valid IPv4 within the VLAN subnet.
ntp_server -- Must be a valid IPv4 within the VLAN subnet if specified.
pool range -- dynamic_pool_start must be <= dynamic_pool_end. Both must
fall within the VLAN subnet.
identities -- All server_identity IPs must fall within the VLAN subnet
and must not fall inside the dynamic pool range.
reservations -- All reservation IPs must fall within the VLAN subnet, must
not fall inside the dynamic pool range, must not duplicate
another reservation IP or MAC within the same VLAN, and
must not conflict with any server_identity IP.
vlan_id -- Must be unique across all VLAN blocks.
name -- Must be unique across all VLAN blocks.
interface -- Must be unique across all VLAN blocks.
blocklists -- Each entry must have: name, description, save_as, url,
format. Names must be unique. Format must be 'dnsmasq' or
'hosts'.
use_blocklists -- Each name must exist in the blocklists library. An empty
list is allowed (VLAN receives unfiltered DNS).
wan_interface -- Must exist on the system.
port_forwarding -- top-level array. nat_ip must be a valid IPv4. dest_port and
nat_port must be valid (1-65535). Protocol must be
tcp, udp, or both.
port_wrangling -- redirect_to must be within the VLAN subnet. dest_port
must be valid. Protocol must be tcp, udp, or both.
Generates DNAT rules only; no forward chain rules needed
since redirect_to is always a local IP (INPUT handles it).
inter_vlan_exceptions -- src_ip_or_subnet and dst_ip_or_subnet may be a single IPv4 address
or a CIDR network. dest_port_start/dest_port_end are optional (1-65535).
Protocol must be tcp, udp, or both.
Usage:
sudo python3 core.py --apply Apply config fast: restart running services only
sudo python3 core.py --status Show service and timer status
sudo python3 core.py --view-configs Show active per-VLAN dnsmasq config files
sudo python3 core.py --view-leases Show active DHCP leases
sudo python3 core.py --view-rules Show active nftables ruleset
sudo python3 core.py --disable Run the interactive disable wizard to stop instances, remove nftables, remove all generated config files
sudo python3 core.py --apply [--dry-run] Preview --apply without making changes
sudo python3 core.py --disable [--dry-run] Preview --disable without making changes
python3 core.py --view-metrics Show lifetime DNS metrics across all instances
"""
import hashlib
import ipaddress
import json
import logging
import os
import re
import subprocess
import sys
import time
import argparse
from datetime import datetime
from pathlib import Path
import health as health
import mod_avahi as avahi
import mod_captive as captive
import mod_dnsmasq as dnsmasq
import mod_metrics as metrics
import mod_networkd as networkd
import mod_nftables as nftables
import mod_radius as radius
import mod_shared as shared
import mod_timers as timers
import mod_wireguard as wireguard
import mod_validation as validation
PRODUCT_NAME = shared.PRODUCT_NAME
SCRIPT_DIR = shared.SCRIPT_DIR
DNSMASQ_CONF_DIR = shared.DNSMASQ_CONF_DIR
LEASES_DIR = shared.LEASES_DIR
SYSTEMD_DIR = shared.SYSTEMD_DIR
def die(msg):
print(f"ERROR: {msg}", file=sys.stderr)
sys.exit(1)
CONFIG_FILE = SCRIPT_DIR / "config.json"
DASHBOARD_PENDING = SCRIPT_DIR / ".dashboard-pending"
HEALTH_FILE = SCRIPT_DIR / ".health"
RESOLV_CONF = Path("/etc/resolv.conf")
# ===================================================================
# Load
# ===================================================================
def load_config():
if not CONFIG_FILE.exists():
die(f"Config file not found: {CONFIG_FILE}")
with open(CONFIG_FILE) as f:
data = json.load(f)
if not data.get("vlans"):
die("No vlans defined in config.json.")
return data
def _remove_pending_cmd(cmd):
try:
if not DASHBOARD_PENDING.exists():
return
lines = DASHBOARD_PENDING.read_text().splitlines()
kept = [l for l in lines if f'[{cmd}]' not in l]
DASHBOARD_PENDING.write_text('\n'.join(kept) + ('\n' if kept else ''))
except Exception:
pass
def show_status(data):
_, status = health.run_and_write(data)
health.print_table(status)
def show_configs(data):
for vlan in data["vlans"]:
cf = shared.vlan_conf_file(vlan)
if cf.exists():
print(f"# -- {cf} --")
print(cf.read_text())
else:
print(f"No config found at {cf} (not yet applied).")
# ===================================================================
# Stop / disable
# ===================================================================
def stop_instances(data):
"""Remove timers and stop all per-VLAN instances (config files preserved)."""
timers.remove_timers(
names=[timers.BLIST_TIMER_NAME, timers.HEALTH_TIMER_NAME, timers.MAINT_TIMER_NAME],
timer_files=[timers.BLIST_TIMER_FILE, timers.HEALTH_TIMER_FILE, timers.MAINT_TIMER_FILE],
svc_files=[timers.BLIST_TIMER_SVC_FILE, timers.HEALTH_TIMER_SVC_FILE, timers.MAINT_TIMER_SVC_FILE],
daemon_reload=True,
)
print()
for vlan in data["vlans"]:
svc = shared.vlan_service_name(vlan, validation.derive_interface(vlan, data))
subprocess.run(["systemctl", "disable", "--now", svc],
capture_output=True, text=True)
print(f"Stopped and disabled: {svc}")
def disable_all(data):
"""Full teardown: stop dnsmasq instances, remove nftables, remove all generated config files."""
stop_instances(data)
print()
for vlan in data["vlans"]:
for f in (shared.vlan_conf_file(vlan), shared.vlan_service_file(vlan, validation.derive_interface(vlan, data))):
if f.exists():
f.unlink()
print(f"Removed: {f}")
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
print("systemd daemon reloaded.")
print()
print("Removing nftables rules =============================================")
err = nftables.delete_our_tables()
if err:
die(err)
nftables.remove_nat_service()
if radius.radius_enabled(data):
print()
print("Stopping RADIUS =====================================================")
subprocess.run(["systemctl", "disable", "--now", "freeradius"],
capture_output=True, text=True)
print("freeradius stopped and disabled.")
if avahi.avahi_enabled(data):
print()
print("Stopping mDNS Reflector =============================================")
avahi.disable_avahi()
def _write_client_network(iface, dhcp, static_cidr=None):
"""Remove all router networkd files and write a plain client .network file."""
for pattern in (f"10-{PRODUCT_NAME}-*.network", f"10-{PRODUCT_NAME}-*.netdev"):
for f in networkd.NETWORKD_DIR.glob(pattern):
f.unlink()
print(f"Removed: {f}")
lines = [
"# Generated by core.py --disable -- do not edit manually.",
"",
"[Match]",
f"Name={iface}",
"",
"[Network]",
]
if dhcp:
lines.append("DHCP=yes")
else:
lines.append("DHCP=no")
lines.append(f"Address={static_cidr}")
lines.append("")
path = networkd.NETWORKD_DIR / f"10-client-{iface}.network"
path.write_text("\n".join(lines))
print(f"Written: {path}")
result = subprocess.run(["networkctl", "reload"], capture_output=True, text=True)
if result.returncode != 0:
print(f"WARNING: networkctl reload returned non-zero:\n{result.stderr.strip()}")
else:
print("systemd-networkd reloaded.")
def _configure_dns_resolved():
"""Re-enable systemd-resolved and restore the resolv.conf symlink."""
result = subprocess.run(
["systemctl", "enable", "--now", "systemd-resolved"],
capture_output=True, text=True
)
if result.returncode != 0:
print(f"ERROR: Failed to enable systemd-resolved:\n{result.stderr.strip()}")
return False
RESOLV_CONF.unlink(missing_ok=True)
RESOLV_CONF.symlink_to("/run/systemd/resolve/stub-resolv.conf")
print("systemd-resolved enabled. /etc/resolv.conf restored as symlink to stub resolver.")
return True
def _configure_dns_static(nameserver):
"""Write a plain /etc/resolv.conf with a single user-specified nameserver."""
RESOLV_CONF.unlink(missing_ok=True)
RESOLV_CONF.write_text(f"nameserver {nameserver}\n")
print(f"Updated /etc/resolv.conf: nameserver {nameserver}")
def _suggest_static_ip(physical_vlan):
"""
Suggest a client static IP from the physical VLAN's subnet.
Prefers server_identity IPs whose last octet is not 1 (highest wins).
Falls back to a random unused IP in the subnet if all are .1.
"""
import random
network = shared.network_for(physical_vlan)
prefix = network.prefixlen
identities = physical_vlan.get("server_identities", [])
known_ips = {ipaddress.IPv4Address(i["ip"]) for i in identities}
non_gateway = [ip for ip in known_ips if ip.packed[-1] != 1]
if non_gateway:
chosen = max(non_gateway, key=lambda ip: ip.packed[-1])
return f"{chosen}/{prefix}"
# All identities end in .1 - pick a random unused host in the subnet
hosts = list(network.hosts())
candidates = [h for h in hosts if h not in known_ips and h.packed[-1] != 1]
if candidates:
chosen = random.choice(candidates)
return f"{chosen}/{prefix}"
# Degenerate fallback - extremely small subnet
return f"{list(network.hosts())[0]}/{prefix}"
# ===================================================================
# Dry-run helpers
# ===================================================================
def _svc_state(unit):
"""Return 'active', 'inactive', or 'unknown' for a systemd unit."""
r = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
return r.stdout.strip() or "unknown"
def _svc_enabled(unit):
"""Return True if the systemd unit is enabled."""
r = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
return r.stdout.strip() in ("enabled", "enabled-runtime")
def _dry_run_conflicting_services(data):
print("Conflicting services (dry-run) ======================================")
for unit, label in [("systemd-resolved", "systemd-resolved"),
("systemd-timesyncd", "systemd-timesyncd")]:
state = _svc_state(unit)
if state == "active":
print(f" Would stop and disable: {label} (currently: active)")
else:
print(f" {label}: not active - no action needed")
chrony_ok = subprocess.run(["systemctl", "cat", "chrony"],
capture_output=True, text=True).returncode == 0
if not chrony_ok:
print(" chrony: not installed - dependency check would have prompted to install it")
else:
chrony_conf = Path("/etc/chrony/chrony.conf")
if chrony_conf.exists():
content = chrony_conf.read_text()
subnets = [str(shared.network_for(v)) for v in data["vlans"]]
missing = [s for s in subnets if f"allow {s}" not in content]
if missing:
print(f" Would add chrony allow directives for: {', '.join(missing)}")
else:
print(" chrony.conf already has required allow directives - no change needed")
print(f" Would enable and restart: chrony")
if subprocess.run(["which", "ufw"], capture_output=True, text=True).returncode == 0:
status = subprocess.run(["ufw", "status"], capture_output=True, text=True)
if "Status: active" in status.stdout:
print(" Would disable: ufw (currently: active)")
else:
print(" ufw: not active - no rule action needed")
if _svc_enabled("ufw"):
print(" Would disable: ufw.service (currently: enabled at boot)")
else:
print(" ufw.service: not enabled at boot - no action needed")
else:
print(" ufw: not installed - no action needed")
r = subprocess.run(["systemctl", "is-enabled", "dnsmasq"],
capture_output=True, text=True)
if r.stdout.strip() in ("enabled", "enabled-runtime"):
print(f" Would stop and disable: system dnsmasq.service (currently: enabled)")
else:
print(" system dnsmasq.service: not enabled - no action needed")
physical = next((v for v in data["vlans"] if shared.is_physical(v)), None)
if physical:
gw = shared.resolve_vlan_options(physical)["gateway"]
if RESOLV_CONF.is_symlink():
print(f" Would replace /etc/resolv.conf symlink with plain file: nameserver {gw}")
else:
wanted = f"nameserver {gw}\n"
current = RESOLV_CONF.read_text() if RESOLV_CONF.exists() else ""
if wanted not in current:
print(f" Would update /etc/resolv.conf: nameserver {gw}")
else:
print(f" /etc/resolv.conf already points to {gw} - no change needed")
def _dry_run_blocklists(data):
print("Blocklists (dry-run) ================================================")
for vlan in data.get("vlans", []):
names = vlan.get("use_blocklists", [])
if names:
f = dnsmasq.vlan_hosts_file(vlan)
action = "update" if f.exists() else "create"
print(f" Would {action}: {f}")
print(f" Sources: {', '.join(sorted(names))}")
def _dry_run_timer(data):
print("Timer (dry-run) =====================================================")
general = data.get("dns_blocking", {}).get("general", {})
execute_time = general.get("daily_execute_time_24hr_local", "02:30")
for path, label in [(timers.BLIST_TIMER_FILE, "timer unit"), (timers.BLIST_TIMER_SVC_FILE, "service unit")]:
action = "update" if path.exists() else "create and enable"
print(f" Would {action}: {path}")
print(f" Schedule: daily at {execute_time} local time (Persistent=true - catches up if missed)")
def _dry_run_boot_service():
print("Boot service (dry-run) ==============================================")
script_path = Path(__file__).resolve()
action = "update" if nftables.NAT_SERVICE_FILE.exists() else "create and enable"
print(f" Would {action}: {nftables.NAT_SERVICE_FILE}")
print(f" ExecStart: /usr/bin/python3 {script_path} --apply")
print(f" After: network-online.target docker.service")
print(f" WantedBy: multi-user.target (runs on every boot)")
def _dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice, static_nameserver):
print()
print("[DRY RUN] Based on your selections, --disable would perform the following:")
print()
print(f"-- Stopping {PRODUCT_NAME} services (dry-run) --------------------------------")
print(f" Would disable and stop: {timers.BLIST_TIMER_NAME}.timer")
for vlan in data["vlans"]:
iface = validation.derive_interface(vlan, data)
svc = shared.vlan_service_name(vlan, iface)
conf = shared.vlan_conf_file(vlan)
svc_f = shared.vlan_service_file(vlan, iface)
print(f" Would stop and disable: {svc}")
if conf.exists():
print(f" Would remove: {conf}")
if svc_f.exists():
print(f" Would remove: {svc_f}")
print(f" Would reload: systemd daemon")
for table in (f"{PRODUCT_NAME}-nat", f"{PRODUCT_NAME}-filter"):
r = subprocess.run(["nft", "list", "table", "ip", table],
capture_output=True, text=True)
if r.returncode == 0:
print(f" Would flush nftables table: {table}")
else:
print(f" nftables table {table}: not present - no action needed")
if nftables.NAT_SERVICE_FILE.exists():
print(f" Would stop, disable, and remove: {nftables.NAT_SERVICE_NAME}.service")
else:
print(f" {nftables.NAT_SERVICE_NAME}.service: not installed - no action needed")
print()
print("Restoring NTP client (dry-run) ======================================")
state = _svc_state("chrony")
if state == "active":
print(f" Would stop and disable: chrony (currently: active)")
else:
print(f" chrony: not active - no action needed")
r = subprocess.run(["systemctl", "cat", "systemd-timesyncd"],
capture_output=True, text=True)
if r.returncode == 0:
print(f" Would enable and start: systemd-timesyncd")
else:
print(f" systemd-timesyncd: not available on this system")
print()
print("Network interface (dry-run) =========================================")
router_net = list(networkd.NETWORKD_DIR.glob(f"10-{PRODUCT_NAME}-*.network"))
router_dev = list(networkd.NETWORKD_DIR.glob(f"10-{PRODUCT_NAME}-*.netdev"))
client_file = networkd.NETWORKD_DIR / f"10-client-{iface}.network"
for f in router_net + router_dev:
print(f" Would remove: {f}")
print(f" Would write: {client_file}")
if use_dhcp:
print(f" [Match] Name={iface}")
print(f" [Network] DHCP=yes")
else:
print(f" [Match] Name={iface}")
print(f" [Network] DHCP=no Address={static_cidr}")
print(f" Would reload: systemd-networkd")
print()
if not resolv_ok:
print("DNS (dry-run) =======================================================")
if dns_choice == "resolved":
print(" Would enable: systemd-resolved")
print(" Would restore: /etc/resolv.conf -> /run/systemd/resolve/stub-resolv.conf")
else:
print(f" Would write: /etc/resolv.conf")
print(f" nameserver {static_nameserver}")
print()
# ===================================================================
# Disable wizard
# ===================================================================
def cmd_disable(data, dry_run=False):
"""Interactive wizard to revert the machine from router to plain network client."""
import readline
print()
print("=" * 70)
print(" REVERT TO NETWORK CLIENT" + (" [DRY RUN]" if dry_run else ""))
print("=" * 70)
print()
print(" You are reverting this machine from a gateway/router back to being")
print(" a plain network client. All router services, firewall rules, and")
print(" VLAN configuration will be removed.")
if dry_run:
print()
print(" DRY RUN: No changes will be made. This shows what would happen.")
print()
# ------------------------------------------------------------------
# Step 1 - Confirmation
# ------------------------------------------------------------------
while True:
print(" [1] Proceed with reversion")
print(" [2] Cancel")
choice = input(" Choice [1/2]: ").strip()
if choice == "2":
print("Cancelled.")
return
if choice == "1":
break
print(" Invalid choice. Enter 1 or 2.")
print()
# ------------------------------------------------------------------
# Step 2 - IP configuration
# ------------------------------------------------------------------
physical = next((v for v in data["vlans"] if shared.is_physical(v)), None)
if physical is None:
die("No physical VLAN (vlan_id=1) found in config. Cannot determine interface.")
iface = validation.derive_interface(physical, data)
print(" How should this machine obtain its IP address after reversion?")
print()
print(" [1] Obtain IP via DHCP (recommended - let the new router assign one)")
print(" [2] Use a static IP")
print()
use_dhcp = None
static_cidr = None
while True:
choice = input(" Choice [1/2]: ").strip()
if choice == "1":
use_dhcp = True
break
if choice == "2":
use_dhcp = False
break
print(" Invalid choice. Enter 1 or 2.")
if not use_dhcp:
print()
print(" WARNING: Do not assign an IP that will conflict with another")
print(" LAN device, especially the new gateway/router.")
print()
suggested = _suggest_static_ip(physical)
print(f" Suggested IP (edit as needed): {suggested}")
print()
while True:
try:
readline.set_startup_hook(lambda: readline.insert_text(suggested))
entry = input(" Static IP/prefix: ").strip()
finally:
readline.set_startup_hook(None)
if not entry:
print(" Cannot be empty.")
continue
try:
ipaddress.IPv4Interface(entry)
static_cidr = entry
break
except ValueError:
print(f" '{entry}' is not a valid IPv4 address/prefix (e.g. 192.168.1.50/24).")
print()
# ------------------------------------------------------------------
# Step 3 - DNS resolver
# ------------------------------------------------------------------
# If resolv.conf is already a plain file with no router gateway IPs, leave it alone.
gateway_ips = {shared.resolve_vlan_options(v)["gateway"] for v in data["vlans"] if not validation.is_wg(v)}
resolv_ok = False
if not RESOLV_CONF.is_symlink() and RESOLV_CONF.exists():
current_servers = {
parts[1] for line in RESOLV_CONF.read_text().splitlines()
if (parts := line.strip().split()) and parts[0] == "nameserver"
}
if current_servers and not current_servers.intersection(gateway_ips):
resolv_ok = True
static_nameserver = None # set if user chooses manual entry
dns_choice = None # "resolved" or "static"
if resolv_ok:
print(" /etc/resolv.conf already contains client-appropriate DNS settings.")
print(" Leaving it as-is.")
print()
else:
resolved_available = subprocess.run(
["systemctl", "cat", "systemd-resolved"],
capture_output=True, text=True
).returncode == 0
print(" How should DNS resolution be handled after reversion?")
print()
if resolved_available:
print(" [1] Re-enable systemd-resolved (recommended - adapts to any network)")
print(" [2] Enter a static nameserver IP")
while True:
choice = input(" Choice [1/2]: ").strip()
if choice == "1":
dns_choice = "resolved"
break
if choice == "2":
dns_choice = "static"
break
print(" Invalid choice. Enter 1 or 2.")
else:
print(" systemd-resolved is not installed on this system.")
print(" A static nameserver IP will be used.")
dns_choice = "static"
if dns_choice == "static":
print()
while True:
entry = input(" Nameserver IP: ").strip()
if not entry:
print(" Cannot be empty.")
continue
try:
ipaddress.IPv4Address(entry)
static_nameserver = entry
break
except ValueError:
print(f" '{entry}' is not a valid IPv4 address.")
print()
# ------------------------------------------------------------------
# Step 4 - Execute (or dry-run summary)
# ------------------------------------------------------------------
if dry_run:
_dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice, static_nameserver)
return
print(f"-- Stopping {PRODUCT_NAME} services ------------------------------------------")
disable_all(data)
print()
print("Restoring NTP client ================================================")
dnsmasq.restore_ntp()
print()
print("Configuring network interface =======================================")
_write_client_network(iface, dhcp=use_dhcp, static_cidr=static_cidr)
print()
if not resolv_ok:
print("Configuring DNS =====================================================")
if dns_choice == "static":
_configure_dns_static(static_nameserver)
else:
if not _configure_dns_resolved():
print("Failed to re-enable systemd-resolved. Please enter a nameserver IP.")
while True:
entry = input(" Nameserver IP: ").strip()
try:
ipaddress.IPv4Address(entry)
_configure_dns_static(entry)
break
except ValueError:
print(f" '{entry}' is not a valid IPv4 address.")
print()
print("Done. This machine is now configured as a network client.")
if use_dhcp:
print(f" Interface {iface} will obtain its IP via DHCP.")
else:
print(f" Interface {iface} will use static IP: {static_cidr}")
# ===================================================================
# Main
# ===================================================================
def cmd_apply(data, dry_run=False):
"""--apply: full apply. Handles conflicting services, networkd (if changed),
dnsmasq confs, start/restart all services whose interface is up, nftables,
timer, and boot service. Safe to run repeatedly.
"""
if dry_run:
print("[DRY RUN] --apply would perform the following actions:")
print()
_dry_run_conflicting_services(data)
print()
print("systemd-networkd (dry-run) ==========================================")
networkd.apply_networkd(data, dry_run=True)
print()
print("dnsmasq instances (dry-run) =========================================")
dnsmasq.apply_dnsmasq_instances(data, dry_run=True, start_if_needed=True)
print()
print("nftables (dry-run) ==================================================")
nftables.apply_nftables(data, dry_run=True)
print()
_dry_run_timer(data)
print()
_dry_run_boot_service()
if radius.radius_enabled(data):
print()
print("RADIUS (dry-run) ====================================================")
num_clients = len(radius.radius_clients(data))
default_vlan = next((v for v in data["vlans"] if v.get("radius_default") is True), None)
total_macs = len([r for r in data.get("dhcp_reservations", []) if r.get("enabled") is True])
print(f" Would write: {radius.RADIUS_CLIENTS_CONF}")
print(f" {num_clients} RADIUS client(s)")
print(f" Would write: {radius.RADIUS_USERS_FILE}")
print(f" {total_macs} MAC reservation(s)")
if default_vlan:
print(f" DEFAULT -> VLAN {default_vlan.get('vlan_id')} ({default_vlan['name']})")
print(f" Would ensure freeradius is running")
if avahi.avahi_enabled(data):
print()
print("mDNS Reflection (dry-run) ===========================================")
ifaces = avahi.avahi_interfaces(data)
print(f" Would write: {avahi.AVAHI_CONF_FILE}")
print(f" Reflecting across: {', '.join(ifaces)}")
print(f" Would ensure avahi-daemon is running")
return
if not shared.is_root():
die("This script must be run as root (sudo).")
wg_names = {v["name"] for v in data["vlans"] if validation.is_wg(v)}
non_wg_res = [r for r in data.get("dhcp_reservations", []) if r.get("vlan") not in wg_names]
total_enabled = len([r for r in non_wg_res if r.get("enabled") is True])
total_disabled = len([r for r in non_wg_res if r.get("enabled") is not True])
total_wg_peers = sum(len(v.get("peers", [])) for v in data["vlans"] if validation.is_wg(v))
wg_part = f", {total_wg_peers} WG peer(s)" if total_wg_peers else ""
print(f"Applying config: {len(data['vlans'])} VLAN(s), "
f"{total_enabled} reservation(s), {total_disabled} skipped{wg_part}.")
print()
print("Conflicting services ================================================")
dnsmasq.disable_systemd_timesyncd()
dnsmasq.ensure_chrony(data)
dnsmasq.disable_ufw()
print()
print("systemd-networkd ====================================================")
networkd.apply_networkd(data, only_if_changed=True)
print()
if any(validation.is_wg(v) for v in data["vlans"]):
print("WireGuard interfaces ================================================")
wireguard.ensure_wg_interfaces(data)
print()
print("dnsmasq instances ===================================================")
if not dnsmasq.blocklists_available(data):
print(" NOTE: No blocklist hosts files found -- blocklist rules will be absent.")
print(" Run: sudo python3 dl_blocklists.py && sudo python3 core.py --merge-blocklists")
dnsmasq.apply_dnsmasq_instances(data, start_if_needed=True)
print()
print("nftables ============================================================")
nftables.apply_nftables(data)
print()
print("Timer ===============================================================")
err = timers.install_timer(data)
if err:
die(err)
print()
print("Interval timers =====================================================")
timers.install_interval_timers(
names=[timers.HEALTH_TIMER_NAME],
timer_files=[timers.HEALTH_TIMER_FILE],
svc_files=[timers.HEALTH_TIMER_SVC_FILE],
descriptions=["Router status health check"],
exec_starts=[f"/usr/bin/python3 {SCRIPT_DIR / 'health.py'}"],
interval_secs=[timers.HEALTH_TIMER_INTERVAL_SEC],
)
print()
print("DDNS timer ==========================================================")
enabled_ddns = [p for p in data.get("ddns", {}).get("providers", []) if p.get("enabled")]
if enabled_ddns:
timers.install_maint_timer(data)
else:
timers.remove_timers([timers.MAINT_TIMER_NAME], [timers.MAINT_TIMER_FILE], [timers.MAINT_TIMER_SVC_FILE])
print("No enabled DDNS providers - timer not installed.")
print()
print("Boot service ========================================================")
nftables.install_nat_service()
print()
if radius.radius_enabled(data):
print("RADIUS ==============================================================")
err = radius.apply_radius(data)
if err:
die(err)
print()
else:
svc = "freeradius"
if subprocess.run(["systemctl", "is-active", svc],
capture_output=True, text=True).stdout.strip() == "active":
print("RADIUS ==============================================================")
subprocess.run(["systemctl", "disable", "--now", svc],
capture_output=True, text=True)
print("freeradius stopped and disabled (no radius_client reservations).")
print()
if avahi.avahi_enabled(data):
print("mDNS Reflection =====================================================")
avahi.apply_avahi(data)
print()
else:
svc = "avahi-daemon"
if subprocess.run(["systemctl", "is-active", svc],
capture_output=True, text=True).stdout.strip() == "active":
print("mDNS Reflection =====================================================")
avahi.disable_avahi()
print()
print("Captive portal ==============================================")
if captive.captive_portal_enabled(data):
timers.install_captive_timers()
print("Captive portal enabled - timers installed.")
else:
timers.remove_captive_timers()
print("No captive portal VLANs - timers removed.")
print()
print("Done.")
healthy, status = health.run_and_write(data)
health.print_table(status)
_remove_pending_cmd('core apply')
if healthy:
_remove_pending_cmd('fix problems')
def main():
parser = argparse.ArgumentParser(
description="Apply config.json to systemd-networkd, per-VLAN dnsmasq instances, and nftables",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"examples:\n"
" sudo python3 core.py --apply Apply full config (idempotent, safe to re-run)\n"
" sudo python3 core.py --status Show service and timer status\n"
" sudo python3 core.py --view-configs Show active per-VLAN dnsmasq config files\n"
" sudo python3 core.py --view-leases Show active DHCP leases\n"
" sudo python3 core.py --view-rules Show active nftables ruleset\n"
" sudo python3 core.py --disable Stop instances, remove nftables, remove all config files\n"
" python3 core.py --view-metrics Show lifetime DNS metrics\n"
"\n"
" [--dry-run] may be combined with --apply or --disable\n"
" to preview all actions verbosely without making any changes:\n"
" sudo python3 core.py --apply --dry-run\n"
" sudo python3 core.py --disable --dry-run\n"
)
)
parser.add_argument("--apply", action="store_true", help="Apply full config: services, networkd, dnsmasq, nftables, timer, boot service")
parser.add_argument("--merge-blocklists", action="store_true", help="Merge downloaded blocklists and reload dnsmasq via SIGHUP (no restart)")
parser.add_argument("--dry-run", action="store_true", help="Preview all actions without making changes (combine with --apply or --disable)")
parser.add_argument("--status", action="store_true", help="Show service and timer status")
parser.add_argument("--view-configs", action="store_true", help="Show active per-VLAN dnsmasq config files")
parser.add_argument("--view-leases", action="store_true", help="Show active DHCP leases")
parser.add_argument("--reset-leases", nargs="?", const="__all__", metavar="VLAN",
help="Reset DHCP leases (stop dnsmasq, delete lease files, restart). "
"Optionally specify a VLAN name to reset only that VLAN.")
parser.add_argument("--view-rules", action="store_true", help="Show active nftables ruleset")
parser.add_argument("--disable", action="store_true", help="Stop instances, remove nftables, remove all config files")
parser.add_argument("--view-metrics", action="store_true", help="Show lifetime DNS metrics across all instances")
args = parser.parse_args()
if not any([args.apply, args.merge_blocklists,
args.dry_run, args.status, args.view_configs, args.view_leases,
args.view_rules, args.disable, args.view_metrics,
args.reset_leases]):
parser.print_help()
sys.exit(0)
if args.dry_run and not any([args.apply, args.disable]):
print("ERROR: --dry-run must be combined with --apply or --disable.", file=sys.stderr)
sys.exit(1)
data = load_config()
errors = validation.validate_config(data)
if errors:
print("Validation failed:", file=sys.stderr)
for e in errors:
print(f" - {e}", file=sys.stderr)
sys.exit(1)
if args.status:
show_status(data)
return
if args.view_configs:
show_configs(data)
return
if args.view_leases:
dnsmasq.show_leases(data)
return
if args.reset_leases:
vlan_name = None if args.reset_leases == "__all__" else args.reset_leases
err = dnsmasq.reset_leases(data, vlan_name)
if err:
die(err)
return
if args.view_rules:
nftables.show_rules()
return
if args.view_metrics:
if not shared.is_root():
die("This script must be run as root (sudo).")
metrics.show_metrics(data)
return
if args.disable:
if not args.dry_run:
if not shared.is_root():
die("This script must be run as root (sudo).")
cmd_disable(data, dry_run=args.dry_run)
return
if args.merge_blocklists:
if not shared.is_root():
die("This script must be run as root (sudo).")
general = data.get("dns_blocking", {}).get("general", {})
dnsmasq.setup_blocklist_logging(general)
logging.info("Merging blocklists ==================================================")
success = dnsmasq.update_blocklist_hosts(data)
print()
if success:
print("Reloading dnsmasq instances =========================================")
dnsmasq.sighup_all_instances()
else:
print("WARNING: Some blocklists failed -- reloading anyway with available data.")
dnsmasq.sighup_all_instances()
return
if args.apply:
cmd_apply(data, dry_run=args.dry_run)
return
if __name__ == "__main__":
main()