linuxrouter/routlin/core.py
2026-05-21 03:23:31 -04:00

3288 lines
126 KiB
Python

#!/usr/bin/env python3
"""
core.py -- Apply core.json to systemd-networkd, per-VLAN dnsmasq instances, and nftables.
Each VLAN defined in core.json gets its own dnsmasq instance that handles
both DHCP and DNS for that VLAN. WireGuard VLANs get a DNS-only instance
(no DHCP, since peers have statically assigned IPs).
Each instance binds exclusively to its VLAN gateway IP on port 53, so
instances do not conflict with each other or with the system dnsmasq.service,
which is stopped and disabled on --apply.
Blocklists are downloaded, parsed into unique domain sets, and merged per
unique blocklist combination (identified by a stable SHA256 hash). Each
VLAN's dnsmasq instance loads the merged file for its specific combination,
giving true per-VLAN DNS filtering. Blocked domains and all their subdomains
return NXDOMAIN via dnsmasq's local=/ syntax.
nftables rules are applied atomically into dedicated tables (routlin-nat,
routlin-filter) that do not touch Docker-managed tables. A systemd boot
service (core-nat.service) re-applies the rules on every boot.
File layout:
blocklists/
<save_as> -- raw downloaded blocklist files
merged-<hash>.conf -- merged file per unique blocklist combo
/etc/dnsmasq-routlin/
<name>.conf -- per-VLAN dnsmasq config
/etc/systemd/system/
dnsmasq-routlin-<name>.service -- per-VLAN dnsmasq service unit
routlin-dns-blocklist-update.timer -- daily blocklist refresh timer
routlin-dns-blocklist-update.service -- timer service unit
routlin-nat.service -- boot service to re-apply nftables rules
/var/lib/misc/
dnsmasq-routlin-<name>.leases -- per-VLAN DHCP lease files
.dns-metrics -- cumulative lifetime DNS metrics
Validation:
gateway -- Must exactly match one of the server_identities IPs.
dns_server -- Must be a valid IPv4 within the VLAN subnet.
ntp_server -- Must be a valid IPv4 within the VLAN subnet if specified.
pool range -- dynamic_pool_start must be <= dynamic_pool_end. Both must
fall within the VLAN subnet.
identities -- All server_identity IPs must fall within the VLAN subnet
and must not fall inside the dynamic pool range.
reservations -- All reservation IPs must fall within the VLAN subnet, must
not fall inside the dynamic pool range, must not duplicate
another reservation IP or MAC within the same VLAN, and
must not conflict with any server_identity IP.
vlan_id -- Must be unique across all VLAN blocks.
name -- Must be unique across all VLAN blocks.
interface -- Must be unique across all VLAN blocks.
blocklists -- Each entry must have: name, description, save_as, url,
format. Names must be unique. Format must be 'dnsmasq' or
'hosts'.
use_blocklists -- Each name must exist in the blocklists library. An empty
list is allowed (VLAN receives unfiltered DNS).
wan_interface -- Must exist on the system.
port_forwarding -- top-level array. nat_ip must be a valid IPv4. dest_port and
nat_port must be valid (1-65535). Protocol must be
tcp, udp, or both.
port_wrangling -- redirect_to must be within the VLAN subnet. dest_port
must be valid. Protocol must be tcp, udp, or both.
Generates DNAT rules only; no forward chain rules needed
since redirect_to is always a local IP (INPUT handles it).
inter_vlan_exceptions -- src_ip_or_subnet and dst_ip_or_subnet must be valid IPv4 addresses declared in the vlans array.
inter_vlan_exceptions -- src_ip_or_subnet and dst_ip_or_subnet must be valid IPv4 addresses
or networks. dst_port must be valid (1-65535). Protocol
must be tcp, udp, or both.
Usage:
sudo python3 core.py --apply Apply config fast: restart running services only
sudo python3 core.py --update-blocklists Refresh blocklists and apply (used by timer)
sudo python3 core.py --status Show service and timer status
sudo python3 core.py --view-configs Show active per-VLAN dnsmasq config files
sudo python3 core.py --view-leases Show active DHCP leases
sudo python3 core.py --view-rules Show active nftables ruleset
sudo python3 core.py --disable Run the interactive disable wizard to stop instances, remove nftables, remove all generated config files
sudo python3 core.py --apply [--dry-run] Preview --apply without making changes
sudo python3 core.py --disable [--dry-run] Preview --disable without making changes
python3 core.py --view-metrics Show lifetime DNS metrics across all instances
"""
import hashlib
import ipaddress
import json
import logging
import os
import re
import subprocess
import sys
import time
import urllib.request
import urllib.error
import argparse
from datetime import datetime
from pathlib import Path
from validation import (
VALID_PROTOCOLS, VALID_BLOCKLIST_FORMATS,
int_range, domainname,
is_wg, is_dynamic_ip,
resolve_vlan_derived_fields, validate_config,
)
PRODUCT_NAME = "routlin"
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "core.json"
BLOCKLIST_DIR = SCRIPT_DIR / "blocklists"
LOG_FILE = SCRIPT_DIR / "core.log"
METRICS_FILE = SCRIPT_DIR / ".dns-metrics"
DNSMASQ_CONF_DIR = Path(f"/etc/dnsmasq-{PRODUCT_NAME}")
LEASES_DIR = Path("/var/lib/misc")
NETWORKD_DIR = Path("/etc/systemd/network")
SYSTEMD_DIR = Path("/etc/systemd/system")
BLIST_TIMER_NAME = f"{PRODUCT_NAME}-dns-blocklist-update"
BLIST_TIMER_FILE = SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.timer"
BLIST_TIMER_SVC_FILE = SYSTEMD_DIR / f"{BLIST_TIMER_NAME}.service"
DASHB_TIMER_NAME = f"{PRODUCT_NAME}-dashboard-queue"
DASHB_TIMER_FILE = SYSTEMD_DIR / f"{DASHB_TIMER_NAME}.timer"
DASHB_TIMER_SVC_FILE = SYSTEMD_DIR / f"{DASHB_TIMER_NAME}.service"
DASHB_TIMER_INTERVAL_SEC = 60
DASHB_QUEUE_FILE = SCRIPT_DIR / ".dashboard-queue"
DASHB_DONE_FILE = SCRIPT_DIR / ".dashboard-done"
DASHB_LAST_RUN_FILE = SCRIPT_DIR / ".dashboard-last-run"
DASHB_LOCK_FILE = SCRIPT_DIR / ".dashboard-lock"
DASHB_SCRIPT_FILE = SCRIPT_DIR / "do_dashboard_queue.sh"
RESOLV_CONF = Path("/etc/resolv.conf")
NAT_SERVICE_NAME = f"{PRODUCT_NAME}-nat"
NAT_SERVICE_FILE = SYSTEMD_DIR / f"{NAT_SERVICE_NAME}.service"
WG_DIR = Path("/etc/wireguard")
WG_KEEPALIVE = 25
log = None
# ===================================================================
# Logging
# ===================================================================
def chown_to_script_dir_owner(path):
"""Chown a file to the owner of the script directory.
This works correctly whether invoked via sudo, directly as root (e.g. systemd timer),
or as a normal user - the script directory owner is always the right target.
"""
try:
stat = SCRIPT_DIR.stat()
os.chown(path, stat.st_uid, stat.st_gid)
except OSError:
pass # non-fatal
def setup_logging(max_kb, errors_only):
global log
try:
if LOG_FILE.exists() and LOG_FILE.stat().st_size > max_kb * 1024:
LOG_FILE.write_text("")
if not LOG_FILE.exists():
LOG_FILE.touch()
chown_to_script_dir_owner(LOG_FILE)
file_handler = logging.FileHandler(LOG_FILE)
except PermissionError:
print(f"WARNING: Cannot write to {LOG_FILE} (permission denied). "
f"Run with sudo or fix ownership: sudo chown $USER {LOG_FILE}")
file_handler = None
level = logging.ERROR if errors_only else logging.INFO
handlers = [logging.StreamHandler(sys.stdout)]
if file_handler:
handlers.insert(0, file_handler)
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=handlers,
)
log = logging.getLogger("dns-dhcp")
# ===================================================================
# Helpers
# ===================================================================
def service_warning(action, svc, stderr):
"""Print a service start/restart warning, adding install hint if unit not found."""
msg = stderr.strip()
print(f"WARNING: Failed to {action} {svc}: {msg}")
if "not found" in msg.lower() or "not-found" in msg.lower():
print(f" -> Package may not be installed. Run: sudo python3 install.py")
def die(msg):
print(f"ERROR: {msg}", file=sys.stderr)
sys.exit(1)
def check_root():
if os.geteuid() != 0:
die("This script must be run as root (sudo).")
def prefix_to_dotted(n):
mask = (0xFFFFFFFF << (32 - int(n))) & 0xFFFFFFFF
return '.'.join(str((mask >> (8 * i)) & 0xFF) for i in (3, 2, 1, 0))
def network_for(vlan):
return ipaddress.IPv4Network(f"{vlan['subnet']}/{vlan['subnet_mask']}", strict=False)
def lowest_quartet_ip(vlan):
"""Return the server_identity IP with the lowest value in the last octet."""
identities = vlan.get("server_identities", [])
ips = []
for s in identities:
try:
ips.append(ipaddress.IPv4Address(s["ip"]))
except (KeyError, ValueError):
pass
if not ips:
return None
return str(min(ips, key=lambda ip: ip.packed[-1]))
def resolve_vlan_options(vlan):
"""
Resolve gateway, dns_server, and ntp_server for a VLAN.
For both WG and non-WG VLANs: gateway defaults to the lowest-last-octet
server_identity IP unless overridden in explicit_overrides. The gateway
override must be one of the server_identity IPs.
WG VLANs: ntp_server is None (WireGuard has no DHCP so NTP cannot be
advertised to peers). Overrides live in vpn_information.explicit_overrides.
Non-WG VLANs: overrides live in dhcp_information.explicit_overrides.
Returns a dict with keys: gateway, dns_server, ntp_server.
"""
if is_wg(vlan):
vpi = vlan["vpn_information"]
overrides = vpi.get("explicit_overrides", {})
default = lowest_quartet_ip(vlan) or str(next(network_for(vlan).hosts()))
gateway = overrides.get("gateway", "") or default
dns = overrides.get("dns_server", "") or gateway
return {
"gateway": gateway,
"dns_server": dns,
"ntp_server": None,
}
overrides = vlan.get("dhcp_information", {}).get("explicit_overrides", {})
default = lowest_quartet_ip(vlan)
return {
"gateway": overrides.get("gateway", "") or default,
"dns_server": overrides.get("dns_server", "") or default,
"ntp_server": overrides.get("ntp_server", "") or default,
}
def is_physical(vlan):
return vlan["vlan_id"] == 1
def networkd_stem(vlan):
return f"10-{PRODUCT_NAME}-{vlan['name']}"
def vlan_service_name(vlan):
if is_wg(vlan):
return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}-{vlan['interface']}"
return f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}"
def vlan_service_file(vlan):
return SYSTEMD_DIR / f"{vlan_service_name(vlan)}.service"
def vlan_conf_file(vlan):
return DNSMASQ_CONF_DIR / f"{vlan['name']}.conf"
def vlan_leases_file(vlan):
return LEASES_DIR / f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}.leases"
def vlan_pid_file(vlan):
return Path("/run") / f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}.pid"
# nftables rule list helpers
def rule_enabled(rules):
return [r for r in rules if r.get("enabled") is True]
def rule_disabled(rules):
return [r for r in rules if r.get("enabled") is not True]
def expand_protocols(rule):
"""Return list of (protocol, rule, comment_suffix) tuples.
When protocol is 'both', expands into tcp and udp with suffixes
' (tcp)' and ' (udp)' so generated comments are unambiguous.
When protocol is a single value, suffix is empty string.
"""
proto = rule["protocol"]
if proto == "both":
return [("tcp", rule, " (tcp)"), ("udp", rule, " (udp)")]
return [(proto, rule, "")]
# ===================================================================
# Load
# ===================================================================
def load_config():
if not CONFIG_FILE.exists():
die(f"Config file not found: {CONFIG_FILE}")
with open(CONFIG_FILE) as f:
data = json.load(f)
if not data.get("vlans"):
die("No vlans defined in core.json.")
return data
# ===================================================================
# Build systemd-networkd files
# ===================================================================
def build_netdev(vlan):
return "\n".join([
"# Generated by core.py -- do not edit manually.",
"# Edit core.json and re-run: sudo python3 core.py --apply",
"",
"[NetDev]",
f"Name={vlan['interface']}",
"Kind=vlan",
"",
"[VLAN]",
f"Id={vlan['vlan_id']}",
"",
])
def build_network(vlan, all_vlan_ids):
network = network_for(vlan)
prefix = network.prefixlen
lines = [
"# Generated by core.py -- do not edit manually.",
"# Edit core.json and re-run: sudo python3 core.py --apply",
"",
"[Match]",
f"Name={vlan['interface']}",
"",
"[Network]",
"DHCP=no",
"LinkLocalAddressing=no",
]
for ident in vlan["server_identities"]:
lines.append(f"# {ident['description']}")
lines.append(f"Address={ident['ip']}/{prefix}")
if is_physical(vlan):
lines.append("")
for vid in all_vlan_ids:
if vid != 1:
lines.append(f"VLAN={vlan['interface']}.{vid}")
lines.append("")
return "\n".join(lines)
def find_legacy_files(managed_interfaces):
to_remove = []
for pattern in ("*.network", "*.netdev"):
for f in NETWORKD_DIR.glob(pattern):
if f.name.startswith(f"10-{PRODUCT_NAME}-"):
continue
try:
content = f.read_text()
except OSError:
continue
for iface in managed_interfaces:
if f"Name={iface}" in content:
to_remove.append(f)
break
return to_remove
def apply_networkd(data, dry_run=False, only_if_changed=False):
"""Write systemd-networkd files and reload.
If only_if_changed=True, write files only when content differs from disk
and skip the networkd reload if nothing changed. Used by --apply mode.
"""
all_vlan_ids = [v["vlan_id"] for v in data["vlans"] if not is_wg(v)]
managed_ifaces = [v["interface"] for v in data["vlans"]]
changed = False
legacy = find_legacy_files(managed_ifaces)
if legacy:
print("Removing legacy networkd files:")
for f in legacy:
if not dry_run:
f.unlink()
changed = True
print(f" {'[dry-run] would remove' if dry_run else 'Removed'}: {f}")
print()
for vlan in data["vlans"]:
if is_wg(vlan):
continue
stem = networkd_stem(vlan)
if not is_physical(vlan):
netdev_path = NETWORKD_DIR / f"{stem}.netdev"
netdev_content = build_netdev(vlan)
if dry_run:
print(f"# -- {netdev_path} (dry-run) --")
print(netdev_content)
else:
existing = netdev_path.read_text() if netdev_path.exists() else None
if existing != netdev_content:
netdev_path.write_text(netdev_content)
print(f"Written: {netdev_path}")
changed = True
elif not only_if_changed:
print(f"Unchanged: {netdev_path}")
network_path = NETWORKD_DIR / f"{stem}.network"
network_content = build_network(vlan, all_vlan_ids)
if dry_run:
print(f"# -- {network_path} (dry-run) --")
print(network_content)
else:
existing = network_path.read_text() if network_path.exists() else None
if existing != network_content:
network_path.write_text(network_content)
print(f"Written: {network_path}")
changed = True
elif not only_if_changed:
print(f"Unchanged: {network_path}")
if not dry_run:
if changed:
print("Reloading systemd-networkd...")
result = subprocess.run(
["networkctl", "reload"], capture_output=True, text=True
)
if result.returncode != 0:
print(f"WARNING: networkctl reload returned non-zero:\n{result.stderr.strip()}")
else:
print("systemd-networkd reloaded.")
elif only_if_changed:
print("systemd-networkd: no changes. Good.")
# ===================================================================
# Blocklist management
# ===================================================================
def combo_hash(names):
"""Return a stable 8-char hex hash for a list/set of blocklist names."""
key = ",".join(sorted(names))
return hashlib.sha256(key.encode()).hexdigest()[:8]
def merged_path(h):
return BLOCKLIST_DIR / f"merged-{h}.conf"
def blocklists_available(data):
"""Return True if at least one merged blocklist file exists on disk."""
combos = set()
for vlan in data.get("vlans", []):
names = vlan.get("use_blocklists", [])
if names:
combos.add(combo_hash(names))
return any(merged_path(h).exists() for h in combos)
def parse_dnsmasq_format(content):
domains = set()
for ln in content.splitlines():
ln = ln.strip()
if not ln or ln.startswith("#"):
continue
if ln.startswith("local=/"):
domain = ln.removeprefix("local=/").rstrip("/")
if domain:
domains.add(domain)
elif ln.startswith("address=/"):
parts = ln.removeprefix("address=/").split("/")
if parts:
domains.add(parts[0])
return domains
def parse_hosts_format(content):
domains = set()
for ln in content.splitlines():
ln = ln.strip()
if not ln or ln.startswith("#"):
continue
parts = ln.split()
if len(parts) >= 2:
domains.add(parts[1])
return domains
def parse_blocklist(content, fmt):
if fmt == "dnsmasq":
return parse_dnsmasq_format(content)
return parse_hosts_format(content)
def build_merged_conf(domains, bl_names):
"""Build a merged dnsmasq conf blocking all domains and their subdomains."""
lines = [
"# Generated by core.py -- do not edit manually.",
f"# Blocklist combination: {', '.join(sorted(bl_names))}",
f"# Merged: {len(domains):,} unique domains.",
"#",
"# Blocks domain and all subdomains via local=/domain/ syntax.",
"",
]
for domain in sorted(domains):
lines.append(f"local=/{domain}/")
return "\n".join(lines)
def download_all_blocklists(data):
"""
Download every blocklist referenced by at least one VLAN.
Returns dict: name -> (content_str, entry) or (None, entry) on failure.
"""
bl_library = {bl["name"]: bl for bl in data.get("blocklists", [])}
needed = set()
for vlan in data["vlans"]:
needed.update(vlan.get("use_blocklists", []))
results = {}
for name in needed:
entry = bl_library[name]
url = entry["url"]
try:
req = urllib.request.Request(url, headers={"User-Agent": "dns-dhcp.py/1.0"})
with urllib.request.urlopen(req, timeout=30) as r:
content = r.read().decode("utf-8", errors="ignore")
log.info(f"Downloaded: {entry['description']} ({len(content):,} bytes)")
results[name] = (content, entry)
except Exception as e:
log.error(f"Failed to download '{entry['description']}' from {url}: {e}")
results[name] = (None, entry)
return results
def update_blocklists(data):
"""
Download all referenced blocklists, build per-combo merged files,
and clean up stale merged files. Returns active hashes set.
"""
BLOCKLIST_DIR.mkdir(exist_ok=True)
log.info("Downloading blocklists...")
downloaded = download_all_blocklists(data)
# Parse domains per blocklist name; save raw files
domains_by_name = {}
for name, (content, entry) in downloaded.items():
if content is None:
log.error(f"Blocklist '{name}' failed to download -- it will be skipped.")
domains_by_name[name] = set()
else:
(BLOCKLIST_DIR / entry["save_as"]).write_text(content)
domains = parse_blocklist(content, entry.get("format", "dnsmasq"))
log.info(f"Parsed {len(domains):,} domains from '{name}'")
domains_by_name[name] = domains
# Build one merged file per unique combo
active_hashes = set()
combos = {}
for vlan in data["vlans"]:
names = frozenset(vlan.get("use_blocklists", []))
if names:
h = combo_hash(names)
combos[h] = names
for h, names in combos.items():
combo_domains = set()
for name in names:
combo_domains.update(domains_by_name.get(name, set()))
merged = build_merged_conf(combo_domains, names)
merged_path(h).write_text(merged)
active_hashes.add(h)
log.info(
f"Merged [{h}] ({', '.join(sorted(names))}): "
f"{len(combo_domains):,} unique domains."
)
# Remove stale merged files (hashes no longer in active combos)
for f in BLOCKLIST_DIR.glob("merged-*.conf"):
h = f.stem.removeprefix("merged-")
if h not in active_hashes:
f.unlink()
log.info(f"Removed stale merged file: {f.name}")
# Return True if all blocklists downloaded successfully
any_failed = any(content is None for content, _ in downloaded.values())
return not any_failed
# ===================================================================
# Build per-VLAN dnsmasq config
# ===================================================================
def _wan_has_ipv6(iface):
"""Return True if the WAN interface has a non-link-local IPv6 address."""
try:
result = subprocess.run(
["ip", "-6", "addr", "show", iface, "scope", "global"],
capture_output=True, text=True
)
return bool(result.stdout.strip())
except Exception:
return False
def build_vlan_dnsmasq_conf(vlan, data):
"""Generate the complete dnsmasq config for one VLAN instance."""
dns_cfg = data.get("upstream_dns", {})
general = data.get("general", {})
overrides = [o for o in data.get("host_overrides", []) if o.get("enabled") is True]
name = vlan["name"]
iface = vlan["interface"]
d = vlan.get("dhcp_information", {})
opts = resolve_vlan_options(vlan)
gateway = opts["gateway"]
bl_names = vlan.get("use_blocklists", [])
bl_file = None
if bl_names:
p = merged_path(combo_hash(bl_names))
if p.exists():
bl_file = p
L = []
def line(s=""):
L.append(s)
line("# Generated by core.py -- do not edit manually.")
line("# Edit core.json and re-run: sudo python3 core.py --apply")
line(f"# VLAN: {name} (vlan_id={vlan['vlan_id']})")
line()
line(f"pid-file={vlan_pid_file(vlan)}")
if not is_wg(vlan):
line(f"dhcp-leasefile={vlan_leases_file(vlan)}")
line("except-interface=lo")
line("bind-interfaces")
line(f"listen-address={gateway}")
line(f"interface={iface}")
if is_physical(vlan):
bridge_ips = get_container_bridge_ips()
for bridge, ip in bridge_ips.items():
line(f"interface={bridge}")
line(f"listen-address={ip}")
line()
if not is_wg(vlan):
line("# -- DHCP -----------------------------------------------------------")
dotted_mask = prefix_to_dotted(vlan['subnet_mask'])
line(f"dhcp-range=set:{name},{d['dynamic_pool_start']},{d['dynamic_pool_end']},{dotted_mask},{d['lease_time']}")
line(f"domain={d.get('domain', 'local')}")
line()
line(f"dhcp-option=tag:{name},option:router,{gateway}")
line(f"dhcp-option=tag:{name},option:dns-server,{opts['dns_server']}")
line(f"dhcp-option=tag:{name},option:ntp-server,{opts['ntp_server']}")
line()
identity_hosts = [s for s in vlan.get("server_identities", []) if s.get("hostname")]
if identity_hosts:
line("# -- Server identity hostnames ----------------------------------")
for s in identity_hosts:
line(f"# {s['description']}")
line(f"dhcp-host={s['ip']},{s['hostname']}")
line()
active_res = [r for r in vlan.get("reservations", []) if r.get("enabled") is True]
inactive_res = [r for r in vlan.get("reservations", []) if r.get("enabled") is not True]
if active_res:
line("# -- Reservations -----------------------------------------------")
# Group reservations sharing a static IP into single dhcp-host lines
# (multiple MACs for the same device e.g. wired + WiFi interfaces)
# Dynamic reservations are always emitted individually.
seen_ips = {} # ip -> list of reservations
ordered = [] # preserves insertion order for output
for r in active_res:
if is_dynamic_ip(r):
ordered.append([r]) # always individual
else:
ip = r.get("ip", "")
if ip in seen_ips:
seen_ips[ip].append(r)
else:
seen_ips[ip] = [r]
ordered.append(seen_ips[ip])
for group in ordered:
if len(group) == 1:
r = group[0]
line(f"# {r['description']}")
if is_dynamic_ip(r):
line(f"dhcp-host=set:{name},{r['mac']},{r['hostname']},{d['lease_time']}")
else:
line(f"dhcp-host=set:{name},{r['mac']},{r['ip']},{r['hostname']},{d['lease_time']}")
else:
# Multiple MACs share the same IP -- combine into one dhcp-host line
descs = ", ".join(r['description'] for r in group)
macs = ",".join(r['mac'] for r in group)
ip = group[0]['ip']
# Use first entry's hostname; all share the same IP anyway
hostname = group[0]['hostname']
line(f"# {descs}")
line(f"dhcp-host=set:{name},{macs},{ip},{hostname},{d['lease_time']}")
line()
if inactive_res:
line("# -- Skipped reservations (enabled: false) ----------------------")
for r in inactive_res:
line(f"# SKIPPED: {r['description']} ({r.get('mac', '?')} -> {r.get('ip', '?')})")
line()
line("# -- DNS ------------------------------------------------------------")
line("no-resolv")
if dns_cfg.get("strict_order"):
line("strict-order")
wan = data["general"]["wan_interface"]
wan_has_ipv6 = _wan_has_ipv6(wan)
for srv in dns_cfg.get("upstream_servers", []):
if ":" in srv and not wan_has_ipv6:
continue # skip IPv6 upstream -- WAN has no IPv6 address
line(f"server={srv}")
line(f"cache-size={dns_cfg.get('cache_size', 1000)}")
if general.get("dnsmasq_log_queries", False):
line("log-queries")
line()
if overrides:
line("# -- Host overrides -------------------------------------------------")
for o in overrides:
line(f"# {o['description']}")
line(f"address=/{o['host']}/{o['ip']}")
line()
if bl_file:
line("# -- Blocklist ------------------------------------------------------")
line(f"conf-file={bl_file}")
line()
elif bl_names:
line("# Blocklist not yet downloaded -- run --update-blocklists to fetch")
line()
return "\n".join(L)
# ===================================================================
# Build per-VLAN systemd service unit
# ===================================================================
def build_vlan_service(vlan):
name = vlan["name"]
iface = vlan["interface"]
conf = vlan_conf_file(vlan)
if is_wg(vlan):
after = f"network-online.target wg-quick@{iface}.service"
wants = "network-online.target"
bindsto = f"wg-quick@{iface}.service"
else:
after = "network-online.target"
wants = "network-online.target"
bindsto = None
lines = [
"# Generated by core.py -- do not edit manually.",
"",
"[Unit]",
f"Description=dnsmasq for VLAN {name}",
f"After={after}",
f"Wants={wants}",
]
if bindsto:
lines.append(f"BindsTo={bindsto}")
lines += [
"",
"[Service]",
"Type=forking",
f"PIDFile={vlan_pid_file(vlan)}",
f"ExecStart=/usr/sbin/dnsmasq --conf-file={conf}",
"ExecReload=/bin/kill -HUP $MAINPID",
"Restart=on-failure",
"RestartSec=5s",
"",
"[Install]",
"WantedBy=multi-user.target",
"",
]
return "\n".join(lines)
# ===================================================================
# System dnsmasq / resolv.conf
# ===================================================================
def ensure_resolv_conf(data):
"""Ensure /etc/resolv.conf points to the physical VLAN gateway (vlan_id=1)."""
physical = next((v for v in data["vlans"] if is_physical(v)), None)
if physical is None:
return
nameserver = resolve_vlan_options(physical)["gateway"]
wanted = f"nameserver {nameserver}\n"
# A symlink (e.g. to systemd-resolved stub) must be replaced with a plain file.
if RESOLV_CONF.is_symlink():
RESOLV_CONF.unlink()
print("Removed /etc/resolv.conf symlink (was pointing to systemd-resolved stub).")
current = RESOLV_CONF.read_text() if RESOLV_CONF.exists() else ""
if wanted in current:
print(f"/etc/resolv.conf already points to {nameserver}. Good.")
return
RESOLV_CONF.write_text(wanted)
print(f"Updated /etc/resolv.conf: nameserver {nameserver}")
def disable_systemd_resolved():
"""Stop and disable systemd-resolved if it is active."""
result = subprocess.run(
["systemctl", "is-active", "systemd-resolved"],
capture_output=True, text=True
)
if result.stdout.strip() == "active":
subprocess.run(["systemctl", "disable", "--now", "systemd-resolved"],
capture_output=True, text=True)
print("Disabled systemd-resolved.")
else:
print("systemd-resolved is not active. Good.")
def disable_systemd_timesyncd():
"""Stop and disable systemd-timesyncd if it is active."""
result = subprocess.run(
["systemctl", "is-active", "systemd-timesyncd"],
capture_output=True, text=True
)
if result.stdout.strip() == "active":
subprocess.run(["systemctl", "disable", "--now", "systemd-timesyncd"],
capture_output=True, text=True)
print("Disabled systemd-timesyncd.")
else:
print("systemd-timesyncd is not active. Good.")
def ensure_chrony(data):
"""Add VLAN allow directives to chrony.conf and start the service."""
chrony_conf = Path("/etc/chrony/chrony.conf")
if chrony_conf.exists():
content = chrony_conf.read_text()
subnets = []
for v in data["vlans"]:
subnets.append(str(network_for(v)))
added = []
for subnet in subnets:
line = f"allow {subnet}"
if line not in content:
content += f"\n{line}"
added.append(subnet)
if added:
chrony_conf.write_text(content)
print(f"Updated /etc/chrony/chrony.conf: added allow for {', '.join(added)}")
else:
print("chrony.conf already has required allow directives. Good.")
subprocess.run(["systemctl", "enable", "--now", "chrony"],
capture_output=True, text=True)
subprocess.run(["systemctl", "restart", "chrony"],
capture_output=True, text=True)
print("chrony enabled and running. Good.")
def disable_ufw():
"""Disable ufw (without removing it) if it is installed."""
if subprocess.run(["which", "ufw"], capture_output=True, text=True).returncode != 0:
print("ufw is not installed. Good.")
return
status = subprocess.run(["ufw", "status"], capture_output=True, text=True)
if "Status: active" in status.stdout:
subprocess.run(["ufw", "disable"], capture_output=True, text=True)
print("ufw rules cleared.")
else:
print("ufw is not active. Good.")
# Disable the systemd unit regardless, to prevent it starting at boot.
svc = subprocess.run(["systemctl", "is-enabled", "ufw"],
capture_output=True, text=True)
if svc.stdout.strip() in ("enabled", "enabled-runtime"):
subprocess.run(["systemctl", "disable", "ufw"], capture_output=True, text=True)
print("Disabled ufw.service.")
def disable_system_dnsmasq(data):
"""Stop and disable the system dnsmasq.service if it is enabled."""
disable_systemd_resolved()
result = subprocess.run(
["systemctl", "is-enabled", "dnsmasq"],
capture_output=True, text=True
)
if result.stdout.strip() in ("enabled", "enabled-runtime"):
subprocess.run(["systemctl", "disable", "--now", "dnsmasq"],
capture_output=True, text=True)
print("Disabled system dnsmasq.service.")
else:
print("System dnsmasq.service is already disabled. Good.")
ensure_resolv_conf(data)
def restore_ntp():
"""Disable chrony and re-enable systemd-timesyncd for plain client NTP."""
result = subprocess.run(
["systemctl", "is-active", "chrony"], capture_output=True, text=True
)
if result.stdout.strip() == "active":
subprocess.run(["systemctl", "disable", "--now", "chrony"],
capture_output=True, text=True)
print("Disabled chrony.")
else:
print("chrony is not active.")
result = subprocess.run(
["systemctl", "cat", "systemd-timesyncd"], capture_output=True, text=True
)
if result.returncode == 0:
subprocess.run(["systemctl", "enable", "--now", "systemd-timesyncd"],
capture_output=True, text=True)
print("Enabled systemd-timesyncd.")
else:
print("systemd-timesyncd is not available on this system.")
# ===================================================================
# Apply dnsmasq instances
# ===================================================================
def wg_interface_up(iface):
"""Return True if the WireGuard interface exists and is up."""
result = subprocess.run(["ip", "link", "show", iface],
capture_output=True, text=True)
return result.returncode == 0
def wg_server_key_path(iface):
return WG_DIR / f"{iface}.key"
def wg_server_pubkey_path(iface):
"""Public key written to the configs dir so the Flask app can read it."""
return SCRIPT_DIR / f".wg-{iface}.pub"
def wg_conf_path_for(iface):
return WG_DIR / f"{iface}.conf"
def generate_wg_server_key(iface):
WG_DIR.mkdir(exist_ok=True)
result = subprocess.run(["wg", "genkey"], capture_output=True, text=True, check=True)
private = result.stdout.strip()
kf = wg_server_key_path(iface)
kf.write_text(private + "\n")
kf.chmod(0o600)
return private
def build_wg_server_conf(vlan, server_private_key):
"""Build the /etc/wireguard/<iface>.conf content from core.json peers."""
iface = vlan["interface"]
info = vlan["vpn_information"]
gateway = resolve_vlan_options(vlan)["gateway"]
network = network_for(vlan)
server_ip = f"{gateway}/{network.prefixlen}"
listen_port = info["listen_port"]
domain = info.get("domain", "local")
L = [
"# Generated by core.py -- do not edit manually.",
"# Run: sudo python3 core.py --apply",
"",
"[Interface]",
f"PrivateKey = {server_private_key}",
f"Address = {server_ip}",
f"ListenPort = {listen_port}",
"",
]
for peer in vlan.get("peers", []):
if not peer.get("enabled", True):
L += [f"# DISABLED: {peer['name']}", ""]
continue
L += [
f"# {peer['name']}",
"[Peer]",
f"PublicKey = {peer['public_key']}",
f"AllowedIPs = {peer['ip']}/32",
f"PersistentKeepalive = {WG_KEEPALIVE}",
"",
]
return "\n".join(L)
def ensure_wg_interfaces(data):
"""Generate WireGuard server confs and bring up / sync all WG interfaces."""
wg_vlans = [v for v in data.get("vlans", []) if is_wg(v)]
if not wg_vlans:
return
for vlan in wg_vlans:
iface = vlan["interface"]
print(f" [{iface}]")
kf = wg_server_key_path(iface)
if not kf.exists():
print(f" Generating server private key...")
private = generate_wg_server_key(iface)
else:
private = kf.read_text().strip()
pub_result = subprocess.run(
["wg", "pubkey"], input=private, capture_output=True, text=True, check=True
)
public = pub_result.stdout.strip()
pubkey_file = wg_server_pubkey_path(iface)
pubkey_file.write_text(public + "\n")
chown_to_script_dir_owner(pubkey_file)
print(f" Server public key: {public[:20]}...")
WG_DIR.mkdir(exist_ok=True)
conf_file = wg_conf_path_for(iface)
new_conf = build_wg_server_conf(vlan, private)
listen_port = vlan["vpn_information"]["listen_port"]
port_changed = False
if conf_file.exists():
m = re.search(r'ListenPort\s*=\s*(\d+)', conf_file.read_text())
if m and int(m.group(1)) != listen_port:
port_changed = True
conf_file.write_text(new_conf)
conf_file.chmod(0o600)
peer_count = len([p for p in vlan.get("peers", []) if p.get("enabled", True)])
print(f" Wrote {conf_file} ({peer_count} enabled peer(s))")
if not wg_interface_up(iface):
print(f" Bringing up {iface}...")
r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
if r.returncode != 0:
print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
else:
print(f" {iface} is up.")
elif port_changed:
print(f" Listen port changed -- restarting {iface}...")
subprocess.run(["wg-quick", "down", iface], capture_output=True, text=True)
r = subprocess.run(["wg-quick", "up", iface], capture_output=True, text=True)
if r.returncode != 0:
print(f" WARNING: wg-quick up {iface} failed: {r.stderr.strip()}")
else:
print(f" {iface} restarted.")
else:
print(f" Syncing peers to live {iface}...")
subprocess.run(["wg", "syncconf", iface, str(conf_file)], capture_output=True, text=True)
def get_container_bridges():
"""Return all active bridge interfaces not managed by our VLAN config.
Works universally for Docker, Podman, LXC, libvirt, etc. -- anything
that creates a Linux bridge interface.
"""
try:
result = subprocess.run(
["ip", "-j", "link", "show", "type", "bridge"],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
return []
import json as _json
links = _json.loads(result.stdout)
return [l["ifname"] for l in links if l.get("operstate") == "UP"]
except Exception:
return []
def get_container_bridge_ips():
"""Return {ifname: ip} for all active container bridge interfaces.
Used to add listen-address directives to the physical VLAN's dnsmasq
instance so containers can reach the local DNS resolver.
Works universally for Docker, Podman, LXC, libvirt, etc.
"""
try:
result = subprocess.run(
["ip", "-j", "addr", "show", "type", "bridge"],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
return {}
import json as _json
links = _json.loads(result.stdout)
out = {}
for l in links:
if l.get("operstate") != "UP":
continue
for addr in l.get("addr_info", []):
if addr.get("family") == "inet":
out[l["ifname"]] = addr["local"]
break
return out
except Exception:
return {}
def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True):
"""Write per-VLAN dnsmasq configs and service units.
start_if_needed=True: enable and start all instances.
start_if_needed=False (--apply): only restart instances already running;
skip with a warning if not running.
"""
active_service_stems = {vlan_service_name(vlan) for vlan in data["vlans"]}
if not dry_run:
DNSMASQ_CONF_DIR.mkdir(exist_ok=True)
disable_system_dnsmasq(data)
print()
for vlan in data["vlans"]:
if is_wg(vlan) and not dry_run and not wg_interface_up(vlan["interface"]):
print(f"Skipped VLAN '{vlan['name']}': {vlan['interface']} is not up. Run --apply again after WireGuard is up.")
continue
conf_content = build_vlan_dnsmasq_conf(vlan, data)
svc_content = build_vlan_service(vlan)
conf_path = vlan_conf_file(vlan)
svc_path = vlan_service_file(vlan)
if dry_run:
print(f"# -- {conf_path} (dry-run) --")
print(conf_content)
print(f"# -- {svc_path} (dry-run) --")
print(svc_content)
continue
conf_path.write_text(conf_content)
print(f"Written: {conf_path}")
if not svc_path.exists() or svc_path.read_text() != svc_content:
svc_path.write_text(svc_content)
print(f"Written: {svc_path}")
if dry_run:
return
print()
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
# Remove stale service units (VLANs removed from config)
for f in SYSTEMD_DIR.glob(f"dnsmasq-{PRODUCT_NAME}-*.service"):
if f.stem not in active_service_stems:
subprocess.run(["systemctl", "disable", "--now", f.stem],
capture_output=True, text=True)
f.unlink()
n = f.stem.removeprefix(f"dnsmasq-{PRODUCT_NAME}-")
stale_conf = DNSMASQ_CONF_DIR / f"{n}.conf"
if stale_conf.exists():
stale_conf.unlink()
print(f"Removed stale VLAN: {f.stem}")
if start_if_needed:
print("Starting dnsmasq instances...")
for vlan in data["vlans"]:
if is_wg(vlan) and not wg_interface_up(vlan["interface"]):
continue
svc = vlan_service_name(vlan)
subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True)
result = subprocess.run(["systemctl", "restart", svc],
capture_output=True, text=True)
if result.returncode == 0:
print(f" Started: {svc}")
else:
service_warning("start", svc, result.stderr)
else:
print("Reloading dnsmasq instances...")
for vlan in data["vlans"]:
if is_wg(vlan) and not wg_interface_up(vlan["interface"]):
continue
svc = vlan_service_name(vlan)
state = subprocess.run(
["systemctl", "is-active", svc],
capture_output=True, text=True
).stdout.strip()
if state == "active":
result = subprocess.run(["systemctl", "restart", svc],
capture_output=True, text=True)
if result.returncode == 0:
print(f" Restarted: {svc}")
else:
service_warning("restart", svc, result.stderr)
elif is_wg(vlan):
# WG interface is up but dnsmasq isn't running -- start it now
subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True)
result = subprocess.run(["systemctl", "start", svc],
capture_output=True, text=True)
if result.returncode == 0:
print(f" Started: {svc}")
else:
service_warning("start", svc, result.stderr)
else:
print(f" WARNING: {svc} is not running -- skipping (run --apply to start it)")
# ===================================================================
# Timer management
# ===================================================================
def parse_time_to_calendar(time_str):
parts = time_str.strip().split(":")
if len(parts) != 2:
die(f"Invalid daily_execute_time_24hr_local: '{time_str}'. Expected HH:MM.")
hh, mm = parts
return f"*-*-* {hh.zfill(2)}:{mm.zfill(2)}:00"
def install_timer(data):
general = data.get("general", {})
execute_time = general.get("daily_execute_time_24hr_local", "02:30")
on_calendar = parse_time_to_calendar(execute_time)
script_path = Path(__file__).resolve()
timer_content = "\n".join([
"# Generated by core.py -- do not edit manually.",
"",
"[Unit]",
"Description=Daily blocklist refresh",
"",
"[Timer]",
f"OnCalendar={on_calendar}",
"Persistent=true",
"",
"[Install]",
"WantedBy=timers.target",
"",
])
service_content = "\n".join([
"# Generated by core.py -- do not edit manually.",
"",
"[Unit]",
"Description=core.py daily blocklist refresh",
"After=network-online.target",
"Wants=network-online.target",
"",
"[Service]",
"Type=oneshot",
f"ExecStart=/usr/bin/python3 {script_path} --update-blocklists",
"",
])
for path, content in ((BLIST_TIMER_FILE, timer_content), (BLIST_TIMER_SVC_FILE, service_content)):
if not path.exists() or path.read_text() != content:
path.write_text(content)
print(f"Written: {path}")
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
subprocess.run(["systemctl", "enable", "--now", f"{BLIST_TIMER_NAME}.timer"],
capture_output=True, text=True)
print(f"Timer {BLIST_TIMER_NAME}.timer enabled (runs daily at {execute_time}).")
def install_dashboard_timer():
"""Install the 1-minute dashboard-queue timer that processes .dashboard-queue."""
timer_content = "\n".join([
"# Generated by core.py -- do not edit manually.",
"",
"[Unit]",
"Description=Router dashboard pending-update processor",
"",
"[Timer]",
f"OnActiveSec={DASHB_TIMER_INTERVAL_SEC}s",
f"OnUnitActiveSec={DASHB_TIMER_INTERVAL_SEC}s",
"AccuracySec=10s",
"",
"[Install]",
"WantedBy=timers.target",
"",
])
service_content = "\n".join([
"# Generated by core.py -- do not edit manually.",
"",
"[Unit]",
"Description=Router dashboard update processor",
"",
"[Service]",
"Type=oneshot",
f"ExecStart=/bin/bash {DASHB_SCRIPT_FILE}",
"",
])
for path, content in ((DASHB_TIMER_FILE, timer_content), (DASHB_TIMER_SVC_FILE, service_content)):
if not path.exists() or path.read_text() != content:
path.write_text(content)
print(f"Written: {path}")
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
subprocess.run(["systemctl", "enable", f"{DASHB_TIMER_NAME}.timer"],
capture_output=True, text=True)
active = subprocess.run(
["systemctl", "is-active", f"{DASHB_TIMER_NAME}.timer"],
capture_output=True, text=True
).stdout.strip() == "active"
verb = "restart" if active else "start"
subprocess.run(["systemctl", verb, f"{DASHB_TIMER_NAME}.timer"],
capture_output=True, text=True)
print(f"Timer {DASHB_TIMER_NAME}.timer enabled (runs every {DASHB_TIMER_INTERVAL_SEC}s).")
def remove_dashboard_timer():
subprocess.run(["systemctl", "disable", "--now", f"{DASHB_TIMER_NAME}.timer"],
capture_output=True, text=True)
for f in (DASHB_TIMER_FILE, DASHB_TIMER_SVC_FILE):
if f.exists():
f.unlink()
print(f"Removed: {f}")
else:
print(f"Not found, skipping: {f}")
def remove_timer():
subprocess.run(["systemctl", "disable", "--now", f"{BLIST_TIMER_NAME}.timer"],
capture_output=True, text=True)
for f in (BLIST_TIMER_FILE, BLIST_TIMER_SVC_FILE):
if f.exists():
f.unlink()
print(f"Removed: {f}")
else:
print(f"Not found, skipping: {f}")
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
# ===================================================================
# banned_ips expansion
# ===================================================================
def _expand_banned_ipv4(ip_str):
"""Convert an IPv4 pattern (CIDR, wildcard, range) to nftables set elements."""
if '/' in ip_str:
ipaddress.IPv4Network(ip_str, strict=False) # validate
return [ip_str]
parts = ip_str.split('.')
if len(parts) != 4:
raise ValueError(f"Invalid IPv4 pattern: {ip_str!r} - expected 4 octets")
def parse_octet(s, pos):
if s == '*':
return (0, 255)
if '-' in s:
a, b = s.split('-', 1)
lo, hi = int(a), int(b)
if not (0 <= lo <= hi <= 255):
raise ValueError(f"Invalid octet range {s!r} in {ip_str!r}")
return (lo, hi)
v = int(s)
if not 0 <= v <= 255:
raise ValueError(f"Octet value {v} out of range in {ip_str!r}")
return (v, v)
ranges = [parse_octet(p, i) for i, p in enumerate(parts)]
# Count trailing full-wildcard octets to determine CIDR suffix length
trailing = 0
for lo, hi in reversed(ranges):
if lo == 0 and hi == 255:
trailing += 1
else:
break
prefix_len = 32 - 8 * trailing
prefix_ranges = ranges[:4 - trailing]
# Guard against combinatorial explosion
total = 1
for lo, hi in prefix_ranges:
total *= (hi - lo + 1)
if total > 1024:
raise ValueError(
f"Pattern {ip_str!r} would expand to {total} entries (limit 1024). "
f"Use CIDR notation instead."
)
results = []
if trailing > 0:
# Enumerate prefix octets; each combination yields a CIDR
def _enum_cidr(idx, chosen):
if idx == len(prefix_ranges):
base = '.'.join(str(v) for v in chosen) + '.0' * trailing
if prefix_len == 32:
results.append(base)
else:
results.append(f"{base}/{prefix_len}")
return
lo, hi = prefix_ranges[idx]
for v in range(lo, hi + 1):
_enum_cidr(idx + 1, chosen + [v])
_enum_cidr(0, [])
else:
# No trailing wildcards - enumerate outer 3 octets, express last as range
outer_ranges = ranges[:3]
lo4, hi4 = ranges[3]
def _enum_range(idx, chosen):
if idx == 3:
base = '.'.join(str(v) for v in chosen)
if lo4 == hi4:
results.append(f"{base}.{lo4}")
else:
results.append(f"{base}.{lo4}-{base}.{hi4}")
return
lo, hi = outer_ranges[idx]
for v in range(lo, hi + 1):
_enum_range(idx + 1, chosen + [v])
_enum_range(0, [])
return results
def _expand_banned_ipv6(ip_str):
"""Convert an IPv6 pattern (CIDR, single IP, or trailing-wildcard) to nftables set elements.
Supported formats:
Single address : "2a01:4f8:c17:b0f::2" -- passed through as-is
CIDR : "2a01:4f8::/32" -- passed through as-is
Wildcard : "2a01:4f8:c17:*" -- prefix:* expands to a CIDR
"2a01:4f8:c17:b00::*" -- :: compression is supported
Range notation (e.g. "b00-bff") is not supported for IPv6. Use CIDR instead.
"""
if '/' in ip_str:
ipaddress.IPv6Network(ip_str, strict=False) # validate
return [ip_str]
if '*' not in ip_str:
ipaddress.IPv6Address(ip_str) # validate single address
return [ip_str]
if not ip_str.endswith(':*'):
raise ValueError(
f"Unsupported IPv6 wildcard pattern {ip_str!r}. "
f"Use 'prefix:*' (e.g. '2a01:4f8:c17:*') or CIDR notation. "
f"Range notation (e.g. 'b00-bff') is not supported for IPv6."
)
prefix_part = ip_str[:-2] # strip trailing ':*'
# Expand '::' compression if present.
# IPv6 has 8 groups total. The wildcard occupies one slot, so the prefix
# may have at most 7 explicit groups. We know exactly how many zero groups
# '::' represents: 8 - len(left_groups) - len(right_groups) - 1 (for wildcard).
if '::' in prefix_part:
left, right = prefix_part.split('::', 1)
left_groups = [g for g in left.split(':') if g] if left else []
right_groups = [g for g in right.split(':') if g] if right else []
zero_count = 8 - len(left_groups) - len(right_groups) - 1
if zero_count < 0:
raise ValueError(f"IPv6 wildcard pattern {ip_str!r} has too many groups.")
groups = left_groups + ['0000'] * zero_count + right_groups
else:
groups = [g for g in prefix_part.split(':') if g]
num_groups = len(groups)
prefix_bits = num_groups * 16
if num_groups < 1 or num_groups > 7:
raise ValueError(
f"IPv6 wildcard pattern {ip_str!r} must have between 1 and 7 "
f"prefix groups before the wildcard."
)
base = ':'.join(groups) + ':' + ':'.join(['0000'] * (8 - num_groups))
addr = ipaddress.IPv6Address(base)
return [f"{addr}/{prefix_bits}"]
def expand_banned_ip(ip_str):
"""Return (family, [nftables_elements]) for a banned_ips entry.
family is 'ipv4' or 'ipv6'."""
if ':' in ip_str:
return ('ipv6', _expand_banned_ipv6(ip_str))
return ('ipv4', _expand_banned_ipv4(ip_str))
def banned_ip_sets(data):
"""Return (v4_elements, v6_elements) as flat lists of nftables set element strings."""
v4, v6 = [], []
for entry in rule_enabled(data.get("banned_ips", [])):
family, elements = expand_banned_ip(entry["ip"])
if family == 'ipv4':
v4.extend(elements)
else:
v6.extend(elements)
return v4, v6
# ===================================================================
# nftables config generation
# ===================================================================
def build_nft_config(data, dry_run=False):
wan = data["general"]["wan_interface"]
# Exclude WG VLANs whose interface is not up -- nft rejects rules that
# reference non-existent interfaces, which would leave no firewall at all.
vlans = [v for v in data["vlans"]
if not is_wg(v) or dry_run or wg_interface_up(v["interface"])]
all_fwd = list(rule_enabled(data.get("port_forwarding", [])))
all_wrngl = [(v, r) for v in vlans for r in rule_enabled(v.get("port_wrangling", []))]
# Interfaces that are active (WG interfaces only included if up)
active_ifaces = {v["interface"] for v in vlans}
# Build interface -> network map for nat_ip -> iface lookup in forward chain
vlan_networks = {}
for v in vlans:
try:
net = network_for(v)
vlan_networks[v["interface"]] = net
except (KeyError, ValueError):
pass
all_except = rule_enabled(data.get("inter_vlan_exceptions", []))
banned_v4, banned_v6 = banned_ip_sets(data)
container_bridges = get_container_bridges()
L = []
def line(s=""):
L.append(s)
line("# Generated by core.py -- do not edit manually.")
line("# Edit core.json and re-run: sudo python3 core.py --apply")
line()
# ==========================================================================
# {PRODUCT_NAME}-nat table
# ==========================================================================
line(f"table ip {PRODUCT_NAME}-nat {{")
line()
line(" chain prerouting {")
line(" type nat hook prerouting priority dstnat - 10; policy accept;")
line()
if all_fwd:
line(" # -- Port forwarding (inbound WAN -> LAN host) ---------------")
line()
for rule in all_fwd:
for proto, r, suffix in expand_protocols(rule):
line(f" # {r['description']}{suffix}")
line(f" iif \"{wan}\" {proto} dport {r['dest_port']} dnat to {r['nat_ip']}:{r['nat_port']}")
line()
if all_wrngl:
line(" # -- Port wrangling (redirect VLAN traffic to local host) ----")
line()
for vlan, rule in all_wrngl:
iface = vlan["interface"]
for proto, r, suffix in expand_protocols(rule):
line(f" # {r['description']}{suffix}")
line(f" iif \"{iface}\" {proto} dport {r['dest_port']} ip daddr != {r['redirect_to']} dnat to {r['redirect_to']}")
line()
line(" }")
line()
line(" chain postrouting {")
line(" type nat hook postrouting priority srcnat; policy accept;")
line()
line(" # Masquerade all outbound traffic through WAN")
line(f" oif \"{wan}\" masquerade")
line()
line(" }")
line()
line("}")
line()
# ==========================================================================
# {PRODUCT_NAME}-filter table
# ==========================================================================
line(f"table ip {PRODUCT_NAME}-filter {{")
line()
if banned_v4:
line(" set banned_ipv4 {")
line(" type ipv4_addr")
line(" flags interval")
elements = ", ".join(banned_v4)
line(f" elements = {{ {elements} }}")
line(" }")
line()
# INPUT chain
line(" # INPUT -- traffic destined for this machine itself")
line(" chain input {")
line(" type filter hook input priority filter; policy drop;")
line()
if banned_v4:
line(" # Drop banned IPs on WAN inbound")
line(f" iif \"{wan}\" ip saddr @banned_ipv4 drop")
line()
line(" # Allow loopback")
line(" iif \"lo\" accept")
line()
line(" # Allow established/related return traffic")
line(" ct state established,related accept")
line()
line(" # Allow ICMP (ping) from anywhere")
line(" ip protocol icmp accept")
line()
# mDNS -- allow avahi-daemon to receive mDNS on reflection interfaces
if avahi_enabled(data):
mdns_ifaces = avahi_interfaces(data)
if mdns_ifaces:
iface_set = ", ".join(f'"{i}"' for i in mdns_ifaces)
line(" # mDNS (port 5353) -- allow on reflection interfaces for avahi")
line(f" iif {{ {iface_set} }} udp dport 5353 accept")
line()
# RADIUS -- must come BEFORE the broad VLAN accept rules below,
# otherwise the broad accept fires first and the drop is never reached.
r_clients = radius_clients(data)
if r_clients:
allowed_ips = ", ".join(r["ip"] for r, _ in r_clients)
line(" # RADIUS (port 1812) -- allow only designated authenticators")
line(f" ip saddr {{ {allowed_ips} }} udp dport 1812 accept")
line(" udp dport 1812 drop")
line()
if container_bridges:
iface_set = ", ".join(f'"{b}"' for b in container_bridges)
line(" # Allow DNS from container bridge networks (Docker, Podman, etc.)")
line(f" iif {{ {iface_set} }} meta l4proto {{ tcp, udp }} th dport 53 accept")
line()
line(" # Allow all traffic inbound from any VLAN interface")
for vlan in vlans:
line(f" iif \"{vlan['interface']}\" accept # {vlan['name']}")
line()
if all_fwd:
line(" # Allow inbound WAN access for port-forwarded services")
line()
for rule in all_fwd:
for proto, r, suffix in expand_protocols(rule):
line(f" # {r['description']}{suffix}")
line(f" iif \"{wan}\" {proto} dport {r['dest_port']} accept")
line()
line(" # Drop all other inbound WAN traffic")
line(" }")
line()
# FORWARD chain
line(" # FORWARD -- traffic being routed through this machine")
line(" chain forward {")
line(" type filter hook forward priority filter; policy drop;")
line()
if banned_v4:
line(" # Drop banned IPs on WAN inbound")
line(f" iif \"{wan}\" ip saddr @banned_ipv4 drop")
line()
line(" # Allow established/related return traffic")
line(" ct state established,related accept")
line()
line(" # Allow each VLAN -> WAN (outbound internet)")
for vlan in vlans:
line(f" iif \"{vlan['interface']}\" oif \"{wan}\" accept # {vlan['name']} -> WAN")
line()
if container_bridges:
line(" # Allow VLAN -> Docker bridge forwarding")
for vlan in vlans:
for bridge in container_bridges:
line(f" iif \"{vlan['interface']}\" oif \"{bridge}\" ct state new accept"
f" # {vlan['name']} -> {bridge}")
line()
line(" # Allow Docker containers -> WAN (outbound internet access)")
line(f" iif != \"{wan}\" oif \"{wan}\" ct state new accept")
line()
if avahi_enabled(data):
mdns_ifaces = avahi_interfaces(data)
if len(mdns_ifaces) > 1:
iface_set = ", ".join(f'"{i}"' for i in mdns_ifaces)
line(" # mDNS forwarding between reflection interfaces for avahi")
line(f" iif {{ {iface_set} }} oif {{ {iface_set} }} udp dport 5353 accept")
line()
if all_except:
line(" # -- Inter-VLAN exceptions ------------------------------------------")
line()
for r in all_except:
src = r["src_ip_or_subnet"]
dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
port = r.get("dst_port")
for proto, _, suffix in expand_protocols(r):
line(f" # {r['description']}{suffix}")
if port is not None:
line(f" ip saddr {src} ip daddr {dst} {proto} dport {port} ct state new accept")
else:
line(f" ip saddr {src} ip daddr {dst} ct state new accept")
line()
if all_fwd:
line(" # Allow inbound WAN -> VLAN for active port forwarding rules")
line()
for rule in all_fwd:
try:
nat_addr = ipaddress.IPv4Address(rule["nat_ip"])
iface = wan # fallback
for iface_key, net in vlan_networks.items():
if nat_addr in net:
iface = iface_key
break
except ValueError:
iface = wan
for proto, r, suffix in expand_protocols(rule):
line(f" # {r['description']}{suffix}")
line(f" iif \"{wan}\" oif \"{iface}\" {proto} dport {r['nat_port']} ip daddr {r['nat_ip']} ct state new accept")
line()
line(" }")
line()
line(" chain output {")
line(" type filter hook output priority filter; policy accept;")
line(" }")
line()
line("}")
if banned_v6:
line()
line(f"table ip6 {PRODUCT_NAME}-ban {{")
line()
line(" set banned_ipv6 {")
line(" type ipv6_addr")
line(" flags interval")
elements = ", ".join(banned_v6)
line(f" elements = {{ {elements} }}")
line(" }")
line()
line(" chain input {")
line(" type filter hook input priority filter; policy accept;")
line(f" iif \"{wan}\" ip6 saddr @banned_ipv6 drop")
line(" }")
line()
line(" chain forward {")
line(" type filter hook forward priority filter; policy accept;")
line(f" iif \"{wan}\" ip6 saddr @banned_ipv6 drop")
line(" }")
line()
line("}")
return "\n".join(L)
# ===================================================================
# nftables apply / disable / status
# ===================================================================
def table_exists(family, name):
result = subprocess.run(
["nft", "list", "table", family, name],
capture_output=True, text=True
)
return result.returncode == 0
def delete_our_tables():
for family, table in [("ip", f"{PRODUCT_NAME}-nat"), ("ip", f"{PRODUCT_NAME}-filter"), ("ip6", f"{PRODUCT_NAME}-ban")]:
if table_exists(family, table):
result = subprocess.run(
["nft", "delete", "table", family, table],
capture_output=True, text=True
)
if result.returncode != 0:
die(f"Failed to delete table {family} {table}: {result.stderr.strip()}")
print(f"Removed existing table: {family} {table}")
else:
print(f"Table not present, skipping delete: {family} {table}")
def apply_nft_config(config_text):
result = subprocess.run(
["nft", "-f", "-"],
input=config_text,
capture_output=True, text=True
)
if result.returncode != 0:
print("ERROR: nft rejected the ruleset:", file=sys.stderr)
print(result.stderr, file=sys.stderr)
sys.exit(1)
def apply_nftables(data, dry_run=False):
config = build_nft_config(data, dry_run=dry_run)
if dry_run:
print(config)
return
active_ifaces = {v["interface"] for v in data["vlans"]
if not is_wg(v) or wg_interface_up(v["interface"])}
active_vlans = [v for v in data["vlans"] if v["interface"] in active_ifaces]
all_fwd = list(rule_enabled(data.get("port_forwarding", [])))
all_dis_fwd = list(rule_disabled(data.get("port_forwarding", [])))
all_wrngl = [(v, r) for v in active_vlans for r in rule_enabled(v.get("port_wrangling", []))]
all_dis_wrngl = [(v, r) for v in data["vlans"] for r in rule_disabled(v.get("port_wrangling", []))]
all_except = rule_enabled(data.get("inter_vlan_exceptions", []))
print(f"Applying {len(all_fwd)} port forwarding rule(s), {len(all_dis_fwd)} skipped.")
print(f"Applying {len(all_wrngl)} port wrangling rule(s), {len(all_dis_wrngl)} skipped.")
print(f"Applying {len(all_except)} inter-VLAN exception(s).")
container_bridges = get_container_bridges()
if container_bridges:
print(f"Container bridges: {', '.join(container_bridges)}")
print()
delete_our_tables()
apply_nft_config(config)
print("nftables rules applied successfully.")
# Build set of active subnets for filtering exception display
active_subnets = []
for v in data["vlans"]:
if is_wg(v) and not wg_interface_up(v["interface"]):
continue
try:
active_subnets.append(network_for(v))
except (KeyError, ValueError):
pass
def dst_is_active(r):
dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
try:
# Single IP -- check if it's in an active subnet
addr = ipaddress.IPv4Address(dst)
return any(addr in net for net in active_subnets)
except ValueError:
try:
# Subnet -- check if it overlaps with any active subnet
net = ipaddress.IPv4Network(dst, strict=False)
return any(net.overlaps(s) for s in active_subnets)
except ValueError:
return True
if all_fwd:
print()
print("Active port forwarding:")
for r in all_fwd:
print(f" [{r['protocol'].upper():<4}] :{r['dest_port']} -> {r['nat_ip']}:{r['nat_port']} ({r['description']})")
if all_dis_fwd:
print()
print("Skipped port forwarding (disabled):")
for r in all_dis_fwd:
print(f" [{r['protocol'].upper():<4}] :{r['dest_port']} -> {r['nat_ip']}:{r['nat_port']} ({r['description']})")
if all_wrngl:
print()
print("Active port wrangling:")
for vlan, r in all_wrngl:
print(f" [{r['protocol'].upper():<4}] :{r['dest_port']} -> {r['redirect_to']} ({r['description']}) [{vlan['name']}]")
active_except = [r for r in all_except if dst_is_active(r)]
if active_except:
print()
print("Active inter-VLAN exceptions:")
for r in active_except:
src = r["src_ip_or_subnet"]
dst = r.get("dst_ip_or_subnet") or r.get("dst_ip", "")
port = r.get("dst_port")
dst_str = f"{dst}:{port}" if port is not None else dst
print(f" [{r['protocol'].upper():<4}] {src} -> {dst_str} ({r['description']})")
def show_rules():
for table in (f"{PRODUCT_NAME}-nat", f"{PRODUCT_NAME}-filter"):
result = subprocess.run(
["nft", "list", "table", "ip", table],
capture_output=True, text=True
)
if result.returncode != 0:
print(f"[{table}] not found (not yet applied)")
else:
print(result.stdout)
# ===================================================================
# nftables boot service
# ===================================================================
def install_nat_service():
script_path = Path(__file__).resolve()
service_content = f"""[Unit]
Description=Apply {PRODUCT_NAME} NAT and firewall rules
After=network-online.target docker.service
Wants=network-online.target docker.service
[Service]
Type=oneshot
ExecStart=/usr/bin/python3 {script_path} --apply
RemainAfterExit=yes
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
"""
existing = NAT_SERVICE_FILE.read_text() if NAT_SERVICE_FILE.exists() else None
if existing == service_content:
print(f"Boot service already up to date: {NAT_SERVICE_FILE}")
return
NAT_SERVICE_FILE.write_text(service_content)
subprocess.run(["systemctl", "daemon-reload"], check=True)
subprocess.run(["systemctl", "enable", NAT_SERVICE_NAME], check=True)
if existing is None:
print(f"Boot service installed and enabled: {NAT_SERVICE_FILE}")
else:
print(f"Boot service updated: {NAT_SERVICE_FILE}")
def remove_nat_service():
if NAT_SERVICE_FILE.exists():
subprocess.run(["systemctl", "disable", "--now", NAT_SERVICE_NAME],
capture_output=True, text=True)
NAT_SERVICE_FILE.unlink()
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
print(f"Removed boot service: {NAT_SERVICE_NAME}.service")
else:
print(f"Boot service not found, skipping: {NAT_SERVICE_NAME}.service")
# ===================================================================
# Status
# ===================================================================
# ===================================================================
# RADIUS
# ===================================================================
RADIUS_SECRET_FILE = SCRIPT_DIR / ".radius-secret"
RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf")
RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users")
def radius_clients(data):
"""Return list of (reservation, vlan) tuples where radius_client is True."""
result = []
for vlan in data["vlans"]:
for r in vlan.get("reservations", []):
if r.get("radius_client") is True:
result.append((r, vlan))
return result
def radius_enabled(data):
"""Return True if any reservation has radius_client: true."""
return len(radius_clients(data)) > 0
def ensure_radius_secret():
"""Generate a random RADIUS shared secret if .radius-secret does not exist."""
if RADIUS_SECRET_FILE.exists():
return RADIUS_SECRET_FILE.read_text().strip()
import secrets as _secrets
secret = _secrets.token_urlsafe(32)
RADIUS_SECRET_FILE.write_text(secret + "\n")
RADIUS_SECRET_FILE.chmod(0o600)
print(f"Generated RADIUS shared secret: {RADIUS_SECRET_FILE}")
return secret
def build_radius_clients_conf(data, secret):
"""Generate freeradius clients.conf from reservations with radius_client: true."""
lines = [
"# Generated by core.py -- do not edit manually.",
"# Edit core.json and re-run: sudo python3 core.py --apply",
"",
"# localhost (required)",
"client localhost {",
" ipaddr = 127.0.0.1",
f" secret = {secret}",
" shortname = localhost",
"}",
"",
]
for r, vlan in radius_clients(data):
name = r.get("hostname") or r.get("description", "unknown").replace(" ", "-").lower()
lines += [
f"# {r['description']}",
f"client {name} {{",
f" ipaddr = {r['ip']}",
f" secret = {secret}",
f" shortname = {name}",
"}",
"",
]
return "\n".join(lines)
def build_radius_users(data):
"""
Generate freeradius users file.
Each MAC reservation across all VLANs gets an entry mapping it to its VLAN ID.
Unknown MACs fall through to DEFAULT which returns the radius_default VLAN.
MACs are formatted without colons (FreeRADIUS MAB format).
"""
default_vlan = next(
(v for v in data["vlans"] if v.get("radius_default") is True), None
)
if default_vlan is None:
die("No VLAN has radius_default: true. Cannot generate RADIUS users file.")
lines = [
"# Generated by core.py -- do not edit manually.",
"# Edit core.json and re-run: sudo python3 core.py --apply",
"",
]
for vlan in data["vlans"]:
vlan_id = vlan["vlan_id"]
for r in vlan.get("reservations", []):
if r.get("enabled") is not True:
continue
mac = r.get("mac", "").replace(":", "").lower()
if not mac:
continue
lines += [
f"# {r['description']} -> VLAN {vlan_id} ({vlan['name']})",
f"{mac} Cleartext-Password := \"{mac}\"",
f" Tunnel-Type = VLAN,",
f" Tunnel-Medium-Type = IEEE-802,",
f" Tunnel-Private-Group-Id = \"{vlan_id}\"",
"",
]
default_id = default_vlan["vlan_id"]
lines += [
f"# Default -- unknown MACs land on VLAN {default_id} ({default_vlan['name']})",
"DEFAULT Auth-Type := Accept",
f" Tunnel-Type = VLAN,",
f" Tunnel-Medium-Type = IEEE-802,",
f" Tunnel-Private-Group-Id = \"{default_id}\"",
"",
]
return "\n".join(lines)
def apply_radius(data):
"""Write FreeRADIUS config files and restart the service."""
secret = ensure_radius_secret()
clients_content = build_radius_clients_conf(data, secret)
users_content = build_radius_users(data)
changed = False
for path, content in [(RADIUS_CLIENTS_CONF, clients_content),
(RADIUS_USERS_FILE, users_content)]:
existing = path.read_text() if path.exists() else None
if existing != content:
path.write_text(content)
print(f"Written: {path}")
changed = True
else:
print(f"Unchanged: {path}")
# Always ensure service is running; restart only if config changed
svc = "freeradius"
state = subprocess.run(
["systemctl", "is-active", svc], capture_output=True, text=True
).stdout.strip()
if state == "active":
if changed:
result = subprocess.run(["systemctl", "restart", svc],
capture_output=True, text=True)
if result.returncode == 0:
print("freeradius restarted.")
else:
service_warning("restart", "freeradius", result.stderr)
else:
print("freeradius: running, config unchanged.")
else:
subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True)
result = subprocess.run(["systemctl", "start", svc],
capture_output=True, text=True)
if result.returncode == 0:
print("freeradius started.")
else:
service_warning("start", "freeradius", result.stderr)
# ===================================================================
# Avahi mDNS Reflector
# ===================================================================
AVAHI_CONF_FILE = Path("/etc/avahi/avahi-daemon.conf")
def avahi_enabled(data):
"""Return True if at least one non-WireGuard VLAN has mdns_reflection enabled."""
return any(v.get("mdns_reflection") is True for v in data.get("vlans", []) if not is_wg(v))
def avahi_interfaces(data):
"""Return list of interface names for VLANs with mdns_reflection enabled."""
return [v["interface"] for v in data.get("vlans", []) if v.get("mdns_reflection") is True and not is_wg(v)]
def build_avahi_conf(data):
"""Patch avahi-daemon.conf directives needed for cross-VLAN mDNS reflection.
Reads the existing file (default or previously patched) and modifies only
the specific directives we need, leaving everything else untouched.
"""
ifaces = avahi_interfaces(data)
if not AVAHI_CONF_FILE.exists():
return None
content = AVAHI_CONF_FILE.read_text()
def set_directive(text, directive, value):
"""Enable and set a directive, whether it is commented out or already set."""
import re
# Match the directive commented out (#directive=...) or set (directive=...)
pattern = re.compile(
rf"^#?\s*{re.escape(directive)}\s*=.*$", re.MULTILINE
)
replacement = f"{directive}={value}"
if pattern.search(text):
return pattern.sub(replacement, text)
# Not present at all - this shouldn't happen with a standard avahi install
# but append it to the relevant section if needed
return text + f"\n{replacement}\n"
content = set_directive(content, "use-ipv6", "no")
content = set_directive(content, "disallow-other-stacks", "yes")
content = set_directive(content, "allow-interfaces", ",".join(ifaces))
content = set_directive(content, "enable-reflector", "yes")
content = set_directive(content, "disable-publishing", "yes")
return content
def apply_avahi(data):
"""Write avahi-daemon.conf and ensure service is running."""
import shutil
if not shutil.which("avahi-daemon"):
print("avahi-daemon is not installed.")
print(" -> Run: sudo python3 install.py")
return
ifaces = avahi_interfaces(data)
if len(ifaces) < 2:
print("mDNS reflection requires at least two VLANs in reflect_vlans. Skipping.")
return
if not AVAHI_CONF_FILE.exists():
print(f"WARNING: {AVAHI_CONF_FILE} not found. Run: sudo python3 install.py")
return
content = build_avahi_conf(data)
existing = AVAHI_CONF_FILE.read_text()
changed = existing != content
if changed:
AVAHI_CONF_FILE.write_text(content)
print(f"Written: {AVAHI_CONF_FILE}")
print(f" Reflecting mDNS across: {', '.join(ifaces)}")
else:
print(f"Unchanged: {AVAHI_CONF_FILE}")
svc = "avahi-daemon"
state = subprocess.run(
["systemctl", "is-active", svc], capture_output=True, text=True
).stdout.strip()
if state == "active":
if changed:
result = subprocess.run(["systemctl", "restart", svc],
capture_output=True, text=True)
if result.returncode == 0:
print("avahi-daemon restarted.")
else:
service_warning("restart", "avahi-daemon", result.stderr)
else:
print("avahi-daemon: running, config unchanged.")
else:
subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True)
result = subprocess.run(["systemctl", "start", svc],
capture_output=True, text=True)
if result.returncode == 0:
print("avahi-daemon started.")
else:
service_warning("start", "avahi-daemon", result.stderr)
def disable_avahi():
"""Stop and disable avahi-daemon."""
result = subprocess.run(
["systemctl", "is-active", "avahi-daemon"], capture_output=True, text=True
)
if result.stdout.strip() == "active":
subprocess.run(["systemctl", "disable", "--now", "avahi-daemon"],
capture_output=True, text=True)
print("avahi-daemon stopped and disabled.")
else:
print("avahi-daemon: not running, skipping.")
def show_status(data):
import shutil
col = shutil.get_terminal_size((80, 24)).columns
def svc_row(unit, expected_active="active"):
r_active = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
r_enabled = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
active = r_active.stdout.strip()
enabled = r_enabled.stdout.strip()
active_sym = "+" if active == "active" else "x"
enabled_sym = "+" if enabled == "enabled" else "x"
active_ok = "(OK) " if active == expected_active else "(BAD)"
enabled_ok = "(OK) " if enabled == "enabled" else "(BAD)"
return active_sym, active, active_ok, enabled_sym, enabled, enabled_ok
units = []
for vlan in data["vlans"]:
if is_wg(vlan) and not wg_interface_up(vlan["interface"]):
units.append((vlan_service_name(vlan), "(wg0 not up)", "active"))
else:
units.append((vlan_service_name(vlan), None, "active"))
units.append((f"{BLIST_TIMER_NAME}.timer", None, "active"))
units.append((NAT_SERVICE_NAME, None, "inactive")) # oneshot - exits after running
units.append(("freeradius", None, "active"))
units.append(("avahi-daemon", None, "active"))
print(f" {'UNIT':<45} {'ACTIVE':<18} {'ENABLED'}")
print(f" {'-'*45} {'-'*18} {'-'*15}")
for unit, note, expected_active in units:
if note:
print(f" {unit:<45} {note}")
else:
active_sym, active, active_ok, enabled_sym, enabled, enabled_ok = svc_row(unit, expected_active)
print(f" {unit:<45} {active_sym} {active:<10} {active_ok} {enabled_sym} {enabled:<10} {enabled_ok}")
# Timer next trigger
r = subprocess.run(
["systemctl", "show", f"{BLIST_TIMER_NAME}.timer", "--property=NextElapseUSecRealtime,NextElapseUSecMonotonic"],
capture_output=True, text=True
)
# Fall back to human-readable 'Trigger' field from status output
r2 = subprocess.run(
["systemctl", "status", f"{BLIST_TIMER_NAME}.timer", "--no-pager"],
capture_output=True, text=True
)
for line in r2.stdout.splitlines():
line = line.strip()
if line.startswith("Trigger:"):
trigger = line.split("Trigger:", 1)[1].strip()
if trigger and trigger != "n/a":
print(f"\n Next blocklist update: {trigger}")
break
def show_configs(data):
for vlan in data["vlans"]:
cf = vlan_conf_file(vlan)
if cf.exists():
print(f"# -- {cf} --")
print(cf.read_text())
else:
print(f"No config found at {cf} (not yet applied).")
# ===================================================================
# Leases
# ===================================================================
def reset_leases(data, vlan_name=None):
"""Stop dnsmasq instances, delete lease files, restart instances.
If vlan_name is given, only reset that VLAN. Otherwise reset all.
"""
check_root()
vlans = [v for v in data["vlans"] if not is_wg(v)]
if vlan_name:
vlans = [v for v in vlans if v["name"] == vlan_name]
if not vlans:
die(f"Unknown VLAN name '{vlan_name}'. "
f"Valid names: {', '.join(v['name'] for v in data['vlans'] if not is_wg(v))}")
print(f"Resetting leases for: {', '.join(v['name'] for v in vlans)}")
print()
# Stop
for vlan in vlans:
svc = vlan_service_name(vlan)
result = subprocess.run(["systemctl", "stop", svc],
capture_output=True, text=True)
if result.returncode == 0:
print(f" Stopped: {svc}")
else:
print(f" WARNING: Could not stop {svc}: {result.stderr.strip()}")
# Delete lease files
print()
for vlan in vlans:
lf = vlan_leases_file(vlan)
if lf.exists():
lf.unlink()
print(f" Deleted: {lf}")
else:
print(f" No lease file: {lf}")
# Restart
print()
for vlan in vlans:
svc = vlan_service_name(vlan)
result = subprocess.run(["systemctl", "start", svc],
capture_output=True, text=True)
if result.returncode == 0:
print(f" Started: {svc}")
else:
print(f" WARNING: Could not start {svc}: {result.stderr.strip()}")
print()
print("Done. Devices will get fresh leases on their next DHCP request.")
def show_leases(data):
# Build MAC -> reservation lookup across all VLANs
res_by_mac = {}
for vlan in data["vlans"]:
for r in vlan.get("reservations", []):
if r.get("enabled") is True:
mac = r.get("mac", "").lower().strip()
if mac:
res_by_mac[mac] = (r, vlan)
now = int(datetime.now().timestamp())
any_leases = False
for vlan in data["vlans"]:
if is_wg(vlan):
continue
lf = vlan_leases_file(vlan)
if not lf.exists():
continue
lines = lf.read_text().strip().splitlines()
if not lines:
continue
if not any_leases:
print(f"{'IP':<18} {'MAC':<20} {'HOSTNAME':<26} {'VLAN':<8} {'EXPIRES':<22} {'TIME LEFT':<18} {'TYPE':<8} {'DESCRIPTION'}")
print("-" * 145)
any_leases = True
for entry in lines:
parts = entry.split()
if len(parts) < 4:
continue
expire_ts = parts[0]
mac = parts[1]
ip = parts[2]
hostname = parts[3] if parts[3] != "*" else "(unknown)"
hostname = hostname[:26]
if expire_ts == "0":
expires_str = "permanent"
time_left = ""
else:
expire_int = int(expire_ts)
seconds = expire_int - now
if seconds <= 0:
expires_str = "expired"
time_left = ""
else:
hours, rem = divmod(seconds, 3600)
mins, _ = divmod(rem, 60)
expire_dt = datetime.fromtimestamp(expire_int)
expires_str = expire_dt.strftime("%Y-%m-%d %H:%M:%S")
time_left = f"{hours}h {mins}m"
match = res_by_mac.get(mac.lower())
lease_type = "static" if (match and not is_dynamic_ip(match[0])) else "dynamic"
description = match[0].get("description", "") if match else ""
print(f"{ip:<18} {mac:<20} {hostname:<26} {vlan['name']:<8} {expires_str:<22} {time_left:<18} {lease_type:<8} {description}")
if not any_leases:
print("No active leases found.")
# ===================================================================
# Metrics
# ===================================================================
def collect_metrics(data):
"""
Send SIGUSR1 to each running dnsmasq instance and parse stats from
journalctl. Returns a combined metrics dict, or None if unavailable.
"""
metrics = {
"queries_forwarded": 0,
"queries_answered_locally": 0,
"queries_authoritative": 0,
"cache_reused": 0,
"tcp_hwm": 0,
"tcp_max_allowed": 0,
"pool_memory_max": 0,
"dnssec_subqueries_hwm": 0,
"dnssec_crypto_hwm": 0,
"dnssec_sig_fails_hwm": 0,
"servers": []
}
any_running = False
for vlan in data["vlans"]:
svc = vlan_service_name(vlan)
result = subprocess.run(
["systemctl", "kill", "--signal=SIGUSR1", svc],
capture_output=True, text=True
)
if result.returncode != 0:
continue
any_running = True
if not any_running:
print("No dnsmasq instances are running.")
return None
time.sleep(1)
server_map = {}
for vlan in data["vlans"]:
svc = vlan_service_name(vlan)
result = subprocess.run(
["journalctl", "-u", svc, "--since", "5 seconds ago",
"--no-pager", "-o", "cat"],
capture_output=True, text=True
)
for line in result.stdout.splitlines():
m = re.search(r"cache size \d+, (\d+)/\d+ cache insertions re-used", line)
if m:
metrics["cache_reused"] += int(m.group(1))
m = re.search(r"queries forwarded (\d+), queries answered locally (\d+)", line)
if m:
metrics["queries_forwarded"] += int(m.group(1))
metrics["queries_answered_locally"] += int(m.group(2))
m = re.search(r"queries for authoritative zones (\d+)", line)
if m:
metrics["queries_authoritative"] += int(m.group(1))
m = re.search(r"highest since last SIGUSR1 (\d+), max allowed (\d+)", line)
if m:
metrics["tcp_hwm"] = max(metrics["tcp_hwm"], int(m.group(1)))
metrics["tcp_max_allowed"] = max(metrics["tcp_max_allowed"], int(m.group(2)))
m = re.search(r"pool memory in use \d+, max (\d+)", line)
if m:
metrics["pool_memory_max"] = max(metrics["pool_memory_max"], int(m.group(1)))
m = re.search(
r"server (\S+): queries sent (\d+), retried (\d+), failed (\d+), "
r"nxdomain replies (\d+), avg\. latency (\d+)ms",
line
)
if m:
addr = m.group(1)
if addr not in server_map:
server_map[addr] = {
"address": addr, "queries_sent": 0, "retried": 0,
"failed": 0, "nxdomain": 0, "avg_latency_ms": 0
}
server_map[addr]["queries_sent"] += int(m.group(2))
server_map[addr]["retried"] += int(m.group(3))
server_map[addr]["failed"] += int(m.group(4))
server_map[addr]["nxdomain"] += int(m.group(5))
server_map[addr]["avg_latency_ms"] = int(m.group(6))
metrics["servers"] = list(server_map.values())
return metrics
def update_metrics_file(new_metrics):
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if METRICS_FILE.exists():
with open(METRICS_FILE) as f:
stored = json.load(f)
else:
stored = {
"metadata": {"first_recorded": now_str, "last_recorded": now_str, "total_updates": 0},
"totals": {
"queries_forwarded": 0, "queries_answered_locally": 0,
"queries_authoritative": 0, "cache_reused": 0,
"tcp_hwm": 0, "tcp_max_allowed": 0, "pool_memory_max": 0,
"dnssec_subqueries_hwm": 0, "dnssec_crypto_hwm": 0,
"dnssec_sig_fails_hwm": 0, "servers": []
}
}
t = stored["totals"]
t["queries_forwarded"] += new_metrics["queries_forwarded"]
t["queries_answered_locally"] += new_metrics["queries_answered_locally"]
t["queries_authoritative"] += new_metrics["queries_authoritative"]
t["cache_reused"] += new_metrics["cache_reused"]
t["tcp_hwm"] = max(t["tcp_hwm"], new_metrics["tcp_hwm"])
t["pool_memory_max"] = max(t["pool_memory_max"], new_metrics["pool_memory_max"])
t["dnssec_subqueries_hwm"] = max(t["dnssec_subqueries_hwm"], new_metrics["dnssec_subqueries_hwm"])
t["dnssec_crypto_hwm"] = max(t["dnssec_crypto_hwm"], new_metrics["dnssec_crypto_hwm"])
t["dnssec_sig_fails_hwm"] = max(t["dnssec_sig_fails_hwm"], new_metrics["dnssec_sig_fails_hwm"])
if new_metrics["tcp_max_allowed"]:
t["tcp_max_allowed"] = new_metrics["tcp_max_allowed"]
existing = {s["address"]: s for s in t["servers"]}
for srv in new_metrics["servers"]:
addr = srv["address"]
if addr in existing:
existing[addr]["queries_sent"] += srv["queries_sent"]
existing[addr]["retried"] += srv["retried"]
existing[addr]["failed"] += srv["failed"]
existing[addr]["nxdomain"] += srv["nxdomain"]
existing[addr]["avg_latency_ms"] = srv["avg_latency_ms"]
else:
existing[addr] = srv.copy()
t["servers"] = list(existing.values())
stored["metadata"]["last_recorded"] = now_str
stored["metadata"]["total_updates"] += 1
with open(METRICS_FILE, "w") as f:
json.dump(stored, f, indent=2)
chown_to_script_dir_owner(METRICS_FILE)
def show_metrics(data):
check_root()
new = collect_metrics(data)
if new is None:
return
update_metrics_file(new)
with open(METRICS_FILE) as f:
data_m = json.load(f)
m = data_m["metadata"]
t = data_m["totals"]
print("DNS Metrics (lifetime totals across all VLAN instances)")
print(f" First recorded : {m['first_recorded']}")
print(f" Last recorded : {m['last_recorded']}")
print(f" Total updates : {m['total_updates']}")
print()
print("Queries")
print(f" Forwarded to upstream : {t['queries_forwarded']:,}")
print(f" Answered from cache : {t['queries_answered_locally']:,}")
print(f" Authoritative : {t['queries_authoritative']:,}")
print(f" Cache reused : {t['cache_reused']:,}")
print()
print("TCP")
print(f" Peak concurrent (HWM) : {t['tcp_hwm']}")
print(f" Max allowed : {t['tcp_max_allowed']}")
print()
print(f"Pool memory peak : {t['pool_memory_max']} bytes")
if t["servers"]:
print()
print("Upstream servers")
for s in t["servers"]:
print(f" {s['address']}")
print(f" Sent : {s['queries_sent']:,}")
print(f" Retried : {s['retried']:,}")
print(f" Failed : {s['failed']:,}")
print(f" NXDOMAIN : {s['nxdomain']:,}")
print(f" Latency : {s['avg_latency_ms']}ms (last recorded)")
# ===================================================================
# Stop / disable
# ===================================================================
def stop_instances(data):
"""Remove timer and stop all per-VLAN instances (config files preserved)."""
remove_timer()
remove_dashboard_timer()
print()
for vlan in data["vlans"]:
svc = vlan_service_name(vlan)
subprocess.run(["systemctl", "disable", "--now", svc],
capture_output=True, text=True)
print(f"Stopped and disabled: {svc}")
def disable_all(data):
"""Full teardown: stop dnsmasq instances, remove nftables, remove all generated config files."""
stop_instances(data)
print()
for vlan in data["vlans"]:
for f in (vlan_conf_file(vlan), vlan_service_file(vlan)):
if f.exists():
f.unlink()
print(f"Removed: {f}")
subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True)
print("systemd daemon reloaded.")
print()
print("-- Removing nftables rules -------------------------------------------")
delete_our_tables()
remove_nat_service()
if radius_enabled(data):
print()
print("-- Stopping RADIUS ---------------------------------------------------")
subprocess.run(["systemctl", "disable", "--now", "freeradius"],
capture_output=True, text=True)
print("freeradius stopped and disabled.")
if avahi_enabled(data):
print()
print("-- Stopping mDNS Reflector -------------------------------------------")
disable_avahi()
def _write_client_network(iface, dhcp, static_cidr=None):
"""Remove all router networkd files and write a plain client .network file."""
for pattern in (f"10-{PRODUCT_NAME}-*.network", f"10-{PRODUCT_NAME}-*.netdev"):
for f in NETWORKD_DIR.glob(pattern):
f.unlink()
print(f"Removed: {f}")
lines = [
"# Generated by core.py --disable -- do not edit manually.",
"",
"[Match]",
f"Name={iface}",
"",
"[Network]",
]
if dhcp:
lines.append("DHCP=yes")
else:
lines.append("DHCP=no")
lines.append(f"Address={static_cidr}")
lines.append("")
path = NETWORKD_DIR / f"10-client-{iface}.network"
path.write_text("\n".join(lines))
print(f"Written: {path}")
result = subprocess.run(["networkctl", "reload"], capture_output=True, text=True)
if result.returncode != 0:
print(f"WARNING: networkctl reload returned non-zero:\n{result.stderr.strip()}")
else:
print("systemd-networkd reloaded.")
def _configure_dns_resolved():
"""Re-enable systemd-resolved and restore the resolv.conf symlink."""
result = subprocess.run(
["systemctl", "enable", "--now", "systemd-resolved"],
capture_output=True, text=True
)
if result.returncode != 0:
print(f"ERROR: Failed to enable systemd-resolved:\n{result.stderr.strip()}")
return False
RESOLV_CONF.unlink(missing_ok=True)
RESOLV_CONF.symlink_to("/run/systemd/resolve/stub-resolv.conf")
print("systemd-resolved enabled. /etc/resolv.conf restored as symlink to stub resolver.")
return True
def _configure_dns_static(nameserver):
"""Write a plain /etc/resolv.conf with a single user-specified nameserver."""
RESOLV_CONF.unlink(missing_ok=True)
RESOLV_CONF.write_text(f"nameserver {nameserver}\n")
print(f"Updated /etc/resolv.conf: nameserver {nameserver}")
def _suggest_static_ip(physical_vlan):
"""
Suggest a client static IP from the physical VLAN's subnet.
Prefers server_identity IPs whose last octet is not 1 (highest wins).
Falls back to a random unused IP in the subnet if all are .1.
"""
import random
network = network_for(physical_vlan)
prefix = network.prefixlen
identities = physical_vlan.get("server_identities", [])
known_ips = {ipaddress.IPv4Address(i["ip"]) for i in identities}
non_gateway = [ip for ip in known_ips if ip.packed[-1] != 1]
if non_gateway:
chosen = max(non_gateway, key=lambda ip: ip.packed[-1])
return f"{chosen}/{prefix}"
# All identities end in .1 - pick a random unused host in the subnet
hosts = list(network.hosts())
candidates = [h for h in hosts if h not in known_ips and h.packed[-1] != 1]
if candidates:
chosen = random.choice(candidates)
return f"{chosen}/{prefix}"
# Degenerate fallback - extremely small subnet
return f"{list(network.hosts())[0]}/{prefix}"
# ===================================================================
# Dry-run helpers
# ===================================================================
def _svc_state(unit):
"""Return 'active', 'inactive', or 'unknown' for a systemd unit."""
r = subprocess.run(["systemctl", "is-active", unit], capture_output=True, text=True)
return r.stdout.strip() or "unknown"
def _svc_enabled(unit):
"""Return True if the systemd unit is enabled."""
r = subprocess.run(["systemctl", "is-enabled", unit], capture_output=True, text=True)
return r.stdout.strip() in ("enabled", "enabled-runtime")
def _dry_run_conflicting_services(data):
print("-- Conflicting services (dry-run) ------------------------------------")
for unit, label in [("systemd-resolved", "systemd-resolved"),
("systemd-timesyncd", "systemd-timesyncd")]:
state = _svc_state(unit)
if state == "active":
print(f" Would stop and disable: {label} (currently: active)")
else:
print(f" {label}: not active - no action needed")
chrony_ok = subprocess.run(["systemctl", "cat", "chrony"],
capture_output=True, text=True).returncode == 0
if not chrony_ok:
print(" chrony: not installed - dependency check would have prompted to install it")
else:
chrony_conf = Path("/etc/chrony/chrony.conf")
if chrony_conf.exists():
content = chrony_conf.read_text()
subnets = [str(network_for(v)) for v in data["vlans"]]
missing = [s for s in subnets if f"allow {s}" not in content]
if missing:
print(f" Would add chrony allow directives for: {', '.join(missing)}")
else:
print(" chrony.conf already has required allow directives - no change needed")
print(f" Would enable and restart: chrony")
if subprocess.run(["which", "ufw"], capture_output=True, text=True).returncode == 0:
status = subprocess.run(["ufw", "status"], capture_output=True, text=True)
if "Status: active" in status.stdout:
print(" Would disable: ufw (currently: active)")
else:
print(" ufw: not active - no rule action needed")
if _svc_enabled("ufw"):
print(" Would disable: ufw.service (currently: enabled at boot)")
else:
print(" ufw.service: not enabled at boot - no action needed")
else:
print(" ufw: not installed - no action needed")
r = subprocess.run(["systemctl", "is-enabled", "dnsmasq"],
capture_output=True, text=True)
if r.stdout.strip() in ("enabled", "enabled-runtime"):
print(f" Would stop and disable: system dnsmasq.service (currently: enabled)")
else:
print(" system dnsmasq.service: not enabled - no action needed")
physical = next((v for v in data["vlans"] if is_physical(v)), None)
if physical:
gw = resolve_vlan_options(physical)["gateway"]
if RESOLV_CONF.is_symlink():
print(f" Would replace /etc/resolv.conf symlink with plain file: nameserver {gw}")
else:
wanted = f"nameserver {gw}\n"
current = RESOLV_CONF.read_text() if RESOLV_CONF.exists() else ""
if wanted not in current:
print(f" Would update /etc/resolv.conf: nameserver {gw}")
else:
print(f" /etc/resolv.conf already points to {gw} - no change needed")
def _dry_run_blocklists(data):
print("-- Blocklists (dry-run) ----------------------------------------------")
for entry in data.get("blocklists", []):
print(f" Would download: {entry['description']}")
print(f" URL: {entry['url']}")
seen = {}
for vlan in data["vlans"]:
names = vlan.get("use_blocklists", [])
if names:
h = combo_hash(names)
if h not in seen:
seen[h] = sorted(names)
path = merged_path(h)
action = "update" if path.exists() else "create"
print(f" Would {action} merged blocklist: {path}")
print(f" Sources: {', '.join(sorted(names))}")
def _dry_run_timer(data):
print("-- Timer (dry-run) ---------------------------------------------------")
general = data.get("general", {})
execute_time = general.get("daily_execute_time_24hr_local", "02:30")
for path, label in [(BLIST_TIMER_FILE, "timer unit"), (BLIST_TIMER_SVC_FILE, "service unit")]:
action = "update" if path.exists() else "create and enable"
print(f" Would {action}: {path}")
print(f" Schedule: daily at {execute_time} local time (Persistent=true - catches up if missed)")
def _dry_run_boot_service():
print("-- Boot service (dry-run) --------------------------------------------")
script_path = Path(__file__).resolve()
action = "update" if NAT_SERVICE_FILE.exists() else "create and enable"
print(f" Would {action}: {NAT_SERVICE_FILE}")
print(f" ExecStart: /usr/bin/python3 {script_path} --apply")
print(f" After: network-online.target docker.service")
print(f" WantedBy: multi-user.target (runs on every boot)")
def _dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice, static_nameserver):
print()
print("[DRY RUN] Based on your selections, --disable would perform the following:")
print()
print(f"-- Stopping {PRODUCT_NAME} services (dry-run) --------------------------------")
print(f" Would disable and stop: {BLIST_TIMER_NAME}.timer")
for vlan in data["vlans"]:
svc = vlan_service_name(vlan)
conf = vlan_conf_file(vlan)
svc_f = vlan_service_file(vlan)
print(f" Would stop and disable: {svc}")
if conf.exists():
print(f" Would remove: {conf}")
if svc_f.exists():
print(f" Would remove: {svc_f}")
print(f" Would reload: systemd daemon")
for table in (f"{PRODUCT_NAME}-nat", f"{PRODUCT_NAME}-filter"):
r = subprocess.run(["nft", "list", "table", "ip", table],
capture_output=True, text=True)
if r.returncode == 0:
print(f" Would flush nftables table: {table}")
else:
print(f" nftables table {table}: not present - no action needed")
if NAT_SERVICE_FILE.exists():
print(f" Would stop, disable, and remove: {NAT_SERVICE_NAME}.service")
else:
print(f" {NAT_SERVICE_NAME}.service: not installed - no action needed")
print()
print("-- Restoring NTP client (dry-run) ------------------------------------")
state = _svc_state("chrony")
if state == "active":
print(f" Would stop and disable: chrony (currently: active)")
else:
print(f" chrony: not active - no action needed")
r = subprocess.run(["systemctl", "cat", "systemd-timesyncd"],
capture_output=True, text=True)
if r.returncode == 0:
print(f" Would enable and start: systemd-timesyncd")
else:
print(f" systemd-timesyncd: not available on this system")
print()
print("-- Network interface (dry-run) ----------------------------------------")
router_net = list(NETWORKD_DIR.glob(f"10-{PRODUCT_NAME}-*.network"))
router_dev = list(NETWORKD_DIR.glob(f"10-{PRODUCT_NAME}-*.netdev"))
client_file = NETWORKD_DIR / f"10-client-{iface}.network"
for f in router_net + router_dev:
print(f" Would remove: {f}")
print(f" Would write: {client_file}")
if use_dhcp:
print(f" [Match] Name={iface}")
print(f" [Network] DHCP=yes")
else:
print(f" [Match] Name={iface}")
print(f" [Network] DHCP=no Address={static_cidr}")
print(f" Would reload: systemd-networkd")
print()
if not resolv_ok:
print("-- DNS (dry-run) -----------------------------------------------------")
if dns_choice == "resolved":
print(" Would enable: systemd-resolved")
print(" Would restore: /etc/resolv.conf -> /run/systemd/resolve/stub-resolv.conf")
else:
print(f" Would write: /etc/resolv.conf")
print(f" nameserver {static_nameserver}")
print()
# ===================================================================
# Disable wizard
# ===================================================================
def cmd_disable(data, dry_run=False):
"""Interactive wizard to revert the machine from router to plain network client."""
import readline
data = resolve_vlan_derived_fields(data)
print()
print("=" * 70)
print(" REVERT TO NETWORK CLIENT" + (" [DRY RUN]" if dry_run else ""))
print("=" * 70)
print()
print(" You are reverting this machine from a gateway/router back to being")
print(" a plain network client. All router services, firewall rules, and")
print(" VLAN configuration will be removed.")
if dry_run:
print()
print(" DRY RUN: No changes will be made. This shows what would happen.")
print()
# ------------------------------------------------------------------
# Step 1 - Confirmation
# ------------------------------------------------------------------
while True:
print(" [1] Proceed with reversion")
print(" [2] Cancel")
choice = input(" Choice [1/2]: ").strip()
if choice == "2":
print("Cancelled.")
return
if choice == "1":
break
print(" Invalid choice. Enter 1 or 2.")
print()
# ------------------------------------------------------------------
# Step 2 - IP configuration
# ------------------------------------------------------------------
physical = next((v for v in data["vlans"] if is_physical(v)), None)
if physical is None:
die("No physical VLAN (vlan_id=1) found in config. Cannot determine interface.")
iface = physical["interface"]
print(" How should this machine obtain its IP address after reversion?")
print()
print(" [1] Obtain IP via DHCP (recommended - let the new router assign one)")
print(" [2] Use a static IP")
print()
use_dhcp = None
static_cidr = None
while True:
choice = input(" Choice [1/2]: ").strip()
if choice == "1":
use_dhcp = True
break
if choice == "2":
use_dhcp = False
break
print(" Invalid choice. Enter 1 or 2.")
if not use_dhcp:
print()
print(" WARNING: Do not assign an IP that will conflict with another")
print(" LAN device, especially the new gateway/router.")
print()
suggested = _suggest_static_ip(physical)
print(f" Suggested IP (edit as needed): {suggested}")
print()
while True:
try:
readline.set_startup_hook(lambda: readline.insert_text(suggested))
entry = input(" Static IP/prefix: ").strip()
finally:
readline.set_startup_hook(None)
if not entry:
print(" Cannot be empty.")
continue
try:
ipaddress.IPv4Interface(entry)
static_cidr = entry
break
except ValueError:
print(f" '{entry}' is not a valid IPv4 address/prefix (e.g. 192.168.1.50/24).")
print()
# ------------------------------------------------------------------
# Step 3 - DNS resolver
# ------------------------------------------------------------------
# If resolv.conf is already a plain file with no router gateway IPs, leave it alone.
gateway_ips = {resolve_vlan_options(v)["gateway"] for v in data["vlans"] if not is_wg(v)}
resolv_ok = False
if not RESOLV_CONF.is_symlink() and RESOLV_CONF.exists():
current_servers = {
parts[1] for line in RESOLV_CONF.read_text().splitlines()
if (parts := line.strip().split()) and parts[0] == "nameserver"
}
if current_servers and not current_servers.intersection(gateway_ips):
resolv_ok = True
static_nameserver = None # set if user chooses manual entry
dns_choice = None # "resolved" or "static"
if resolv_ok:
print(" /etc/resolv.conf already contains client-appropriate DNS settings.")
print(" Leaving it as-is.")
print()
else:
resolved_available = subprocess.run(
["systemctl", "cat", "systemd-resolved"],
capture_output=True, text=True
).returncode == 0
print(" How should DNS resolution be handled after reversion?")
print()
if resolved_available:
print(" [1] Re-enable systemd-resolved (recommended - adapts to any network)")
print(" [2] Enter a static nameserver IP")
while True:
choice = input(" Choice [1/2]: ").strip()
if choice == "1":
dns_choice = "resolved"
break
if choice == "2":
dns_choice = "static"
break
print(" Invalid choice. Enter 1 or 2.")
else:
print(" systemd-resolved is not installed on this system.")
print(" A static nameserver IP will be used.")
dns_choice = "static"
if dns_choice == "static":
print()
while True:
entry = input(" Nameserver IP: ").strip()
if not entry:
print(" Cannot be empty.")
continue
try:
ipaddress.IPv4Address(entry)
static_nameserver = entry
break
except ValueError:
print(f" '{entry}' is not a valid IPv4 address.")
print()
# ------------------------------------------------------------------
# Step 4 - Execute (or dry-run summary)
# ------------------------------------------------------------------
if dry_run:
_dry_run_disable(data, iface, use_dhcp, static_cidr, resolv_ok, dns_choice, static_nameserver)
return
print(f"-- Stopping {PRODUCT_NAME} services ------------------------------------------")
disable_all(data)
print()
print("-- Restoring NTP client ----------------------------------------------")
restore_ntp()
print()
print("-- Configuring network interface -------------------------------------")
_write_client_network(iface, dhcp=use_dhcp, static_cidr=static_cidr)
print()
if not resolv_ok:
print("-- Configuring DNS ---------------------------------------------------")
if dns_choice == "static":
_configure_dns_static(static_nameserver)
else:
if not _configure_dns_resolved():
print("Failed to re-enable systemd-resolved. Please enter a nameserver IP.")
while True:
entry = input(" Nameserver IP: ").strip()
try:
ipaddress.IPv4Address(entry)
_configure_dns_static(entry)
break
except ValueError:
print(f" '{entry}' is not a valid IPv4 address.")
print()
print("Done. This machine is now configured as a network client.")
if use_dhcp:
print(f" Interface {iface} will obtain its IP via DHCP.")
else:
print(f" Interface {iface} will use static IP: {static_cidr}")
# ===================================================================
# Main
# ===================================================================
def cmd_apply(data, dry_run=False):
"""--apply: full apply. Handles conflicting services, networkd (if changed),
dnsmasq confs, start/restart all services whose interface is up, nftables,
timer, and boot service. Safe to run repeatedly.
"""
data = resolve_vlan_derived_fields(data)
if dry_run:
print("[DRY RUN] --apply would perform the following actions:")
print()
_dry_run_conflicting_services(data)
print()
print("-- systemd-networkd (dry-run) ----------------------------------------")
apply_networkd(data, dry_run=True)
print()
print("-- dnsmasq instances (dry-run) ---------------------------------------")
apply_dnsmasq_instances(data, dry_run=True, start_if_needed=True)
print()
print("-- nftables (dry-run) ------------------------------------------------")
apply_nftables(data, dry_run=True)
print()
_dry_run_timer(data)
print()
_dry_run_boot_service()
if radius_enabled(data):
print()
print("-- RADIUS (dry-run) --------------------------------------------------")
num_clients = len(radius_clients(data))
default_vlan = next((v for v in data["vlans"] if v.get("radius_default") is True), None)
total_macs = sum(
len([r for r in v.get("reservations", []) if r.get("enabled") is True])
for v in data["vlans"]
)
print(f" Would write: {RADIUS_CLIENTS_CONF}")
print(f" {num_clients} RADIUS client(s)")
print(f" Would write: {RADIUS_USERS_FILE}")
print(f" {total_macs} MAC reservation(s)")
if default_vlan:
print(f" DEFAULT -> VLAN {default_vlan['vlan_id']} ({default_vlan['name']})")
print(f" Would ensure freeradius is running")
if avahi_enabled(data):
print()
print("-- mDNS Reflection (dry-run) -----------------------------------------")
ifaces = avahi_interfaces(data)
print(f" Would write: {AVAHI_CONF_FILE}")
print(f" Reflecting across: {', '.join(ifaces)}")
print(f" Would ensure avahi-daemon is running")
return
check_root()
total_enabled = sum(
len([r for r in v.get("reservations", []) if r.get("enabled") is True])
for v in data["vlans"] if not is_wg(v)
)
total_disabled = sum(
len([r for r in v.get("reservations", []) if r.get("enabled") is not True])
for v in data["vlans"] if not is_wg(v)
)
total_wg_peers = sum(len(v.get("peers", [])) for v in data["vlans"] if is_wg(v))
wg_part = f", {total_wg_peers} WG peer(s)" if total_wg_peers else ""
print(f"Applying config: {len(data['vlans'])} VLAN(s), "
f"{total_enabled} reservation(s), {total_disabled} skipped{wg_part}.")
print()
print("-- Conflicting services ----------------------------------------------")
disable_systemd_timesyncd()
ensure_chrony(data)
disable_ufw()
print()
print("-- systemd-networkd --------------------------------------------------")
apply_networkd(data, only_if_changed=True)
print()
if any(is_wg(v) for v in data["vlans"]):
print("-- WireGuard interfaces ----------------------------------------------")
ensure_wg_interfaces(data)
print()
print("-- dnsmasq instances -------------------------------------------------")
if not blocklists_available(data):
print(" NOTE: No merged blocklist files found -- blocklist rules will be absent.")
print(" Run --update-blocklists to download and merge blocklists.")
apply_dnsmasq_instances(data, start_if_needed=True)
print()
print("-- nftables ----------------------------------------------------------")
apply_nftables(data)
print()
print("-- Timer -------------------------------------------------------------")
install_timer(data)
print()
print("-- Dashboard timer ---------------------------------------------------")
install_dashboard_timer()
print()
print("-- Boot service ------------------------------------------------------")
install_nat_service()
print()
if radius_enabled(data):
print("-- RADIUS ------------------------------------------------------------")
apply_radius(data)
print()
else:
svc = "freeradius"
if subprocess.run(["systemctl", "is-active", svc],
capture_output=True, text=True).stdout.strip() == "active":
print("-- RADIUS ------------------------------------------------------------")
subprocess.run(["systemctl", "disable", "--now", svc],
capture_output=True, text=True)
print("freeradius stopped and disabled (no radius_client reservations).")
print()
if avahi_enabled(data):
print("-- mDNS Reflection ---------------------------------------------------")
apply_avahi(data)
print()
else:
svc = "avahi-daemon"
if subprocess.run(["systemctl", "is-active", svc],
capture_output=True, text=True).stdout.strip() == "active":
print("-- mDNS Reflection ---------------------------------------------------")
disable_avahi()
print()
print("Done.")
def cmd_update_blocklists(data):
"""--update-blocklists: download and merge blocklists. On success, call
cmd_apply to reload dnsmasq instances with the new blocklists.
"""
check_root()
print("-- Updating blocklists -----------------------------------------------")
success = update_blocklists(data)
print()
if success:
print("-- Applying updated configs ------------------------------------------")
cmd_apply(data)
else:
print("WARNING: Blocklist update had errors -- skipping --apply.")
print(" Existing merged files (if any) are unchanged.")
def main():
parser = argparse.ArgumentParser(
description="Apply core.json to systemd-networkd, per-VLAN dnsmasq instances, and nftables",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"examples:\n"
" sudo python3 core.py --apply Apply full config (idempotent, safe to re-run)\n"
" sudo python3 core.py --update-blocklists Refresh blocklists and apply\n"
" sudo python3 core.py --status Show service and timer status\n"
" sudo python3 core.py --view-configs Show active per-VLAN dnsmasq config files\n"
" sudo python3 core.py --view-leases Show active DHCP leases\n"
" sudo python3 core.py --view-rules Show active nftables ruleset\n"
" sudo python3 core.py --disable Stop instances, remove nftables, remove all config files\n"
" python3 core.py --view-metrics Show lifetime DNS metrics\n"
"\n"
" [--dry-run] may be combined with --apply or --disable\n"
" to preview all actions verbosely without making any changes:\n"
" sudo python3 core.py --apply --dry-run\n"
" sudo python3 core.py --disable --dry-run\n"
)
)
parser.add_argument("--apply", action="store_true", help="Apply full config: services, networkd, dnsmasq, nftables, timer, boot service")
parser.add_argument("--update-blocklists", action="store_true", help="Refresh blocklists and apply (used by timer)")
parser.add_argument("--dry-run", action="store_true", help="Preview all actions without making changes (combine with --apply or --disable)")
parser.add_argument("--status", action="store_true", help="Show service and timer status")
parser.add_argument("--view-configs", action="store_true", help="Show active per-VLAN dnsmasq config files")
parser.add_argument("--view-leases", action="store_true", help="Show active DHCP leases")
parser.add_argument("--reset-leases", nargs="?", const="__all__", metavar="VLAN",
help="Reset DHCP leases (stop dnsmasq, delete lease files, restart). "
"Optionally specify a VLAN name to reset only that VLAN.")
parser.add_argument("--view-rules", action="store_true", help="Show active nftables ruleset")
parser.add_argument("--disable", action="store_true", help="Stop instances, remove nftables, remove all config files")
parser.add_argument("--view-metrics", action="store_true", help="Show lifetime DNS metrics across all instances")
args = parser.parse_args()
update_blocklists_flag = getattr(args, "update_blocklists", False)
if not any([args.apply, update_blocklists_flag,
args.dry_run, args.status, args.view_configs, args.view_leases,
args.view_rules, args.disable, args.view_metrics,
args.reset_leases]):
parser.print_help()
sys.exit(0)
if args.dry_run and not any([args.apply, args.disable]):
print("ERROR: --dry-run must be combined with --apply or --disable.", file=sys.stderr)
sys.exit(1)
data = load_config()
errors = validate_config(data)
if errors:
print("Validation failed:", file=sys.stderr)
for e in errors:
print(f" - {e}", file=sys.stderr)
sys.exit(1)
general = data.get("general", {})
setup_logging(
general.get("log_max_kb", 1024),
general.get("log_errors_only", False)
)
if args.status:
show_status(data)
return
if args.view_configs:
show_configs(data)
return
if args.view_leases:
show_leases(data)
return
if args.reset_leases:
vlan_name = None if args.reset_leases == "__all__" else args.reset_leases
reset_leases(data, vlan_name)
return
if args.view_rules:
show_rules()
return
if args.view_metrics:
show_metrics(data)
return
if args.disable:
if not args.dry_run:
check_root()
cmd_disable(data, dry_run=args.dry_run)
return
if update_blocklists_flag:
cmd_update_blocklists(data)
return
if args.apply:
cmd_apply(data, dry_run=args.dry_run)
return
if __name__ == "__main__":
main()