#!/usr/bin/env python3 """ maintenance.py -- Periodic maintenance tasks run by the routlin-maintenance systemd timer. Tasks performed on each run: 1. DDNS: fetch current public IP and update enabled provider(s) if changed. 2. FreeRADIUS log rotation: truncate radius.log if it exceeds radius.general.log_max_kb. Reads config.json in the same directory. Designed to be invoked by core.py --apply via the routlin-maintenance.timer systemd timer. IP check services are rotated each run using .ddns-last-service so no single provider is spammed. If the selected service fails, the script falls back through the remaining services in order. Per-provider cache files are named .ddns-last-ip-. DDNS activity is logged to ddns.log in the same directory as this script. DDNS log is cleared when it exceeds ddns.general.log_max_kb from config. Usage: python3 maintenance.py --update Run all tasks once (used by timer) python3 maintenance.py --force Force DDNS update regardless of cached IP python3 maintenance.py --getip Print current public IP and exit """ import ipaddress import json import os import subprocess import re import urllib.request import urllib.error import sys import logging from pathlib import Path import mod_metrics as metrics import mod_dns_queries as dns_queries SCRIPT_DIR = Path(__file__).parent CONFIG_FILE = SCRIPT_DIR / "config.json" CACHE_SERVICE_FILE = SCRIPT_DIR / ".ddns-last-service" LOG_FILE = SCRIPT_DIR / "ddns.log" RADIUS_LOG_FILE = Path("/var/log/freeradius/radius.log") ARP_CACHE_FILE = Path("/var/lib/misc/arp-cache.json") # log is assigned in setup_logging() after config is loaded log = None # =================================================================== # Load config # =================================================================== def load_config(): if not CONFIG_FILE.exists(): print(f"ERROR: Config file not found: {CONFIG_FILE}", file=sys.stderr) sys.exit(1) with open(CONFIG_FILE) as f: full = json.load(f) data = full.get("ddns", {}) # Validate general block required_general = {"log_max_kb", "log_errors_only"} missing = required_general - set(data.get("general", {}).keys()) if missing: print(f"ERROR: Missing keys in ddns.general block: {missing}", file=sys.stderr) sys.exit(1) services = data.get("ip_check_services", []) if not services: print("ERROR: ddns.general.ip_check_services is empty.", file=sys.stderr) sys.exit(1) for svc in services: if not isinstance(svc, dict) or "type" not in svc: print(f"ERROR: ip_check_services entry missing 'type': {svc}", file=sys.stderr) sys.exit(1) if svc["type"] == "http" and "url" not in svc: print(f"ERROR: ip_check_services 'http' entry missing 'url': {svc}", file=sys.stderr) sys.exit(1) if svc["type"] == "dig" and "url" not in svc: print(f"ERROR: ip_check_services 'dig' entry missing 'url': {svc}", file=sys.stderr) sys.exit(1) # Validate providers block if not data.get("providers"): print("ERROR: No DDNS providers defined in config.", file=sys.stderr) sys.exit(1) for p in data["providers"]: base_required = {"description", "provider", "enabled"} missing = base_required - set(p.keys()) if missing: print(f"ERROR: Provider '{p.get('description', '?')}' missing keys: {missing}", file=sys.stderr) sys.exit(1) ptype = p.get("provider", "").lower() if ptype == "noip": extra = {"username", "password", "hostnames"} elif ptype == "duckdns": extra = {"api_token", "hostnames"} elif ptype == "cloudflare": extra = {"api_token", "hostnames"} else: print(f"ERROR: Provider '{p.get('description', '?')}' has unknown provider type: '{ptype}'", file=sys.stderr) sys.exit(1) missing = extra - set(p.keys()) if missing: print(f"ERROR: Provider '{p.get('description', '?')}' missing keys for {ptype}: {missing}", file=sys.stderr) sys.exit(1) data['_radius'] = full.get("radius", {}) return data # =================================================================== # Helpers # =================================================================== def chown_to_script_dir_owner(path): """Chown a file to the owner of the script directory. This works correctly whether invoked via sudo, directly as root (e.g. systemd timer), or as a normal user - the script directory owner is always the right target. """ try: stat = SCRIPT_DIR.stat() os.chown(path, stat.st_uid, stat.st_gid) except OSError: pass # non-fatal # =================================================================== # Logging # =================================================================== def setup_logging(max_kb, errors_only): """Clear log if oversized, then initialise logger. Must be called before log is used.""" global log max_bytes = int(max_kb * 1024) try: if LOG_FILE.exists() and LOG_FILE.stat().st_size > max_bytes: LOG_FILE.write_text("") if not LOG_FILE.exists(): LOG_FILE.touch() chown_to_script_dir_owner(LOG_FILE) file_handler = logging.FileHandler(LOG_FILE) except PermissionError: print(f"WARNING: Cannot write to {LOG_FILE} (permission denied). " f"Run with sudo or fix ownership: sudo chown $USER {LOG_FILE}") file_handler = None level = logging.ERROR if errors_only else logging.INFO handlers = [logging.StreamHandler(sys.stdout)] if file_handler: handlers.insert(0, file_handler) logging.basicConfig( level=level, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=handlers, ) log = logging.getLogger("ddns") # =================================================================== # Per-provider IP cache # =================================================================== def cache_file_for(description): """Return the cache file path for a given provider description.""" safe_name = description.replace(" ", "-") return SCRIPT_DIR / f".ddns-last-ip-{safe_name}" def get_cached_ip(description): f = cache_file_for(description) if f.exists(): return f.read_text().strip() return None def save_cached_ip(description, ip): f = cache_file_for(description) f.write_text(ip) chown_to_script_dir_owner(f) # =================================================================== # Service rotation # =================================================================== def get_next_service_index(total): """Read last used index, increment, wrap around, return next index.""" if CACHE_SERVICE_FILE.exists(): try: last = int(CACHE_SERVICE_FILE.read_text().strip()) except ValueError: last = -1 else: last = -1 return (last + 1) % total def save_service_index(index): CACHE_SERVICE_FILE.write_text(str(index)) chown_to_script_dir_owner(CACHE_SERVICE_FILE) # =================================================================== # Public IP detection # =================================================================== def _extract_ip(body): """Extract an IPv4 address from an HTTP response body. Handles plain text, key=value (e.g. Cloudflare /cdn-cgi/trace), and HTML. """ for line in body.splitlines(): if line.startswith("ip="): candidate = line[3:].strip() if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', candidate): return candidate plain = body.strip() if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', plain): return plain match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', body) return match.group(1) if match else None def _get_ip_via_http(spec): """Fetch public IP from an HTTP endpoint. spec: {"type": "http", "url": "..."}""" req = urllib.request.Request(spec["url"], headers={"User-Agent": "ddns-update/1.0"}) with urllib.request.urlopen(req, timeout=10) as r: return _extract_ip(r.read().decode().strip()) _SAFE_DIG_RE = re.compile(r'^[a-zA-Z0-9.\-_@+:\s]+$') def _get_ip_via_dig(spec): """Query public IP via dig. spec: {"type": "dig", "url": ""} Requires the 'dig' utility to be installed. """ url = spec["url"] if not _SAFE_DIG_RE.match(url): log.warning(f"Skipping dig service with disallowed characters: {url!r}") return None cmd = ["dig", "+short"] + url.split() try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if result.returncode != 0: return None match = re.search(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', result.stdout) if match: return match.group(1) except FileNotFoundError: log.warning("'dig' not found; cannot use dig IP check service.") except Exception: pass return None # =================================================================== def get_public_ip(services): """ Start at the next service in rotation. If it fails, fall through the remaining services in order. Saves the index of the service that succeeded so the next run starts with the following one. """ total = len(services) start = get_next_service_index(total) ordered = [services[(start + i) % total] for i in range(total)] for i, spec in enumerate(ordered): stype = spec.get("type", "http") label = spec.get("url", "?") try: if stype == "dig": ip = _get_ip_via_dig(spec) else: ip = _get_ip_via_http(spec) if ip: save_service_index((start + i) % total) log.info(f"Public IP retrieved from {label}: {ip}") return ip except Exception as ex: log.warning(f"IP check failed for {label}: {ex}") continue log.error("Could not determine public IP from any configured service.") sys.exit(1) # =================================================================== # No-IP update # =================================================================== def update_noip(provider, ip): """ No-IP HTTP update API. Docs: https://www.noip.com/integrate/request Uses HTTP Basic Auth. Supports comma-separated list of hostnames. """ username = provider["username"] password = provider["password"] hostnames = ",".join(provider["hostnames"]) url = f"https://dynupdate.no-ip.com/nic/update?hostname={hostnames}&myip={ip}" password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm() password_mgr.add_password(None, url, username, password) handler = urllib.request.HTTPBasicAuthHandler(password_mgr) opener = urllib.request.build_opener(handler) req = urllib.request.Request(url, headers={"User-Agent": "ddns-update/1.0"}) try: with opener.open(req, timeout=10) as r: return r.read().decode().strip() except urllib.error.URLError as e: log.error(f"Network error contacting No-IP: {e}") return None def interpret_noip_response(response, hostnames, ip): """ No-IP response codes: good -- update successful nochg -- IP already set to this value (no change needed) nohost -- hostname not found in account badauth -- invalid credentials badagent -- client blocked !donator -- feature requires paid account abuse -- account blocked for abuse 911 -- server-side error, retry later """ if response is None: return False if response.startswith("good"): log.info(f"No-IP updated successfully: {hostnames} -> {ip}") return True elif response.startswith("nochg"): log.info(f"No-IP: no change needed ({hostnames} already set to {ip})") return True elif response == "nohost": log.error(f"No-IP: hostname '{hostnames}' not found in account.") elif response == "badauth": log.error(f"No-IP: authentication failed for '{hostnames}'. Check username and password.") elif response == "badagent": log.error("No-IP: client blocked by No-IP.") elif response == "!donator": log.error("No-IP: this feature requires a paid account.") elif response == "abuse": log.error("No-IP: account blocked for abuse.") elif response == "911": log.error("No-IP: server error. Will retry on next run.") else: log.error(f"No-IP: unexpected response: {response}") return False # =================================================================== # DuckDNS update # =================================================================== def update_duckdns(provider, ip): """ DuckDNS HTTP update API. Docs: https://www.duckdns.org/spec.jsp Token-based, no username/password. Subdomains are the short name only (e.g. "myhome", not "myhome.duckdns.org"). Supports multiple subdomains as a comma-separated list. Returns True on success, False on failure. """ token = provider["api_token"] subdomains = ",".join(h.replace(".duckdns.org", "") for h in provider["hostnames"]) description = provider["description"] url = f"https://www.duckdns.org/update?domains={subdomains}&token={token}&ip={ip}" try: req = urllib.request.Request(url, headers={"User-Agent": "ddns-update/1.0"}) with urllib.request.urlopen(req, timeout=10) as r: response = r.read().decode().strip() if response == "OK": log.info(f"DuckDNS updated successfully: {subdomains} -> {ip}") return True else: log.error(f"DuckDNS update failed for '{description}': response was '{response}'") return False except urllib.error.URLError as e: log.error(f"Network error contacting DuckDNS: {e}") return False # =================================================================== # Cloudflare DNS update # =================================================================== def _cf_api_get(url, headers): req = urllib.request.Request(url, headers=headers) try: with urllib.request.urlopen(req, timeout=10) as r: return json.loads(r.read().decode()) except Exception as e: log.error(f"Cloudflare API GET error ({url}): {e}") return None def _cf_get_zone_id(zone_name, headers): data = _cf_api_get( f"https://api.cloudflare.com/client/v4/zones?name={zone_name}", headers ) if data and data.get("success") and data["result"]: return data["result"][0]["id"] return None def _cf_get_record_id(zone_id, hostname, headers): data = _cf_api_get( f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records?name={hostname}&type=A", headers, ) if data and data.get("success") and data["result"]: return data["result"][0]["id"] return None def update_cloudflare(provider, ip): """ Cloudflare DNS update API. Docs: https://developers.cloudflare.com/api/resources/dns/subresources/records/methods/edit/ Bearer-token auth. Looks up zone and record IDs dynamically, then PATCHes each A record. """ token = provider["api_token"] headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "User-Agent": "ddns-update/1.0", } success = True for hostname in provider["hostnames"]: zone_name = ".".join(hostname.split(".")[-2:]) zone_id = _cf_get_zone_id(zone_name, headers) if not zone_id: log.error(f"Cloudflare: zone '{zone_name}' not found in account.") success = False continue record_id = _cf_get_record_id(zone_id, hostname, headers) if not record_id: log.error(f"Cloudflare: A record for '{hostname}' not found in zone '{zone_name}'.") success = False continue url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record_id}" payload = json.dumps({"content": ip}).encode() req = urllib.request.Request(url, data=payload, headers=headers, method="PATCH") try: with urllib.request.urlopen(req, timeout=10) as r: data = json.loads(r.read().decode()) if data.get("success"): log.info(f"Cloudflare updated successfully: {hostname} -> {ip}") else: log.error(f"Cloudflare update failed for '{hostname}': {data.get('errors')}") success = False except Exception as e: log.error(f"Cloudflare API PATCH error for '{hostname}': {e}") success = False return success # =================================================================== # Process a single provider block # =================================================================== def process_provider(provider, current_ip, force=False): description = provider["description"] if not provider.get("enabled") is True: log.info(f"Provider '{description}' is disabled, skipping.") return cached_ip = get_cached_ip(description) if not force and current_ip == cached_ip: log.info(f"[{description}] IP unchanged ({current_ip}), skipping update.") return if force: log.info(f"[{description}] Force update requested. Updating with {current_ip}...") elif cached_ip: log.info(f"[{description}] IP changed: {cached_ip} -> {current_ip}. Updating...") else: log.info(f"[{description}] No cached IP found. Updating with {current_ip}...") ptype = provider["provider"].lower() if ptype == "noip": hostnames = ",".join(provider["hostnames"]) response = update_noip(provider, current_ip) success = interpret_noip_response(response, hostnames, current_ip) elif ptype == "duckdns": success = update_duckdns(provider, current_ip) elif ptype == "cloudflare": success = update_cloudflare(provider, current_ip) else: log.error(f"[{description}] Unknown provider type: '{ptype}'") return if success: save_cached_ip(description, current_ip) # =================================================================== # FreeRADIUS log rotation # =================================================================== def _clear_radius_log_dir(log_dir, reason): """Delete all files in log_dir and print reason.""" try: files = [p for p in log_dir.iterdir() if p.is_file()] if not files: return for p in files: try: p.unlink() except PermissionError: print(f"WARNING: Cannot delete {p} (permission denied).") except OSError as e: print(f"WARNING: Error deleting {p}: {e}") print(f"FreeRADIUS logs cleared ({reason}).") except PermissionError: print(f"WARNING: Cannot read {log_dir} (permission denied).") except OSError as e: print(f"WARNING: Error clearing FreeRADIUS log dir: {e}") def rotate_radius_log(radius_cfg): """Clear the FreeRADIUS log dir if logging is disabled or total size exceeds log_max_kb.""" general = radius_cfg.get("general", {}) log_dir = RADIUS_LOG_FILE.parent if not log_dir.exists(): return if not general.get("logging", False): _clear_radius_log_dir(log_dir, "logging disabled") return max_kb = general.get("log_max_kb", 1024) max_bytes = int(max_kb * 1024) try: files = [p for p in log_dir.iterdir() if p.is_file()] total = sum(p.stat().st_size for p in files) if total > max_bytes: _clear_radius_log_dir(log_dir, f"total {total // 1024} KB exceeded {max_kb} KB") except PermissionError: print(f"WARNING: Cannot read {log_dir} (permission denied).") except OSError as e: print(f"WARNING: Error checking FreeRADIUS log dir: {e}") # =================================================================== # Main # =================================================================== ARP_MAX_AGE_SECS = 4 * 3600 def refresh_arp_cache(cfg): try: with open(CONFIG_FILE) as f: full_cfg = json.load(f) except Exception: full_cfg = {} vlan_networks = [] for v in full_cfg.get('vlans', []): subnet = v.get('subnet') mask = v.get('subnet_mask') if subnet and mask: try: vlan_networks.append(ipaddress.IPv4Network(f'{subnet}/{mask}', strict=False)) except ValueError: pass try: result = subprocess.run(['ip', '-stats', 'neigh'], capture_output=True, text=True, timeout=5) best = {} # mac -> (used_secs, entry_dict) for line in result.stdout.splitlines(): parts = line.split() if 'lladdr' not in parts: continue if ':' in parts[0]: # skip IPv6 continue try: addr = ipaddress.IPv4Address(parts[0]) if vlan_networks and not any(addr in n for n in vlan_networks): continue except ValueError: continue iface = parts[2] if len(parts) > 2 else '' if iface.startswith('br-') or iface == 'docker0': continue state = parts[-1] if state in ('FAILED', 'PERMANENT', 'NOARP', 'INCOMPLETE'): continue used_match = re.search(r'used\s+(\d+)/', line) used_secs = int(used_match.group(1)) if used_match else 0 if state != 'REACHABLE' and used_secs > ARP_MAX_AGE_SECS: continue idx = parts.index('lladdr') mac = parts[idx + 1].lower() if mac not in best or used_secs < best[mac][0]: best[mac] = (used_secs, {'ip': parts[0], 'state': 'REACHABLE'}) ARP_CACHE_FILE.write_text(json.dumps({m: e for m, (_, e) in best.items()})) except Exception as exc: print(f"WARNING: Could not refresh ARP cache: {exc}") def run_update(cfg, force=False, getip_only=False): """Perform a single DDNS update pass. If force=True, bypasses the cached IP check and always updates. If getip_only=True, prints the detected public IP and returns without updating providers.""" current_ip = get_public_ip(cfg["ip_check_services"]) if getip_only: print(current_ip) return enabled = [p for p in cfg["providers"] if p.get("enabled") is True] if not enabled: log.error("No enabled providers found in config.") sys.exit(1) for provider in enabled: process_provider(provider, current_ip, force=force) def main(): import argparse parser = argparse.ArgumentParser( description="Routlin periodic maintenance (DDNS update + log rotation)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "examples:\n" " python3 maintenance.py --update Run all tasks once (used by timer)\n" " python3 maintenance.py --force Force DDNS update regardless of cached IP\n" " python3 maintenance.py --getip Print current public IP and exit\n" ) ) parser.add_argument("--update", action="store_true", help="Run all tasks once (used by timer)") parser.add_argument("--force", action="store_true", help="Force DDNS update regardless of cached IP") parser.add_argument("--getip", action="store_true", help="Print current public IP and exit") args = parser.parse_args() if not any([args.update, args.force, args.getip]): parser.print_help() return if args.getip: global log log = logging.getLogger("ddns_quiet") log.addHandler(logging.NullHandler()) log.propagate = False cfg = load_config() run_update(cfg, getip_only=True) return cfg = load_config() general = cfg["general"] setup_logging(general["log_max_kb"], general["log_errors_only"]) if args.update or args.force: run_update(cfg, force=args.force) rotate_radius_log(cfg.get("_radius", {})) refresh_arp_cache(cfg) try: with open(CONFIG_FILE) as f: full_cfg = json.load(f) new_metrics = metrics.collect_metrics(full_cfg) if new_metrics: metrics.update_metrics_db(new_metrics) except Exception as e: log.warning(f"DNS metrics collection failed: {e}") try: with open(CONFIG_FILE) as f: full_cfg = json.load(f) inserted = dns_queries.collect(full_cfg) if inserted: log.info(f"DNS query collector: inserted {inserted} new rows.") except Exception as e: log.warning(f"DNS query collection failed: {e}") if __name__ == "__main__": main()