"""
mod_shared.py -- Constants and helpers shared across all routlin modules.
"""
|
|
|
|
import ipaddress
|
|
import os
|
|
from pathlib import Path
|
|
import mod_validation as validation
|
|
|
|
# Product identifier embedded in the config, unit, lease, and pid file names
# built by the helpers in this module.
PRODUCT_NAME = "routlin"

# Directory containing this module file.
SCRIPT_DIR = Path(__file__).parent

# Per-product dnsmasq configuration directory.
DNSMASQ_CONF_DIR = Path("/etc") / f"dnsmasq-{PRODUCT_NAME}"

# Directory in which the dnsmasq lease files are placed.
LEASES_DIR = Path("/var", "lib", "misc")

# Directory in which the systemd unit files are placed.
SYSTEMD_DIR = Path("/etc", "systemd", "system")
|
|
|
|
|
|
# ===================================================================
# Process / error helpers
# ===================================================================
|
|
|
|
def is_root():
    """Return True when the process's effective user ID is 0, else False."""
    effective_uid = os.geteuid()
    return effective_uid == 0
|
|
|
|
|
|
def service_warning(action, svc, stderr):
    """
    Print a warning that an operation on a service failed.

    action: verb describing the attempted operation; interpolated into the message.
    svc:    service name; interpolated into the message.
    stderr: error text reported for the failure. May be str, bytes, or None;
            bytes are decoded (undecodable bytes replaced) and None is treated
            as an empty message.

    When the error text contains "not found" or "not-found" (case-insensitive),
    a second line is printed suggesting that install.py be run.
    """
    # Normalize the input first: None would fail on .strip(), and bytes would
    # fail on the substring checks below.
    if isinstance(stderr, bytes):
        stderr = stderr.decode(errors="replace")
    msg = (stderr or "").strip()

    print(f"WARNING: Failed to {action} {svc}: {msg}")

    lowered = msg.lower()
    if "not found" in lowered or "not-found" in lowered:
        print(" -> Package may not be installed. Run: sudo python3 install.py")
|
|
|
|
|
|
def chown_to_script_dir_owner(path):
    """
    Give `path` the same owning user and group as SCRIPT_DIR.

    Best-effort: an OSError from reading SCRIPT_DIR's ownership or from
    changing the owner of `path` is ignored, and nothing is returned.
    """
    try:
        owner = SCRIPT_DIR.stat()
    except OSError:
        return
    try:
        os.chown(path, owner.st_uid, owner.st_gid)
    except OSError:
        return
|
|
|
|
|
|
# ===================================================================
# Network / VLAN utilities
# ===================================================================
|
|
|
|
def prefix_to_dotted(n):
    """
    Convert an IPv4 prefix length to a dotted-quad netmask string.

    n: prefix length in the range 0..32, given as an int or as any value
       int() accepts (for example the string "24").

    Returns the netmask as text, e.g. 24 -> "255.255.255.0".
    Raises ValueError when the prefix length is outside 0..32.
    """
    bits = int(n)
    # Reject out-of-range lengths explicitly: a negative value would shift
    # every bit out and silently produce "0.0.0.0", and a value above 32
    # would fail with an unrelated "negative shift count" error.
    if not 0 <= bits <= 32:
        raise ValueError(f"IPv4 prefix length must be between 0 and 32, got {n!r}")
    mask = (0xFFFFFFFF << (32 - bits)) & 0xFFFFFFFF
    # Big-endian byte order puts the most significant octet first.
    return '.'.join(str(octet) for octet in mask.to_bytes(4, "big"))
|
|
|
|
|
|
def network_for(vlan):
    """
    Build the IPv4Network described by a VLAN's "subnet" and "subnet_mask" entries.

    The network is created with strict=False, so host bits set in "subnet"
    are masked off instead of being rejected.
    """
    cidr = "{}/{}".format(vlan["subnet"], vlan["subnet_mask"])
    return ipaddress.IPv4Network(cidr, strict=False)
|
|
|
|
|
|
def lowest_quartet_ip(vlan):
    """
    Return, as a string, the "server_identities" IP whose last octet is lowest.

    Entries that lack an "ip" key or whose "ip" is not a valid IPv4 address
    are skipped. Returns None when no usable entry remains (including when
    "server_identities" is absent).
    """
    def _parse(identity):
        # Unusable entries become None so they can be filtered out below.
        try:
            return ipaddress.IPv4Address(identity["ip"])
        except (KeyError, ValueError):
            return None

    candidates = [
        addr
        for addr in map(_parse, vlan.get("server_identities", []))
        if addr is not None
    ]
    if not candidates:
        return None
    # Only the final octet is compared; on a tie min() keeps the earliest entry.
    return str(min(candidates, key=lambda addr: int(addr) & 0xFF))
|
|
|
|
|
|
def resolve_vlan_options(vlan):
    """
    Resolve the gateway, DNS servers, and NTP servers for a VLAN.

    Returns a dict with the keys "gateway", "dns_servers", and "ntp_servers".

    WG VLANs (as reported by validation.is_wg):
      * overrides are read from vpn_information.explicit_overrides;
      * gateway is the override if set, otherwise the lowest-last-octet
        server_identity IP, otherwise the first host address of the VLAN's
        network;
      * dns_servers is the override if set, otherwise the resolved gateway;
      * ntp_servers is always None (WireGuard has no DHCP, so NTP cannot be
        advertised to peers).

    Non-WG VLANs:
      * overrides are read from dhcp_information.explicit_overrides;
      * gateway, dns_servers, and ntp_servers each use their override if set,
        otherwise the lowest-last-octet server_identity IP (which may be None);
      * a non-empty list override for dns_servers / ntp_servers is returned
        as a comma-joined string.
    """
    if validation.is_wg(vlan):
        wg_overrides = vlan["vpn_information"].get("explicit_overrides", {})

        fallback = lowest_quartet_ip(vlan)
        if not fallback:
            # No usable server identity: fall back to the network's first host.
            fallback = str(next(network_for(vlan).hosts()))

        gateway = wg_overrides.get("gateway", "") or fallback
        dns_servers = wg_overrides.get("dns_servers", "") or gateway
        return {
            "gateway": gateway,
            "dns_servers": dns_servers,
            "ntp_servers": None,
        }

    dhcp_overrides = vlan.get("dhcp_information", {}).get("explicit_overrides", {})
    fallback = lowest_quartet_ip(vlan)

    resolved = {"gateway": dhcp_overrides.get("gateway", "") or fallback}
    for key in ("dns_servers", "ntp_servers"):
        value = dhcp_overrides.get(key, "")
        if isinstance(value, list):
            # A list is flattened to comma-separated text; an empty list
            # counts as "not set".
            resolved[key] = ','.join(value) if value else fallback
        else:
            resolved[key] = value or fallback
    return resolved
|
|
|
|
|
|
def is_physical(vlan):
    """Return whether the VLAN's "vlan_id" entry equals 1 (False when the key is absent)."""
    vlan_id = vlan.get("vlan_id")
    return vlan_id == 1
|
|
|
|
|
|
# ===================================================================
# VLAN naming / path helpers
# ===================================================================
|
|
|
|
def networkd_stem(vlan):
    """Return the stem "10-<PRODUCT_NAME>-<vlan name>" built from the VLAN's "name" entry."""
    return "10-{}-{}".format(PRODUCT_NAME, vlan["name"])
|
|
|
|
|
|
def vlan_service_name(vlan, iface):
    """
    Return the dnsmasq service name for a VLAN.

    The name is "dnsmasq-<PRODUCT_NAME>-<vlan name>"; when validation.is_wg
    reports the VLAN as WG, "-<iface>" is appended.
    """
    wireguard = validation.is_wg(vlan)
    base = f"dnsmasq-{PRODUCT_NAME}-{vlan['name']}"
    return f"{base}-{iface}" if wireguard else base
|
|
|
|
|
|
def vlan_service_file(vlan, iface):
    """Return the path "<SYSTEMD_DIR>/<service name>.service" for the VLAN's service."""
    unit_filename = "{}.service".format(vlan_service_name(vlan, iface))
    return SYSTEMD_DIR.joinpath(unit_filename)
|
|
|
|
|
|
def vlan_conf_file(vlan):
    """Return the path "<DNSMASQ_CONF_DIR>/<vlan name>.conf" for the VLAN."""
    conf_filename = "{}.conf".format(vlan["name"])
    return DNSMASQ_CONF_DIR.joinpath(conf_filename)
|
|
|
|
|
|
def vlan_leases_file(vlan):
    """Return the path "<LEASES_DIR>/dnsmasq-<PRODUCT_NAME>-<vlan name>.leases" for the VLAN."""
    leases_filename = "dnsmasq-{}-{}.leases".format(PRODUCT_NAME, vlan["name"])
    return LEASES_DIR.joinpath(leases_filename)
|
|
|
|
|
|
def vlan_pid_file(vlan):
    """Return the path "/run/dnsmasq-<PRODUCT_NAME>-<vlan name>.pid" for the VLAN."""
    pid_filename = "dnsmasq-{}-{}.pid".format(PRODUCT_NAME, vlan["name"])
    return Path("/run").joinpath(pid_filename)
|