Development

This commit is contained in:
Matthew Grotke 2026-05-23 23:48:11 -04:00
parent bccb260ed0
commit 30fbcdb64c
3 changed files with 61 additions and 108 deletions

View file

@ -51,14 +51,25 @@ def load_config():
data = json.load(f).get("ddns", {})
# Validate general block
required_general = {"log_max_kb", "log_errors_only", "ip_check_services"}
required_general = {"log_max_kb", "log_errors_only"}
missing = required_general - set(data.get("general", {}).keys())
if missing:
print(f"ERROR: Missing keys in ddns.general block: {missing}", file=sys.stderr)
sys.exit(1)
if not data["general"]["ip_check_services"]:
print("ERROR: ddns.ip_check_services list is empty.", file=sys.stderr)
services = data.get("ip_check_services", [])
if not services:
print("ERROR: ddns.general.ip_check_services is empty.", file=sys.stderr)
sys.exit(1)
for svc in services:
if not isinstance(svc, dict) or "type" not in svc:
print(f"ERROR: ip_check_services entry missing 'type': {svc}", file=sys.stderr)
sys.exit(1)
if svc["type"] == "http" and "url" not in svc:
print(f"ERROR: ip_check_services 'http' entry missing 'url': {svc}", file=sys.stderr)
sys.exit(1)
if svc["type"] == "dig" and "url" not in svc:
print(f"ERROR: ip_check_services 'dig' entry missing 'url': {svc}", file=sys.stderr)
sys.exit(1)
# Validate providers block
if not data.get("providers"):
@ -176,46 +187,43 @@ def save_service_index(index):
# Public IP detection
# ===================================================================
def extract_ip(body):
def _extract_ip(body):
"""Extract an IPv4 address from an HTTP response body.
Handles plain text, key=value (e.g. Cloudflare /cdn-cgi/trace), and HTML.
"""
Extract an IP address from a service response.
Handles plain text, key=value format (e.g. Cloudflare /cdn-cgi/trace where
the ip= line is the caller's IP while h= is the server's IP), and HTML.
"""
# Check for key=value format first (e.g. /cdn-cgi/trace)
for line in body.splitlines():
if line.startswith("ip="):
candidate = line[3:].strip()
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', candidate):
return candidate
# Try plain text (strip and validate)
plain = body.strip()
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', plain):
return plain
# Fall back to extracting from HTML
match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', body)
if match:
return match.group(1)
return None
return match.group(1) if match else None
def _get_ip_via_cf_dns(spec):
"""Query Cloudflare's myip.cloudflare via DNS TXT (chaos class) for the caller's IP.
spec format: 'cf-dns:<hostname>' e.g. 'cf-dns:myip.cloudflare'
def _get_ip_via_http(spec):
"""Fetch public IP from an HTTP endpoint. spec: {"type": "http", "url": "..."}"""
req = urllib.request.Request(spec["url"], headers={"User-Agent": "ddns-update/1.0"})
with urllib.request.urlopen(req, timeout=10) as r:
return _extract_ip(r.read().decode().strip())
def _get_ip_via_dig(spec):
"""Query public IP via dig. spec: {"type": "dig", "command": "<dig args>"}
Requires the 'dig' utility to be installed.
"""
hostname = spec[len("cf-dns:"):]
cmd = ["dig", "+short", "@1.1.1.1", "chaos", "txt", hostname]
cmd = ["dig", "+short"] + spec["url"].split()
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return None
# TXT records come back quoted: "203.0.113.50"
ip = result.stdout.strip().strip('"').split()[0]
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip):
return ip
match = re.search(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', result.stdout)
if match:
return match.group(1)
except FileNotFoundError:
log.warning("'dig' command not found; cannot use cf-dns IP check service.")
log.warning("'dig' not found; cannot use dig IP check service.")
except Exception:
pass
return None
@ -233,22 +241,20 @@ def get_public_ip(services):
start = get_next_service_index(total)
ordered = [services[(start + i) % total] for i in range(total)]
for i, service in enumerate(ordered):
for i, spec in enumerate(ordered):
stype = spec.get("type", "http")
label = spec.get("url", "?")
try:
if service.startswith("cf-dns:"):
ip = _get_ip_via_cf_dns(service)
if stype == "dig":
ip = _get_ip_via_dig(spec)
else:
req = urllib.request.Request(service, headers={"User-Agent": "ddns-update/1.0"})
with urllib.request.urlopen(req, timeout=10) as r:
body = r.read().decode().strip()
ip = extract_ip(body)
ip = _get_ip_via_http(spec)
if ip:
used_index = (start + i) % total
save_service_index(used_index)
log.info(f"Public IP retrieved from {service}: {ip}")
save_service_index((start + i) % total)
log.info(f"Public IP retrieved from {label}: {ip}")
return ip
except Exception as e:
log.warning(f"IP check failed for {service}: {e}")
except Exception as ex:
log.warning(f"IP check failed for {label}: {ex}")
continue
log.error("Could not determine public IP from any configured service.")
@ -476,7 +482,7 @@ def run_update(cfg, force=False, getip_only=False):
If force=True, bypasses the cached IP check and always updates.
If getip_only=True, prints the detected public IP and returns without updating providers."""
general = cfg["general"]
current_ip = get_public_ip(general["ip_check_services"])
current_ip = get_public_ip(cfg["ip_check_services"])
if getip_only:
print(current_ip)