Development

This commit is contained in:
Matthew Grotke 2026-05-23 04:14:58 -04:00
parent b99ea35f79
commit 5149e5a035
10 changed files with 197 additions and 213 deletions

View file

@ -2,10 +2,10 @@
"""
ddns.py -- Update DDNS provider(s) with current public IP.
Reads ddns.json, fetches the current public IP, and updates
each enabled provider block only if the IP has changed since the
last successful update for that provider.
Designed to be run on a systemd timer.
Reads the ddns block from core.json, fetches the current public IP,
and updates each enabled provider block only if the IP has changed
since the last successful update for that provider.
Designed to be run on a systemd timer managed by core.py --apply.
IP check services are rotated each run using .ddns-last-service so
no single provider is spammed. If the selected service fails, the
@ -16,11 +16,8 @@ Logs to ddns.log in the same directory as this script.
Log is cleared when it exceeds general.log_max_kb from config.
Usage:
sudo python3 ddns.py --start Run update and install systemd timer
sudo python3 ddns.py --disable Stop updates and remove systemd timer
python3 ddns.py --apply Run update once (used by timer)
python3 ddns.py --force Force update regardless of cached IP
python3 ddns.py --status Show timer/service status
python3 ddns.py --getip Print current public IP and exit
"""
@ -35,12 +32,9 @@ import logging
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent
CONFIG_FILE = SCRIPT_DIR / "ddns.json"
CONFIG_FILE = SCRIPT_DIR / "core.json"
CACHE_SERVICE_FILE = SCRIPT_DIR / ".ddns-last-service"
LOG_FILE = SCRIPT_DIR / "ddns.log"
TIMER_NAME = "ddns-update"
SERVICE_FILE = Path(f"/etc/systemd/system/{TIMER_NAME}.service")
TIMER_FILE = Path(f"/etc/systemd/system/{TIMER_NAME}.timer")
# log is assigned in setup_logging() after config is loaded
log = None
@ -54,21 +48,21 @@ def load_config():
print(f"ERROR: Config file not found: {CONFIG_FILE}", file=sys.stderr)
sys.exit(1)
with open(CONFIG_FILE) as f:
data = json.load(f)
data = json.load(f).get("ddns", {})
# Validate general block
required_general = {"log_max_kb", "log_errors_only", "ip_check_services"}
missing = required_general - set(data.get("general", {}).keys())
if missing:
print(f"ERROR: Missing keys in general block: {missing}", file=sys.stderr)
print(f"ERROR: Missing keys in ddns.general block: {missing}", file=sys.stderr)
sys.exit(1)
if not data["general"]["ip_check_services"]:
print("ERROR: ip_check_services list is empty.", file=sys.stderr)
print("ERROR: ddns.ip_check_services list is empty.", file=sys.stderr)
sys.exit(1)
# Validate providers block
if not data.get("providers"):
print("ERROR: No providers defined in config.", file=sys.stderr)
print("ERROR: No DDNS providers defined in config.", file=sys.stderr)
sys.exit(1)
for p in data["providers"]:
base_required = {"description", "provider", "enabled"}
@ -473,98 +467,12 @@ def process_provider(provider, current_ip, force=False):
save_cached_ip(description, current_ip)
# ===================================================================
# Timer management
# ===================================================================
def parse_interval(interval_str):
"""
Convert interval string (e.g. 5m, 2h, 1d) to systemd OnUnitActiveSec value.
Supported units: m (minutes), h (hours), d (days).
"""
interval_str = interval_str.strip()
if interval_str.endswith("m"):
return f"{interval_str[:-1]}min"
elif interval_str.endswith("h"):
return f"{interval_str[:-1]}h"
elif interval_str.endswith("d"):
return f"{interval_str[:-1]}day"
else:
print(f"ERROR: Invalid timer_interval format: '{interval_str}'. Use e.g. 5m, 2h, 1d.")
sys.exit(1)
def get_current_timer_interval():
"""Read the current OnUnitActiveSec value from the timer file, or None if not present."""
if not TIMER_FILE.exists():
return None
for line in TIMER_FILE.read_text().splitlines():
if line.strip().startswith("OnUnitActiveSec="):
return line.strip().split("=", 1)[1]
return None
def install_timer(cfg):
interval = cfg["general"].get("timer_interval", "5m")
systemd_unit = parse_interval(interval)
script_path = Path(__file__).resolve()
current_interval = get_current_timer_interval()
if current_interval == systemd_unit:
log.info(f"Timer already set to {interval}, no update needed.")
return
service_content = f"""[Unit]
Description=DDNS update service
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
ExecStart=/usr/bin/python3 {script_path} --apply
"""
timer_content = f"""[Unit]
Description=DDNS update timer
[Timer]
OnUnitActiveSec={systemd_unit}
OnBootSec=1min
[Install]
WantedBy=timers.target
"""
SERVICE_FILE.write_text(service_content)
TIMER_FILE.write_text(timer_content)
subprocess.run(["systemctl", "daemon-reload"], check=True)
subprocess.run(["systemctl", "enable", "--now", f"{TIMER_NAME}.timer"], check=True)
if current_interval is None:
log.info(f"Timer installed: runs every {interval}.")
else:
log.info(f"Timer updated: was {current_interval}, now runs every {interval}.")
def remove_timer():
if TIMER_FILE.exists() or SERVICE_FILE.exists():
subprocess.run(
["systemctl", "disable", "--now", f"{TIMER_NAME}.timer"],
capture_output=True
)
for f in (TIMER_FILE, SERVICE_FILE):
if f.exists():
f.unlink()
print(f"Removed: {f}")
subprocess.run(["systemctl", "daemon-reload"], capture_output=True)
print("Timer removed.")
else:
print("No timer found, nothing to remove.")
# ===================================================================
# Main
# ===================================================================
def run_update(cfg, force=False, getip_only=False):
"""Perform a single DDNS update pass. Called by both timer and --start.
"""Perform a single DDNS update pass.
If force=True, bypasses the cached IP check and always updates.
If getip_only=True, prints the detected public IP and returns without updating providers."""
general = cfg["general"]
@ -583,14 +491,6 @@ def run_update(cfg, force=False, getip_only=False):
for provider in enabled:
process_provider(provider, current_ip, force=force)
def show_status():
"""Show status of managed timer."""
result = subprocess.run(
["systemctl", "status", f"{TIMER_NAME}.timer", "--no-pager"],
capture_output=True, text=True
)
print(result.stdout)
def main():
@ -600,31 +500,21 @@ def main():
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"examples:\n"
" sudo python3 ddns.py --start Run update and install systemd timer\n"
" sudo python3 ddns.py --disable Stop updates and remove systemd timer\n"
" python3 ddns.py --apply Run update once (used by timer)\n"
" python3 ddns.py --force Force update regardless of cached IP\n"
" python3 ddns.py --status Show timer/service status\n"
" python3 ddns.py --getip Print current public IP and exit\n"
)
)
parser.add_argument("--start", action="store_true", help="Run update and install systemd timer")
parser.add_argument("--disable", action="store_true", help="Stop updates and remove systemd timer")
parser.add_argument("--apply", action="store_true", help="Run update once (used by timer)")
parser.add_argument("--force", action="store_true", help="Force update regardless of cached IP")
parser.add_argument("--status", action="store_true", help="Show timer/service status")
parser.add_argument("--getip", action="store_true", help="Print current public IP and exit")
parser.add_argument("--apply", action="store_true", help="Run update once (used by timer)")
parser.add_argument("--force", action="store_true", help="Force update regardless of cached IP")
parser.add_argument("--getip", action="store_true", help="Print current public IP and exit")
args = parser.parse_args()
if not any([args.start, args.disable, args.apply, args.force, args.status, args.getip]):
if not any([args.apply, args.force, args.getip]):
parser.print_help()
return
if args.status:
show_status()
return
if args.getip:
global log
log = logging.getLogger("ddns_quiet")
@ -638,15 +528,6 @@ def main():
general = cfg["general"]
setup_logging(general["log_max_kb"], general["log_errors_only"])
if args.disable:
remove_timer()
return
if args.start:
run_update(cfg)
install_timer(cfg)
return
if args.apply or args.force:
run_update(cfg, force=args.force)