""" mod_dnsmasq.py -- dnsmasq instance configuration, service management, and leases. Handles blocklist merging, per-VLAN dnsmasq config and systemd service unit generation, applying/reloading instances, system service conflict resolution (systemd-resolved, dnsmasq, chrony, ufw), and DHCP lease display. """ import json import logging import sqlite3 import subprocess import time from datetime import datetime from pathlib import Path import mod_shared as shared import mod_wireguard as wireguard import mod_validation as validation BLOCKLIST_DIR = shared.SCRIPT_DIR / "blocklists" DB_FILE = BLOCKLIST_DIR / "domains.db" LOG_FILE = BLOCKLIST_DIR / ".log" RESOLV_CONF = Path("/etc/resolv.conf") _log = logging.getLogger("blocklists") # =================================================================== # Blocklist management # =================================================================== def vlan_hosts_file(vlan): """Stable per-VLAN hosts file in the system dnsmasq config dir (world-readable, accessible after dnsmasq drops privileges from root to the dnsmasq user).""" return shared.DNSMASQ_CONF_DIR / f"for-{vlan['name']}.hosts" def blocklists_available(data): """Return True if at least one per-VLAN hosts file is non-empty.""" for vlan in data.get("vlans", []): if vlan.get("use_blocklists"): f = vlan_hosts_file(vlan) if f.exists() and f.stat().st_size > 0: return True return False # =================================================================== # Blocklist parse / detect # =================================================================== def _parse_dnsmasq_format(content): domains = set() for ln in content.splitlines(): ln = ln.strip() if not ln or ln.startswith("#"): continue if ln.startswith("local=/"): domain = ln.removeprefix("local=/").rstrip("/") if domain: domains.add(domain) elif ln.startswith("address=/"): parts = ln.removeprefix("address=/").split("/") if parts: domains.add(parts[0]) return domains def _parse_hosts_format(content): domains = set() for ln in content.splitlines(): ln = ln.strip() if not ln or ln.startswith("#"): continue parts = ln.split() if len(parts) >= 2: domains.add(parts[1]) return domains def _parse_local_format(content): domains = set() for ln in content.splitlines(): ln = ln.strip() if ln and not ln.startswith("#"): domains.add(ln) return domains def _detect_format(content): for ln in content.splitlines(): ln = ln.strip() if not ln or ln.startswith("#"): continue if ln.startswith("local=/") or ln.startswith("address=/"): return "dnsmasq" if ln[0].isdigit(): return "hosts" return "dnsmasq" def _parse_blocklist(content, is_local=False): if is_local: return _parse_local_format(content) fmt = _detect_format(content) if fmt == "dnsmasq": return _parse_dnsmasq_format(content) return _parse_hosts_format(content) # =================================================================== # Blocklist SQLite # =================================================================== def _open_db(): db = sqlite3.connect(DB_FILE) db.execute("PRAGMA journal_mode=WAL") db.execute("PRAGMA foreign_keys=ON") db.executescript(""" CREATE TABLE IF NOT EXISTS blocklists ( id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL, mtime REAL, fetched_at INTEGER, domain_count INTEGER ); CREATE TABLE IF NOT EXISTS domains ( domain TEXT NOT NULL, blocklist_id INTEGER NOT NULL REFERENCES blocklists(id) ON DELETE CASCADE, PRIMARY KEY (domain, blocklist_id) ); CREATE INDEX IF NOT EXISTS idx_domains_domain ON domains(domain); """) db.commit() return db def _get_stored_mtime(db, name): row = db.execute("SELECT mtime FROM blocklists WHERE name = ?", (name,)).fetchone() return row[0] if row else None def _upsert_blocklist(db, name, domains, mtime): now = int(time.time()) db.execute(""" INSERT INTO blocklists (name, mtime, fetched_at, domain_count) VALUES (?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET mtime = excluded.mtime, fetched_at = excluded.fetched_at, domain_count = excluded.domain_count """, (name, mtime, now, len(domains))) bl_id = db.execute("SELECT id FROM blocklists WHERE name = ?", (name,)).fetchone()[0] db.execute("DELETE FROM domains WHERE blocklist_id = ?", (bl_id,)) db.executemany("INSERT INTO domains (domain, blocklist_id) VALUES (?, ?)", ((d, bl_id) for d in domains)) db.commit() def _query_merged_domains(db, names): placeholders = ",".join("?" * len(names)) rows = db.execute(f""" SELECT DISTINCT d.domain FROM domains d JOIN blocklists b ON d.blocklist_id = b.id WHERE b.name IN ({placeholders}) ORDER BY d.domain """, list(names)).fetchall() return [r[0] for r in rows] # =================================================================== # Blocklist merge and SIGHUP # =================================================================== def _build_merged_hosts(domains, bl_names): lines = [ "# Generated by core.py -- do not edit manually.", f"# Blocklists: {', '.join(sorted(bl_names))}", f"# Domains: {len(domains):,}", "", ] for domain in domains: lines.append(f"0.0.0.0 {domain}") return "\n".join(lines) + "\n" def setup_blocklist_logging(general): """Configure file + stdout logging for blocklists logger.""" max_kb = general.get("log_max_kb", 1024) errors_only = general.get("log_errors_only", False) try: if LOG_FILE.exists() and LOG_FILE.stat().st_size > max_kb * 1024: LOG_FILE.write_text("") if not LOG_FILE.exists(): LOG_FILE.touch() file_handler = logging.FileHandler(LOG_FILE) except PermissionError: print(f"WARNING: Cannot write to {LOG_FILE} -- run with sudo.") file_handler = None level = logging.ERROR if errors_only else logging.INFO handlers = [logging.StreamHandler()] if file_handler: handlers.insert(0, file_handler) logging.basicConfig( level=level, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=handlers, force=True, ) def update_blocklist_hosts(data): """Parse downloaded/local blocklist files, upsert into SQLite, and write per-VLAN hosts files (0.0.0.0 format). Always writes every VLAN's file (empty if no blocklists assigned) so addn-hosts= in the dnsmasq conf is always valid and SIGHUP can update it without a restart. Returns True on full success, False if any fetch/parse failed. """ BLOCKLIST_DIR.mkdir(exist_ok=True) db = _open_db() bl_library = {bl["name"]: bl for bl in data.get("dns_blocking", {}).get("blocklists", [])} needed = set() for vlan in data.get("vlans", []): needed.update(vlan.get("use_blocklists", [])) changed = set() any_fail = False for name in needed: if name not in bl_library: _log.warning(f"Blocklist '{name}' referenced by a VLAN but not defined -- skipping") continue entry = bl_library[name] is_local = entry.get("bl_type") == "local" save_as = entry.get("save_as", "") try: path = BLOCKLIST_DIR / save_as if save_as else None if not path or not path.exists(): _log.warning(f"'{name}': file not found ({path}) -- skipping") any_fail = True continue current_mtime = path.stat().st_mtime except Exception as e: _log.error(f"Failed to stat '{name}': {e}") any_fail = True continue if current_mtime == _get_stored_mtime(db, name): _log.info(f"Unchanged: '{name}' -- skipping") continue try: raw = path.read_text("utf-8", errors="ignore") except Exception as e: _log.error(f"Failed to read '{name}': {e}") any_fail = True continue domains = _parse_blocklist(raw, is_local=is_local) _upsert_blocklist(db, name, domains, current_mtime) _log.info(f"Updated '{name}': {len(domains):,} domains") changed.add(name) active_vlan_names = set() for vlan in data.get("vlans", []): vlan_name = vlan["name"] active_vlan_names.add(vlan_name) bl_names = [n for n in vlan.get("use_blocklists", []) if n in bl_library] hosts_file = vlan_hosts_file(vlan) if not bl_names: if hosts_file.exists(): hosts_file.write_text("") continue if not changed.intersection(bl_names) and hosts_file.exists(): _log.info(f"VLAN '{vlan_name}' blocklists unchanged -- skipping rewrite") continue domains = _query_merged_domains(db, bl_names) hosts_file.write_text(_build_merged_hosts(domains, bl_names)) _log.info(f"VLAN '{vlan_name}': wrote {len(domains):,} domains from [{', '.join(sorted(bl_names))}]") active_vlan_names = {v["name"] for v in data.get("vlans", [])} for f in shared.DNSMASQ_CONF_DIR.glob("for-*.hosts"): vlan_name = f.stem.removeprefix("for-") if vlan_name not in active_vlan_names: f.unlink() _log.info(f"Removed stale hosts file: {f.name}") db.close() return not any_fail def sighup_all_instances(): """Send SIGHUP to every active dnsmasq-routlin-* instance to reload addn-hosts files without restarting. No DNS or DHCP interruption.""" result = subprocess.run( ["systemctl", "list-units", "--state=active", "--no-legend", "--plain", f"dnsmasq-{shared.PRODUCT_NAME}-*.service"], capture_output=True, text=True, ) units = [line.split()[0] for line in result.stdout.splitlines() if line.strip()] if not units: print(" No active dnsmasq instances found.") return for unit in units: r = subprocess.run(["systemctl", "kill", "--signal=SIGHUP", unit], capture_output=True, text=True) if r.returncode == 0: print(f" Reloaded: {unit}") else: print(f" WARNING: Failed to SIGHUP {unit}: {r.stderr.strip()}") # =================================================================== # Build per-VLAN dnsmasq config # =================================================================== def _wan_has_ipv6(iface): """Return True if the WAN interface has a non-link-local IPv6 address.""" try: result = subprocess.run( ["ip", "-6", "addr", "show", iface, "scope", "global"], capture_output=True, text=True ) return bool(result.stdout.strip()) except Exception: return False def get_container_bridge_ips(): """Return {ifname: ip} for all active container bridge interfaces. Used to add listen-address directives to the physical VLAN's dnsmasq instance so containers can reach the local DNS resolver. Works universally for Docker, Podman, LXC, libvirt, etc. """ try: result = subprocess.run( ["ip", "-j", "addr", "show", "type", "bridge"], capture_output=True, text=True, timeout=5 ) if result.returncode != 0: return {} links = json.loads(result.stdout) out = {} for l in links: if l.get("operstate") != "UP": continue for addr in l.get("addr_info", []): if addr.get("family") == "inet": out[l["ifname"]] = addr["local"] break return out except Exception: return {} def build_vlan_dnsmasq_conf(vlan, data, iface): """Generate the complete dnsmasq config for one VLAN instance.""" dns_cfg = data.get("upstream_dns", {}) general = data.get("network_interfaces", {}) overrides = [o for o in data.get("host_overrides", []) if o.get("enabled") is True] name = vlan["name"] d = vlan.get("dhcp_information", {}) opts = shared.resolve_vlan_options(vlan) gateway = opts["gateway"] bl_names = vlan.get("use_blocklists", []) hosts_file = vlan_hosts_file(vlan) if bl_names else None L = [ "# Generated by core.py -- do not edit manually.", "# Edit config.json and re-run: sudo python3 core.py --apply", f"# VLAN: {name} (vlan_id={vlan.get('vlan_id')})", "", ] L.append(f"pid-file={shared.vlan_pid_file(vlan)}") if not validation.is_wg(vlan): L.append(f"dhcp-leasefile={shared.vlan_leases_file(vlan)}") L += [ "except-interface=lo", "bind-interfaces", f"listen-address={gateway}", f"interface={iface}", ] if shared.is_physical(vlan): bridge_ips = get_container_bridge_ips() for bridge, ip in bridge_ips.items(): L += [f"interface={bridge}", f"listen-address={ip}"] L.append("") if not validation.is_wg(vlan): dotted_mask = shared.prefix_to_dotted(vlan['subnet_mask']) L += [ "# -- DHCP -----------------------------------------------------------", f"dhcp-range=set:{name},{d['dynamic_pool_start']},{d['dynamic_pool_end']},{dotted_mask},{d['lease_time']}", f"domain={d.get('domain', 'local')}", "", f"dhcp-option=tag:{name},option:router,{gateway}", f"dhcp-option=tag:{name},option:dns-server,{opts['dns_servers']}", f"dhcp-option=tag:{name},option:ntp-server,{opts['ntp_servers']}", "", ] identity_hosts = [s for s in vlan.get("server_identities", []) if s.get("hostname")] if identity_hosts: L.append("# -- Server identity hostnames ----------------------------------") for s in identity_hosts: L += [f"# {s['description']}", f"dhcp-host={s['ip']},{s['hostname']}", ""] vlan_res = [r for r in data.get("dhcp_reservations", []) if r.get("vlan") == name] active_res = [r for r in vlan_res if r.get("enabled") is True] inactive_res = [r for r in vlan_res if r.get("enabled") is not True] if active_res: L.append("# -- Reservations -----------------------------------------------") # Group reservations sharing a static IP into single dhcp-host lines # (multiple MACs for the same device e.g. wired + WiFi interfaces) # Dynamic reservations are always emitted individually. seen_ips = {} # ip -> list of reservations ordered = [] # preserves insertion order for output for r in active_res: if validation.is_dynamic_ip(r): ordered.append([r]) # always individual else: ip = r.get("ip", "") if ip in seen_ips: seen_ips[ip].append(r) else: seen_ips[ip] = [r] ordered.append(seen_ips[ip]) for group in ordered: if len(group) == 1: r = group[0] h = r.get('hostname', '') L.append(f"# {r['description']}") if validation.is_dynamic_ip(r): L.append(f"dhcp-host=set:{name},{r['mac']},{h},{d['lease_time']}" if h else f"dhcp-host=set:{name},{r['mac']},{d['lease_time']}") else: L.append(f"dhcp-host=set:{name},{r['mac']},{r['ip']},{h},{d['lease_time']}" if h else f"dhcp-host=set:{name},{r['mac']},{r['ip']},{d['lease_time']}") else: # Multiple MACs share the same IP -- combine into one dhcp-host line descs = ", ".join(r['description'] for r in group) macs = ",".join(r['mac'] for r in group) ip = group[0]['ip'] hostname = group[0].get('hostname', '') L.append(f"# {descs}") L.append(f"dhcp-host=set:{name},{macs},{ip},{hostname},{d['lease_time']}" if hostname else f"dhcp-host=set:{name},{macs},{ip},{d['lease_time']}") L.append("") if inactive_res: L.append("# -- Skipped reservations (enabled: false) ----------------------") for r in inactive_res: L.append(f"# SKIPPED: {r['description']} ({r.get('mac', '?')} -> {r.get('ip', '?')})") L.append("") L += [ "# -- DNS ------------------------------------------------------------", "no-resolv", ] if dns_cfg.get("strict_order"): L.append("strict-order") wan = data["network_interfaces"]["wan_interface"] wan_has_ipv6 = _wan_has_ipv6(wan) for srv in dns_cfg.get("upstream_servers", []): if ":" in srv and not wan_has_ipv6: continue # skip IPv6 upstream -- WAN has no IPv6 address L.append(f"server={srv}") L.append(f"cache-size={dns_cfg.get('cache_size', 1000)}") if vlan.get("dnsmasq_log_queries", False): L.append("log-queries") L.append("") if overrides: L.append("# -- Host overrides -------------------------------------------------") for o in overrides: L += [f"# {o['description']}", f"address=/{o['host']}/{o['ip']}", ""] if hosts_file and hosts_file.exists(): L += [ "# -- Blocklist ------------------------------------------------------", f"addn-hosts={hosts_file}", "", ] elif bl_names: L += ["# Blocklist not yet merged -- run: sudo python3 core.py --merge-blocklists", ""] return "\n".join(L) # =================================================================== # Build per-VLAN systemd service unit # =================================================================== def build_vlan_service(vlan, iface): name = vlan["name"] conf = shared.vlan_conf_file(vlan) if validation.is_wg(vlan): after = f"network-online.target wg-quick@{iface}.service" wants = "network-online.target" bindsto = f"wg-quick@{iface}.service" else: after = "network-online.target" wants = "network-online.target" bindsto = None lines = [ "# Generated by core.py -- do not edit manually.", "", "[Unit]", f"Description=dnsmasq for VLAN {name}", f"After={after}", f"Wants={wants}", ] if bindsto: lines.append(f"BindsTo={bindsto}") lines += [ "", "[Service]", "Type=forking", f"PIDFile={shared.vlan_pid_file(vlan)}", f"ExecStart=/usr/sbin/dnsmasq --conf-file={conf}", "ExecReload=/bin/kill -HUP $MAINPID", "Restart=on-failure", "RestartSec=5s", "", "[Install]", "WantedBy=multi-user.target", "", ] return "\n".join(lines) # =================================================================== # System service helpers # =================================================================== def ensure_resolv_conf(data): """Ensure /etc/resolv.conf points to the physical VLAN gateway (vlan_id=1).""" physical = next((v for v in data["vlans"] if shared.is_physical(v)), None) if physical is None: return nameserver = shared.resolve_vlan_options(physical)["gateway"] wanted = f"nameserver {nameserver}\n" # A symlink (e.g. to systemd-resolved stub) must be replaced with a plain file. if RESOLV_CONF.is_symlink(): RESOLV_CONF.unlink() print("Removed /etc/resolv.conf symlink (was pointing to systemd-resolved stub).") current = RESOLV_CONF.read_text() if RESOLV_CONF.exists() else "" if wanted in current: print(f"/etc/resolv.conf already points to {nameserver}. Good.") return RESOLV_CONF.write_text(wanted) print(f"Updated /etc/resolv.conf: nameserver {nameserver}") def disable_systemd_resolved(): """Stop and disable systemd-resolved if it is active.""" result = subprocess.run( ["systemctl", "is-active", "systemd-resolved"], capture_output=True, text=True ) if result.stdout.strip() == "active": subprocess.run(["systemctl", "disable", "--now", "systemd-resolved"], capture_output=True, text=True) print("Disabled systemd-resolved.") else: print("systemd-resolved is not active. Good.") def disable_systemd_timesyncd(): """Stop and disable systemd-timesyncd if it is active.""" result = subprocess.run( ["systemctl", "is-active", "systemd-timesyncd"], capture_output=True, text=True ) if result.stdout.strip() == "active": subprocess.run(["systemctl", "disable", "--now", "systemd-timesyncd"], capture_output=True, text=True) print("Disabled systemd-timesyncd.") else: print("systemd-timesyncd is not active. Good.") def ensure_chrony(data): """Add VLAN allow directives to chrony.conf and start the service.""" chrony_conf = Path("/etc/chrony/chrony.conf") if chrony_conf.exists(): content = chrony_conf.read_text() subnets = [] for v in data["vlans"]: subnets.append(str(shared.network_for(v))) added = [] for subnet in subnets: line = f"allow {subnet}" if line not in content: content += f"\n{line}" added.append(subnet) if added: chrony_conf.write_text(content) print(f"Updated /etc/chrony/chrony.conf: added allow for {', '.join(added)}") else: print("chrony.conf already has required allow directives. Good.") subprocess.run(["systemctl", "enable", "--now", "chrony"], capture_output=True, text=True) subprocess.run(["systemctl", "restart", "chrony"], capture_output=True, text=True) print("chrony enabled and running. Good.") def disable_ufw(): """Disable ufw (without removing it) if it is installed.""" if subprocess.run(["which", "ufw"], capture_output=True, text=True).returncode != 0: print("ufw is not installed. Good.") return status = subprocess.run(["ufw", "status"], capture_output=True, text=True) if "Status: active" in status.stdout: subprocess.run(["ufw", "disable"], capture_output=True, text=True) print("ufw rules cleared.") else: print("ufw is not active. Good.") svc = subprocess.run(["systemctl", "is-enabled", "ufw"], capture_output=True, text=True) if svc.stdout.strip() in ("enabled", "enabled-runtime"): subprocess.run(["systemctl", "disable", "ufw"], capture_output=True, text=True) print("Disabled ufw.service.") def disable_system_dnsmasq(data): """Stop and disable the system dnsmasq.service if it is enabled.""" disable_systemd_resolved() result = subprocess.run( ["systemctl", "is-enabled", "dnsmasq"], capture_output=True, text=True ) if result.stdout.strip() in ("enabled", "enabled-runtime"): subprocess.run(["systemctl", "disable", "--now", "dnsmasq"], capture_output=True, text=True) print("Disabled system dnsmasq.service.") else: print("System dnsmasq.service is already disabled. Good.") ensure_resolv_conf(data) def restore_ntp(): """Disable chrony and re-enable systemd-timesyncd for plain client NTP.""" result = subprocess.run( ["systemctl", "is-active", "chrony"], capture_output=True, text=True ) if result.stdout.strip() == "active": subprocess.run(["systemctl", "disable", "--now", "chrony"], capture_output=True, text=True) print("Disabled chrony.") else: print("chrony is not active.") result = subprocess.run( ["systemctl", "cat", "systemd-timesyncd"], capture_output=True, text=True ) if result.returncode == 0: subprocess.run(["systemctl", "enable", "--now", "systemd-timesyncd"], capture_output=True, text=True) print("Enabled systemd-timesyncd.") else: print("systemd-timesyncd is not available on this system.") # =================================================================== # Apply dnsmasq instances # =================================================================== def apply_dnsmasq_instances(data, dry_run=False, start_if_needed=True): """Write per-VLAN dnsmasq configs and service units. start_if_needed=True: enable and start all instances. start_if_needed=False (--apply): only restart instances already running; skip with a warning if not running. """ active_service_stems = {shared.vlan_service_name(vlan, validation.derive_interface(vlan, data)) for vlan in data["vlans"]} if not dry_run: shared.DNSMASQ_CONF_DIR.mkdir(exist_ok=True) disable_system_dnsmasq(data) print() print("Updating blocklist hosts files ======================================") update_blocklist_hosts(data) print() disable_system_dnsmasq(data) print() for vlan in data["vlans"]: iface = validation.derive_interface(vlan, data) if validation.is_wg(vlan) and not dry_run and not wireguard.wg_interface_up(iface): print(f"Skipped VLAN '{vlan['name']}': {iface} is not up. Run --apply again after WireGuard is up.") continue conf_content = build_vlan_dnsmasq_conf(vlan, data, iface) svc_content = build_vlan_service(vlan, iface) conf_path = shared.vlan_conf_file(vlan) svc_path = shared.vlan_service_file(vlan, iface) if dry_run: print(f"# -- {conf_path} (dry-run) --") print(conf_content) print(f"# -- {svc_path} (dry-run) --") print(svc_content) continue conf_path.write_text(conf_content) print(f"Written: {conf_path}") if not svc_path.exists() or svc_path.read_text() != svc_content: svc_path.write_text(svc_content) print(f"Written: {svc_path}") if dry_run: return print() subprocess.run(["systemctl", "daemon-reload"], capture_output=True, text=True) # Remove stale service units (VLANs removed from config) for f in shared.SYSTEMD_DIR.glob(f"dnsmasq-{shared.PRODUCT_NAME}-*.service"): if f.stem not in active_service_stems: subprocess.run(["systemctl", "disable", "--now", f.stem], capture_output=True, text=True) f.unlink() n = f.stem.removeprefix(f"dnsmasq-{shared.PRODUCT_NAME}-") stale_conf = shared.DNSMASQ_CONF_DIR / f"{n}.conf" if stale_conf.exists(): stale_conf.unlink() print(f"Removed stale VLAN: {f.stem}") if start_if_needed: print("Starting dnsmasq instances...") for vlan in data["vlans"]: iface = validation.derive_interface(vlan, data) if validation.is_wg(vlan) and not wireguard.wg_interface_up(iface): continue svc = shared.vlan_service_name(vlan, iface) subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True) result = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True) if result.returncode == 0: print(f" Started: {svc}") else: shared.service_warning("start", svc, result.stderr) else: print("Reloading dnsmasq instances...") for vlan in data["vlans"]: iface = validation.derive_interface(vlan, data) if validation.is_wg(vlan) and not wireguard.wg_interface_up(iface): continue svc = shared.vlan_service_name(vlan, iface) state = subprocess.run( ["systemctl", "is-active", svc], capture_output=True, text=True ).stdout.strip() if state == "active": result = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True) if result.returncode == 0: print(f" Restarted: {svc}") else: shared.service_warning("restart", svc, result.stderr) elif validation.is_wg(vlan): # WG interface is up but dnsmasq isn't running -- start it now subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True) result = subprocess.run(["systemctl", "start", svc], capture_output=True, text=True) if result.returncode == 0: print(f" Started: {svc}") else: shared.service_warning("start", svc, result.stderr) else: print(f" WARNING: {svc} is not running -- skipping (run --apply to start it)") # =================================================================== # Leases # =================================================================== def reset_leases(data, vlan_name=None): """Stop dnsmasq instances, delete lease files, restart instances. If vlan_name is given, only reset that VLAN. Otherwise reset all. """ vlans = [v for v in data["vlans"] if not validation.is_wg(v)] if vlan_name: vlans = [v for v in vlans if v["name"] == vlan_name] if not vlans: valid = ', '.join(v['name'] for v in data['vlans'] if not validation.is_wg(v)) return f"Unknown VLAN name '{vlan_name}'. Valid names: {valid}" print(f"Resetting leases for: {', '.join(v['name'] for v in vlans)}") print() # Stop for vlan in vlans: svc = shared.vlan_service_name(vlan, validation.derive_interface(vlan, data)) result = subprocess.run(["systemctl", "stop", svc], capture_output=True, text=True) if result.returncode == 0: print(f" Stopped: {svc}") else: print(f" WARNING: Could not stop {svc}: {result.stderr.strip()}") # Delete lease files print() for vlan in vlans: lf = shared.vlan_leases_file(vlan) if lf.exists(): lf.unlink() print(f" Deleted: {lf}") else: print(f" No lease file: {lf}") # Restart print() for vlan in vlans: svc = shared.vlan_service_name(vlan, validation.derive_interface(vlan, data)) result = subprocess.run(["systemctl", "start", svc], capture_output=True, text=True) if result.returncode == 0: print(f" Started: {svc}") else: print(f" WARNING: Could not start {svc}: {result.stderr.strip()}") print() print("Done. Devices will get fresh leases on their next DHCP request.") def show_leases(data): # Build MAC -> reservation lookup across all VLANs vlan_by_name = {v["name"]: v for v in data.get("vlans", [])} res_by_mac = {} for r in data.get("dhcp_reservations", []): if r.get("enabled") is True: mac = r.get("mac", "").lower().strip() if mac: res_by_mac[mac] = (r, vlan_by_name.get(r.get("vlan", ""), {})) now = int(datetime.now().timestamp()) any_leases = False for vlan in data["vlans"]: if validation.is_wg(vlan): continue lf = shared.vlan_leases_file(vlan) if not lf.exists(): continue lines = lf.read_text().strip().splitlines() if not lines: continue if not any_leases: print(f"{'IP':<18} {'MAC':<20} {'HOSTNAME':<26} {'VLAN':<8} {'EXPIRES':<22} {'TIME LEFT':<18} {'TYPE':<8} {'DESCRIPTION'}") print("-" * 145) any_leases = True for entry in lines: parts = entry.split() if len(parts) < 4: continue expire_ts = parts[0] mac = parts[1] ip = parts[2] hostname = parts[3] if parts[3] != "*" else "(unknown)" hostname = hostname[:26] if expire_ts == "0": expires_str = "permanent" time_left = "" else: expire_int = int(expire_ts) seconds = expire_int - now if seconds <= 0: expires_str = "expired" time_left = "" else: hours, rem = divmod(seconds, 3600) mins, _ = divmod(rem, 60) expire_dt = datetime.fromtimestamp(expire_int) expires_str = expire_dt.strftime("%Y-%m-%d %H:%M:%S") time_left = f"{hours}h {mins}m" match = res_by_mac.get(mac.lower()) lease_type = "static" if (match and not validation.is_dynamic_ip(match[0])) else "dynamic" description = match[0].get("description", "") if match else "" print(f"{ip:<18} {mac:<20} {hostname:<26} {vlan['name']:<8} {expires_str:<22} {time_left:<18} {lease_type:<8} {description}") if not any_leases: print("No active leases found.")