"""
mod_radius.py -- FreeRADIUS configuration management.

Generates clients.conf, users, and huntgroups files from config.json, and
manages EAP settings (use_tunneled_reply, md5 block) in the freeradius EAP
module config.
"""

import re
import secrets
import sqlite3
import subprocess
from contextlib import closing
from pathlib import Path

from cryptography.fernet import Fernet

import mod_shared as shared

RADIUS_SECRET_FILE = shared.SCRIPT_DIR / ".radius-secret"
CREDENTIALS_KEY_FILE = shared.SCRIPT_DIR / ".credentials-key"
CREDENTIALS_DB_FILE = shared.SCRIPT_DIR / ".client-credentials"
RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf")
RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users")
RADIUS_CONF_FILE = Path("/etc/freeradius/3.0/radiusd.conf")
RADIUS_EAP_FILE = Path("/etc/freeradius/3.0/mods-available/eap")
RADIUS_HUNTGROUPS = Path("/etc/freeradius/3.0/huntgroups")
RADIUS_LOG_FILE = Path("/var/log/freeradius/radius.log")
RADIUS_HUNTGROUP_NAME = "routlin-aps"

# digest_type values (mirrors clientcredentials/action.py)
DIGEST_CYPHERTEXT_FERNET = 0
USER_TYPE_SUPPLICANT = 1

# Banner placed at the top of every generated file.
_GENERATED_HEADER = (
    "# Generated by core.py -- do not edit manually.",
    "# Edit config.json and re-run: sudo python3 core.py --apply",
    "",
)

# Matches the first '#' that follows a line's leading whitespace, keeping the
# whitespace in group 1 so a substitution can drop just the comment marker.
_LEADING_HASH_RE = re.compile(r'^(\s*)#')


# ===================================================================
# Credential helpers
# ===================================================================

def _load_fernet():
    """Build a Fernet instance from the key stored in CREDENTIALS_KEY_FILE.

    Returns None (after printing a warning) when the key file is missing or
    its contents cannot be used as a key.
    """
    if not CREDENTIALS_KEY_FILE.exists():
        print(f"WARNING: {CREDENTIALS_KEY_FILE} not found - cannot decrypt supplicant passwords")
        return None
    try:
        return Fernet(CREDENTIALS_KEY_FILE.read_text().strip().encode())
    except Exception as exc:
        # Deliberate best-effort: a bad key must not abort config generation.
        print(f"WARNING: Could not load credentials key: {exc}")
        return None


def _load_supplicant_credentials():
    """Read enabled supplicant rows from the credentials database.

    Returns a list of dicts with the keys username, password, digest_type
    and vlan. Returns an empty list when the database file does not exist or
    cannot be queried (a warning is printed in the latter case).
    """
    if not CREDENTIALS_DB_FILE.exists():
        return []
    try:
        # closing() guarantees the connection is released even if the query
        # raises; sqlite3's own context manager only handles transactions.
        with closing(sqlite3.connect(str(CREDENTIALS_DB_FILE))) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT username, password, digest_type, vlan FROM credentials"
                " WHERE user_type=? AND enabled=1",
                (USER_TYPE_SUPPLICANT,),
            ).fetchall()
            return [dict(row) for row in rows]
    except sqlite3.Error as exc:
        print(f"WARNING: Could not read credentials DB: {exc}")
        return []


# ===================================================================
# Data helpers
# ===================================================================

def radius_clients(data):
    """Return list of (reservation, vlan) tuples where radius_client is True.

    Reservations whose "vlan" does not name an entry in data["vlans"] are
    left out.
    """
    vlan_by_name = {v["name"]: v for v in data.get("vlans", [])}
    pairs = []
    for reservation in data.get("dhcp_reservations", []):
        if reservation.get("radius_client") is not True:
            continue
        if reservation.get("vlan") not in vlan_by_name:
            continue
        pairs.append((reservation, vlan_by_name[reservation["vlan"]]))
    return pairs


def radius_enabled(data):
    """Return True if any reservation has radius_client: true."""
    return bool(radius_clients(data))


# ===================================================================
# Secret
# ===================================================================

def ensure_radius_secret():
    """Return the RADIUS shared secret, generating it on first use.

    If RADIUS_SECRET_FILE exists its stripped contents are returned.
    Otherwise a random secret is generated, written to that file, printed
    with instructions for the operator, and returned.
    """
    if RADIUS_SECRET_FILE.exists():
        return RADIUS_SECRET_FILE.read_text().strip()

    secret = secrets.token_urlsafe(32)
    RADIUS_SECRET_FILE.write_text(secret + "\n")
    # NOTE(review): 0o644 leaves the shared secret world-readable. Kept as-is
    # because other readers of this file may depend on it -- confirm whether
    # 0o600 is acceptable and tighten if so.
    RADIUS_SECRET_FILE.chmod(0o644)
    print(f"Generated RADIUS shared secret: {RADIUS_SECRET_FILE}")
    print(" ACTION REQUIRED: enter this shared secret into your managed switch's RADIUS configuration:")
    print(f" {secret}")
    return secret


# ===================================================================
# Config file builders
# ===================================================================

def build_radius_clients_conf(data, secret):
    """Generate freeradius clients.conf from reservations with radius_client: true.

    A localhost client is always emitted first. Each RADIUS client
    reservation then gets its own client block named after its hostname, or
    after a slug of its description when no hostname is set. A reservation
    without a description is labelled "unknown".

    Returns the file content as a string.
    """
    lines = [
        *_GENERATED_HEADER,
        "# localhost (required)",
        "client localhost {",
        " ipaddr = 127.0.0.1",
        f" secret = {secret}",
        " shortname = localhost",
        "}",
        "",
    ]
    for reservation, _vlan in radius_clients(data):
        # Fall back for both a missing key and an explicit null so neither
        # the comment line nor the slug below can raise.
        description = reservation.get("description") or "unknown"
        name = reservation.get("hostname") or description.replace(" ", "-").lower()
        lines += [
            f"# {description}",
            f"client {name} {{",
            f" ipaddr = {reservation['ip']}",
            f" secret = {secret}",
            f" shortname = {name}",
            "}",
            "",
        ]
    return "\n".join(lines)


def fmt_mac(raw, fmt):
    """Render MAC address `raw` in the layout named by `fmt`.

    `raw` may use ':', '-' or '.' separators, or none. `fmt` is an example
    layout: 'aabbccddeeff' (no separator), 'aa-bb-cc-dd-ee-ff' (dashes) or
    anything else (colons). The result is upper case when the first character
    of `fmt` is upper case, lower case otherwise (including for an empty
    `fmt`).
    """
    digits = re.sub(r'[:.\-]', '', raw).lower()
    pairs = [digits[i:i + 2] for i in range(0, 12, 2)]

    if fmt in ('aabbccddeeff', 'AABBCCDDEEFF'):
        sep = ''
    elif fmt in ('aa-bb-cc-dd-ee-ff', 'AA-BB-CC-DD-EE-FF'):
        sep = '-'
    else:
        sep = ':'

    joined = sep.join(pairs)
    # fmt[:1] instead of fmt[0] so an empty format does not raise.
    return joined.upper() if fmt[:1].isupper() else joined


def _escape_quoted(value):
    """Escape backslashes and double quotes for a double-quoted users-file value."""
    return value.replace('\\', '\\\\').replace('"', '\\"')


def _vlan_reply_lines(vlan_id):
    """Return the reply-attribute lines (plus a blank line) assigning `vlan_id`."""
    return [
        " Tunnel-Type = VLAN,",
        " Tunnel-Medium-Type = IEEE-802,",
        f" Tunnel-Private-Group-Id = \"{vlan_id}\"",
        "",
    ]


def _mac_user_entries(reservations, vlan_by_name, mac_fmt):
    """Return users-file lines for every enabled reservation that has a MAC and a known VLAN."""
    lines = []
    for reservation in reservations:
        if reservation.get("enabled") is not True:
            continue
        raw_mac = reservation.get("mac", "")
        if not raw_mac:
            continue
        vlan = vlan_by_name.get(reservation.get("vlan", ""))
        if not vlan:
            continue
        mac = fmt_mac(raw_mac, mac_fmt)
        vlan_id = vlan.get('vlan_id')
        description = reservation.get('description', 'unknown')
        lines.append(f"# {description} -> VLAN {vlan_id} ({vlan['name']})")
        # The formatted MAC is used as both user name and password.
        lines.append(f"{mac} Cleartext-Password := \"{mac}\"")
        lines += _vlan_reply_lines(vlan_id)
    return lines


def _supplicant_user_entries(auth_mode, vlan_by_name):
    """Return users-file lines for supplicant credentials stored in the credentials DB.

    For 'eap_password' each password is decrypted with the credentials key;
    entries that cannot be decrypted are skipped with a warning. For any other
    mode (the caller passes 'eap_certificate') no password is emitted.
    """
    credentials = _load_supplicant_credentials()
    fernet = _load_fernet() if auth_mode == 'eap_password' else None

    lines = []
    for cred in credentials:
        vlan = vlan_by_name.get(cred['vlan'])
        if not vlan:
            continue
        vlan_id = vlan.get('vlan_id')
        username = cred['username']

        if auth_mode == 'eap_password':
            if fernet is None:
                print(f"WARNING: Skipping '{username}' - credentials key unavailable")
                continue
            if cred['digest_type'] != DIGEST_CYPHERTEXT_FERNET:
                print(f"WARNING: Skipping '{username}' - unexpected digest_type {cred['digest_type']}")
                continue
            try:
                plaintext = fernet.decrypt(cred['password'].encode()).decode()
            except Exception:
                # Best-effort per credential: one bad row must not stop the rest.
                print(f"WARNING: Skipping '{username}' - decryption failed")
                continue
            check_line = f"{username} Cleartext-Password := \"{_escape_quoted(plaintext)}\""
        else:
            # eap_certificate - cert verified by TLS stack, entry provides VLAN reply attrs
            check_line = f"{username} Auth-Type := EAP"

        lines.append(f"# {username} -> VLAN {vlan_id} ({vlan['name']})")
        lines.append(check_line)
        lines += _vlan_reply_lines(vlan_id)
    return lines


def _default_user_entry(default_vlan, apply_to, ap_ips):
    """Return the DEFAULT users-file entry that assigns the default VLAN."""
    default_id = default_vlan.get('vlan_id')
    if apply_to == 'wireless':
        # '==' compares the request attribute; plain '=' is not a valid check
        # operator for a protocol attribute such as NAS-Port-Type.
        default_check = "DEFAULT NAS-Port-Type == Wireless-802.11, Auth-Type := Accept"
    elif apply_to == 'huntgroup' and ap_ips:
        default_check = f'DEFAULT Huntgroup-Name == "{RADIUS_HUNTGROUP_NAME}", Auth-Type := Accept'
    else:
        default_check = "DEFAULT Auth-Type := Accept"
    return [
        f"# Default -- unknown MACs land on VLAN {default_id} ({default_vlan['name']})",
        default_check,
        *_vlan_reply_lines(default_id),
    ]


def build_radius_users(data):
    """
    Generate freeradius users file content.

    Enabled MAC reservations are mapped to their VLAN ID when auth_mode is
    'mab' or mab_first is set. In the 'eap_password' and 'eap_certificate'
    modes, supplicant credentials from the credentials DB are added as well.
    Anything unmatched falls through to DEFAULT, which returns the
    radius_default VLAN.

    MAC format, auth mode and DEFAULT rule scope are read from radius.options
    in config.

    Returns the file content as a string, or None if no VLAN has
    radius_default: true.
    """
    vlans = data.get("vlans", [])
    default_vlan = next((v for v in vlans if v.get("radius_default") is True), None)
    if default_vlan is None:
        return None

    fr_opts = data.get('radius', {}).get('options', {})
    mac_fmt = fr_opts.get('mac_format', 'aabbccddeeff')
    apply_to = fr_opts.get('apply_to', 'all')
    auth_mode = fr_opts.get('auth_mode', 'mab')
    mab_first = fr_opts.get('mab_first', True)
    vlan_by_name = {v["name"]: v for v in vlans}

    lines = list(_GENERATED_HEADER)
    if auth_mode == 'mab' or mab_first:
        lines += _mac_user_entries(data.get("dhcp_reservations", []), vlan_by_name, mac_fmt)
    if auth_mode in ('eap_password', 'eap_certificate'):
        lines += _supplicant_user_entries(auth_mode, vlan_by_name)
    lines += _default_user_entry(default_vlan, apply_to, fr_opts.get('ap_ips', []))
    return "\n".join(lines)


# ===================================================================
# freeradius config file patching
# ===================================================================

def set_freeradius_log(enabled):
    """Enable or disable auth logging lines in radiusd.conf.

    Rewrites the yes/no value of the `auth`, `auth_accept` and `auth_reject`
    settings. Returns True if the file was modified, False if it does not
    exist or already had the requested values.
    """
    if not RADIUS_CONF_FILE.exists():
        return False

    value = 'yes' if enabled else 'no'
    state = 'enabled' if enabled else 'disabled'
    content = RADIUS_CONF_FILE.read_text()

    updated = content
    for key in ('auth', 'auth_accept', 'auth_reject'):
        updated = re.sub(rf'(?m)^(\s*{key}\s*=\s*)(yes|no)', rf'\g<1>{value}', updated)

    if updated == content:
        print(f"radiusd.conf: auth logging already {state}.")
        return False
    RADIUS_CONF_FILE.write_text(updated)
    print(f"radiusd.conf: auth logging {state}.")
    return True


def write_huntgroups(data):
    """Write the huntgroups file if apply_to=huntgroup and ap_ips are configured.

    Returns True if the file was written, False if not applicable or already
    up to date.
    """
    opts = data.get('radius', {}).get('options', {})
    ap_ips = opts.get('ap_ips', [])
    if opts.get('apply_to', 'all') != 'huntgroup' or not ap_ips:
        return False

    lines = list(_GENERATED_HEADER)
    lines += [f"{RADIUS_HUNTGROUP_NAME} NAS-IP-Address == {ip}" for ip in ap_ips]
    content = "\n".join(lines) + "\n"

    existing = RADIUS_HUNTGROUPS.read_text() if RADIUS_HUNTGROUPS.exists() else None
    if existing == content:
        return False
    RADIUS_HUNTGROUPS.write_text(content)
    print(f"Written: {RADIUS_HUNTGROUPS}")
    return True


def _find_block_end(lines, start):
    """Return the index of the line that closes the brace block opened at `start`.

    Braces are counted after dropping one leading '#' per line, so commented
    and uncommented blocks are measured the same way. Returns None when the
    depth never returns to zero.
    """
    depth = 0
    for idx in range(start, len(lines)):
        effective = _LEADING_HASH_RE.sub(r'\1', lines[idx])
        depth += effective.count('{') - effective.count('}')
        if depth == 0:
            return idx
    return None


def toggle_freeradius_block(content, block_name, enable):
    """Comment out or uncomment a named brace block in a freeradius config file.

    Finds the first block header with this name (whether currently commented
    or not), locates its matching closing brace via depth counting, then
    comments or uncomments the entire range as a unit. A candidate header
    whose braces never balance is skipped instead of raising.

    Returns the updated text, or `content` unchanged when no complete block
    with that name is found.
    """
    lines = content.splitlines(keepends=True)
    header_re = re.compile(r'\s*' + re.escape(block_name) + r'\s*\{')

    for start, header in enumerate(lines):
        if not header_re.match(_LEADING_HASH_RE.sub(r'\1', header)):
            continue
        end = _find_block_end(lines, start)
        if end is None:
            # Unterminated candidate: nothing safe to toggle, keep looking.
            continue

        for k in range(start, end + 1):
            line = lines[k]
            if enable:
                lines[k] = _LEADING_HASH_RE.sub(r'\1', line, count=1)
            elif line.strip() and not re.match(r'\s*#', line):
                # Blank and already-commented lines are left alone.
                lines[k] = re.sub(r'^(\s*)', r'\1#', line, count=1)
        return ''.join(lines)

    return content


def _patch_setting_in_block(content, block_name, key, value):
    """Patch `key = value` inside the first occurrence of `block_name { ... }`.

    Only the first uncommented `key = ...` line found before the block closes
    is rewritten. Returns `content` unchanged when the block or the key is not
    found.
    """
    lines = content.splitlines(keepends=True)
    header_re = re.compile(r'\s*' + re.escape(block_name) + r'\s*\{')
    key_line_re = re.compile(r'\s*' + re.escape(key) + r'\s*=')
    assign_re = re.compile(r'(' + re.escape(key) + r'\s*=\s*)\S+')

    in_block = False
    depth = 0
    for i, line in enumerate(lines):
        if not in_block:
            if header_re.match(line):
                in_block = True
                depth = 1
            continue

        depth += line.count('{') - line.count('}')
        if depth <= 0:
            break
        if key_line_re.match(line):
            # A function replacement keeps `value` literal: nothing in it is
            # interpreted as a regex group reference.
            lines[i] = assign_re.sub(lambda m: m.group(1) + str(value), line, count=1)
            return ''.join(lines)

    return content


def set_freeradius_eap(data):
    """Patch EAP config for eap_protocol and tunneled_reply settings.

    Updates use_tunneled_reply, the outer default_eap_type, the md5 block
    (enabled only for eap_md5), the inner default_eap_type of the peap/ttls
    block when a valid inner_protocol is configured, and include_length.

    Returns True if the file was modified, False if unchanged or not found.
    """
    if not RADIUS_EAP_FILE.exists():
        return False

    opts = data.get('radius', {}).get('options', {})
    eap_protocol = opts.get('eap_protocol', 'eap_peap')
    inner_protocol = opts.get('inner_protocol', '')
    auth_mode = opts.get('auth_mode', 'mab')

    eap_type = {'eap_peap': 'peap', 'eap_ttls': 'ttls', 'eap_md5': 'md5'}.get(eap_protocol, 'peap')
    tr_val = 'yes' if opts.get('tunneled_reply', False) else 'no'
    # Config block that holds the tunnelled method's own settings, if any.
    tunnel_block = {'eap_peap': 'peap', 'eap_ttls': 'ttls'}.get(eap_protocol)
    valid_inner = {
        'eap_peap': {'mschapv2', 'md5', 'gtc'},
        'eap_ttls': {'md5', 'mschapv2', 'gtc'},
    }

    original = RADIUS_EAP_FILE.read_text()
    patched = re.sub(
        r'(?m)^(\s*use_tunneled_reply\s*=\s*)(yes|no)', rf'\g<1>{tr_val}', original
    )
    # Only replace the first occurrence -- that is the outer eap{} block's default.
    # Inner blocks (e.g. peap's tunneled default) must not be touched.
    patched = re.sub(
        r'(?m)^(\s*default_eap_type\s*=\s*)\w+', rf'\g<1>{eap_type}', patched, count=1
    )
    patched = toggle_freeradius_block(patched, 'md5', eap_protocol == 'eap_md5')

    if inner_protocol in valid_inner.get(eap_protocol, ()):
        patched = _patch_setting_in_block(
            patched, tunnel_block, 'default_eap_type', inner_protocol
        )

    # include_length lives in the tls block for certificate auth, otherwise
    # in the block of the tunnelled method in use.
    include_block = 'tls' if auth_mode == 'eap_certificate' else tunnel_block
    if include_block is not None:
        il_val = 'yes' if opts.get('include_length', False) else 'no'
        patched = _patch_setting_in_block(patched, include_block, 'include_length', il_val)

    if patched == original:
        return False
    RADIUS_EAP_FILE.write_text(patched)
    print(f"EAP: default_eap_type={eap_type}, inner={inner_protocol or '(default)'}, tunneled_reply={tr_val}")
    return True


# ===================================================================
# Apply
# ===================================================================

def apply_radius(data):
    """Write FreeRADIUS config files and restart the service.

    Regenerates clients.conf and users, patches radiusd.conf, huntgroups and
    the EAP module config, then restarts freeradius if it is running and
    anything changed, or enables and starts it if it is not running.

    Returns an error string when no VLAN has radius_default: true, None
    otherwise. A failed restart or start is reported through
    shared.service_warning rather than the return value.
    """
    secret = ensure_radius_secret()
    clients_content = build_radius_clients_conf(data, secret)
    users_content = build_radius_users(data)
    if users_content is None:
        return "No VLAN has radius_default: true. Cannot generate RADIUS users file."

    log_enabled = data.get('radius', {}).get('general', {}).get('logging', False)
    changed = set_freeradius_log(log_enabled)
    changed |= write_huntgroups(data)
    changed |= set_freeradius_eap(data)

    generated = (
        (RADIUS_CLIENTS_CONF, clients_content),
        (RADIUS_USERS_FILE, users_content),
    )
    for path, content in generated:
        existing = path.read_text() if path.exists() else None
        if existing == content:
            print(f"Unchanged: {path}")
            continue
        path.write_text(content)
        print(f"Written: {path}")
        changed = True

    svc = "freeradius"
    state = subprocess.run(
        ["systemctl", "is-active", svc], capture_output=True, text=True
    ).stdout.strip()

    if state != "active":
        subprocess.run(["systemctl", "enable", svc], capture_output=True, text=True)
        result = subprocess.run(["systemctl", "start", svc], capture_output=True, text=True)
        if result.returncode == 0:
            print("freeradius started.")
        else:
            shared.service_warning("start", "freeradius", result.stderr)
    elif changed:
        result = subprocess.run(["systemctl", "restart", svc], capture_output=True, text=True)
        if result.returncode == 0:
            print("freeradius restarted.")
        else:
            shared.service_warning("restart", "freeradius", result.stderr)
    else:
        print("freeradius: running, config unchanged.")