325 lines
12 KiB
Python
325 lines
12 KiB
Python
"""
mod_radius.py -- FreeRADIUS configuration management.

Generates clients.conf, users, and huntgroups files from config.json, and
manages EAP settings (use_tunneled_reply, md5 block) in the freeradius EAP
module config.
"""
|
|
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
import mod_shared as shared
|
|
|
|
# --- Files this module reads or (re)writes under the FreeRADIUS 3.0 tree ---
RADIUS_CONF_FILE = Path("/etc/freeradius/3.0/radiusd.conf")
RADIUS_CLIENTS_CONF = Path("/etc/freeradius/3.0/clients.conf")
RADIUS_USERS_FILE = Path("/etc/freeradius/3.0/users")
RADIUS_HUNTGROUPS = Path("/etc/freeradius/3.0/huntgroups")
RADIUS_EAP_FILE = Path("/etc/freeradius/3.0/mods-available/eap")

# --- FreeRADIUS log file path ---
RADIUS_LOG_FILE = Path("/var/log/freeradius/radius.log")

# --- Shared secret, stored alongside the scripts (see ensure_radius_secret) ---
RADIUS_SECRET_FILE = shared.SCRIPT_DIR / ".radius-secret"

# --- Huntgroup name used in both the huntgroups file and the DEFAULT rule ---
RADIUS_HUNTGROUP_NAME = "routlin-aps"
|
|
|
|
|
|
# ===================================================================
|
|
# Data helpers
|
|
# ===================================================================
|
|
|
|
def radius_clients(data):
    """Collect the reservations that act as RADIUS clients.

    A reservation qualifies when its ``radius_client`` value is exactly
    ``True`` and its ``vlan`` names a VLAN present in ``data["vlans"]``.

    Returns a list of ``(reservation, vlan)`` tuples in config order.
    """
    # Index the VLAN definitions by name so each reservation is one lookup.
    known_vlans = {}
    for vlan_def in data.get("vlans", []):
        known_vlans[vlan_def["name"]] = vlan_def

    selected = []
    for reservation in data.get("dhcp_reservations", []):
        if reservation.get("radius_client") is not True:
            continue
        if reservation.get("vlan") not in known_vlans:
            continue
        selected.append((reservation, known_vlans[reservation["vlan"]]))
    return selected
|
|
|
|
|
|
def radius_enabled(data):
    """Tell whether RADIUS is in use: True when radius_clients() finds at least one client."""
    # radius_clients() always returns a list, so truthiness equals "non-empty".
    return bool(radius_clients(data))
|
|
|
|
|
|
# ===================================================================
|
|
# Secret
|
|
# ===================================================================
|
|
|
|
def ensure_radius_secret():
    """Return the RADIUS shared secret, generating and storing it on first use.

    If RADIUS_SECRET_FILE exists and holds a non-blank value, that value is
    returned (whitespace-stripped). Otherwise a new random secret is
    generated, written to RADIUS_SECRET_FILE with owner-only permissions,
    printed for the operator, and returned.
    """
    if RADIUS_SECRET_FILE.exists():
        stored = RADIUS_SECRET_FILE.read_text().strip()
        if stored:
            return stored
        # A blank file would otherwise be rendered as an empty "secret ="
        # line in clients.conf, so fall through and regenerate.

    import secrets as _secrets

    secret = _secrets.token_urlsafe(32)

    # The file holds a credential: create it owner-only *before* the secret is
    # written, then chmod explicitly because touch()'s mode is subject to the
    # umask and does not apply to a file that already exists.
    # NOTE(review): this was 0o644 (world-readable). If something running as a
    # different user reads this file, grant it access via group ownership
    # rather than restoring world-read.
    RADIUS_SECRET_FILE.touch(mode=0o600)
    RADIUS_SECRET_FILE.chmod(0o600)
    RADIUS_SECRET_FILE.write_text(secret + "\n")

    print(f"Generated RADIUS shared secret: {RADIUS_SECRET_FILE}")
    print(" ACTION REQUIRED: enter this shared secret into your managed switch's RADIUS configuration:")
    print(f" {secret}")
    return secret
|
|
|
|
|
|
# ===================================================================
|
|
# Config file builders
|
|
# ===================================================================
|
|
|
|
def build_radius_clients_conf(data, secret):
    """Generate freeradius clients.conf content.

    Emits a fixed ``localhost`` client followed by one ``client`` stanza per
    reservation returned by radius_clients(data). Every stanza uses the same
    shared ``secret``.

    The client name is the reservation's ``hostname`` if set, otherwise its
    description lower-cased with spaces replaced by dashes. A reservation
    without a description is labelled ``unknown`` instead of raising.

    Returns the file content as a single string.
    """
    lines = [
        "# Generated by core.py -- do not edit manually.",
        "# Edit config.json and re-run: sudo python3 core.py --apply",
        "",
        "# localhost (required)",
        "client localhost {",
        " ipaddr = 127.0.0.1",
        f" secret = {secret}",
        " shortname = localhost",
        "}",
        "",
    ]
    for r, _vlan in radius_clients(data):
        # Resolve the description once so the name fallback and the comment
        # line agree, and a missing/None description cannot raise.
        description = r.get("description") or "unknown"
        name = r.get("hostname") or description.replace(" ", "-").lower()
        lines += [
            f"# {description}",
            f"client {name} {{",
            f" ipaddr = {r['ip']}",
            f" secret = {secret}",
            f" shortname = {name}",
            "}",
            "",
        ]
    return "\n".join(lines)
|
|
|
|
|
|
def fmt_mac(raw, fmt):
    """Re-format a MAC address according to a template string.

    ``raw`` may use colons, dashes, dots (``aabb.ccdd.eeff``) or no separator,
    in any letter case, with optional surrounding whitespace.

    ``fmt`` selects the output shape:
      * ``aabbccddeeff`` / ``AABBCCDDEEFF``           -- no separator
      * ``aa-bb-cc-dd-ee-ff`` / ``AA-BB-CC-DD-EE-FF`` -- dash separated
      * anything else (including an empty string)     -- colon separated
    The output is upper-case when the first character of ``fmt`` is
    upper-case, lower-case otherwise.

    Returns the formatted MAC string.
    """
    # Drop every separator style we may be handed, plus stray whitespace.
    digits = ''.join(
        ch for ch in raw if ch not in ':-.' and not ch.isspace()
    ).lower()
    pairs = [digits[i:i + 2] for i in range(0, 12, 2)]

    # fmt[:1] rather than fmt[0]: an empty format must not raise.
    upper = fmt[:1].isupper()

    if fmt in ('aabbccddeeff', 'AABBCCDDEEFF'):
        sep = ''
    elif fmt in ('aa-bb-cc-dd-ee-ff', 'AA-BB-CC-DD-EE-FF'):
        sep = '-'
    else:
        sep = ':'

    joined = sep.join(pairs)
    return joined.upper() if upper else joined
|
|
|
|
|
|
def build_radius_users(data):
    """
    Generate freeradius users file content.

    Each enabled reservation that has a MAC and a known VLAN gets an entry
    mapping that MAC (used as both user name and password) to its VLAN ID.
    Unknown MACs fall through to DEFAULT, which returns the VLAN flagged
    radius_default.

    MAC format (``mac_format``) and DEFAULT rule scope (``apply_to``,
    ``ap_ips``) are read from radius.options in config:
      * apply_to == 'wireless'               -- DEFAULT only for wireless NAS ports
      * apply_to == 'huntgroup' with ap_ips  -- DEFAULT only for the AP huntgroup
      * anything else                        -- DEFAULT for every request

    Returns the file content as a string, or None if no VLAN has
    radius_default: true (including when the config has no vlans at all).
    """
    # Read the VLAN list once, tolerating a config with no "vlans" key.
    vlans = data.get("vlans", [])
    default_vlan = next(
        (v for v in vlans if v.get("radius_default") is True), None
    )
    if default_vlan is None:
        return None

    fr_opts = data.get('radius', {}).get('options', {})
    mac_fmt = fr_opts.get('mac_format', 'aabbccddeeff')
    apply_to = fr_opts.get('apply_to', 'all')

    lines = [
        "# Generated by core.py -- do not edit manually.",
        "# Edit config.json and re-run: sudo python3 core.py --apply",
        "",
    ]

    vlan_by_name = {v["name"]: v for v in vlans}
    for r in data.get("dhcp_reservations", []):
        if r.get("enabled") is not True:
            continue
        raw_mac = r.get("mac", "")
        if not raw_mac:
            continue
        vlan = vlan_by_name.get(r.get("vlan", ""))
        if not vlan:
            continue
        mac = fmt_mac(raw_mac, mac_fmt)
        vlan_id = vlan.get('vlan_id')
        # The description only feeds a comment line; do not fail without one.
        description = r.get('description', 'unknown')
        lines += [
            f"# {description} -> VLAN {vlan_id} ({vlan['name']})",
            f"{mac} Cleartext-Password := \"{mac}\"",
            " Tunnel-Type = VLAN,",
            " Tunnel-Medium-Type = IEEE-802,",
            f" Tunnel-Private-Group-Id = \"{vlan_id}\"",
            "",
        ]

    default_id = default_vlan.get('vlan_id')
    ap_ips = fr_opts.get('ap_ips', [])
    if apply_to == 'wireless':
        # Check items on RADIUS protocol attributes must use a comparison
        # operator; "=" is assignment. "==" also matches the huntgroup rule.
        default_check = "DEFAULT NAS-Port-Type == Wireless-802.11, Auth-Type := Accept"
    elif apply_to == 'huntgroup' and ap_ips:
        default_check = f'DEFAULT Huntgroup-Name == "{RADIUS_HUNTGROUP_NAME}", Auth-Type := Accept'
    else:
        default_check = "DEFAULT Auth-Type := Accept"
    lines += [
        f"# Default -- unknown MACs land on VLAN {default_id} ({default_vlan['name']})",
        default_check,
        " Tunnel-Type = VLAN,",
        " Tunnel-Medium-Type = IEEE-802,",
        f" Tunnel-Private-Group-Id = \"{default_id}\"",
        "",
    ]

    return "\n".join(lines)
|
|
|
|
|
|
# ===================================================================
|
|
# freeradius config file patching
|
|
# ===================================================================
|
|
|
|
def set_freeradius_log(enabled):
    """Switch the auth logging settings in radiusd.conf on or off.

    Rewrites the yes/no value of every ``auth``, ``auth_accept`` and
    ``auth_reject`` assignment found at the start of a line.

    Returns True if the file was modified; False if it does not exist or
    already had the requested values.
    """
    if not RADIUS_CONF_FILE.exists():
        return False

    if enabled:
        flag, state_word = 'yes', 'enabled'
    else:
        flag, state_word = 'no', 'disabled'

    before = RADIUS_CONF_FILE.read_text()
    after = before
    # Apply the same substitution to each setting in turn, preserving the
    # original spacing around "=" via the captured prefix.
    for setting in ('auth', 'auth_accept', 'auth_reject'):
        pattern = r'(?m)^(\s*' + setting + r'\s*=\s*)(yes|no)'
        after = re.sub(pattern, rf'\g<1>{flag}', after)

    if after != before:
        RADIUS_CONF_FILE.write_text(after)
        print(f"radiusd.conf: auth logging {state_word}.")
        return True

    print(f"radiusd.conf: auth logging already {state_word}.")
    return False
|
|
|
|
|
|
def write_huntgroups(data):
    """Write the huntgroups file when radius.options selects huntgroup scope.

    Only acts when ``apply_to`` is ``'huntgroup'`` and ``ap_ips`` is
    non-empty; each AP IP becomes one NAS-IP-Address line under
    RADIUS_HUNTGROUP_NAME.

    Returns True if the file was written, False if not applicable or the
    file on disk already matches.
    """
    options = data.get('radius', {}).get('options', {})
    scope = options.get('apply_to', 'all')
    access_points = options.get('ap_ips', [])
    if not (scope == 'huntgroup' and access_points):
        return False

    header = [
        "# Generated by core.py -- do not edit manually.",
        "# Edit config.json and re-run: sudo python3 core.py --apply",
        "",
    ]
    members = [
        f"{RADIUS_HUNTGROUP_NAME} NAS-IP-Address == {ip}" for ip in access_points
    ]
    rendered = "\n".join(header + members) + "\n"

    # Skip the write (and the restart it would trigger) when nothing changed.
    if RADIUS_HUNTGROUPS.exists() and RADIUS_HUNTGROUPS.read_text() == rendered:
        return False

    RADIUS_HUNTGROUPS.write_text(rendered)
    print(f"Written: {RADIUS_HUNTGROUPS}")
    return True
|
|
|
|
|
|
def toggle_freeradius_block(content, block_name, enable):
    """Comment out or uncomment a named brace block in a freeradius config file.

    Finds the first line that opens ``block_name { ...`` (whether currently
    commented or not), locates its matching closing brace via depth counting,
    then comments or uncomments the entire range as a unit.

    ``enable=True`` removes one leading ``#`` from each line of the block;
    ``enable=False`` prefixes each non-blank, not-yet-commented line with ``#``.

    The content is returned unchanged when:
      * no such block exists,
      * the block's closing brace cannot be found (unbalanced braces), or
      * ``enable`` is True and the block header is already uncommented, so
        that genuine comment lines inside a live block are not stripped.

    Returns the (possibly modified) content as a string.
    """
    def _without_comment_marker(text):
        # View a line as it would read if its leading '#' were removed.
        return re.sub(r'^(\s*)#', r'\1', text)

    header_re = re.compile(r'\s*' + re.escape(block_name) + r'\s*\{')
    lines = content.splitlines(keepends=True)

    for start, header in enumerate(lines):
        if not header_re.match(_without_comment_marker(header)):
            continue

        # Already enabled: stripping '#' again would only damage real comments.
        if enable and not re.match(r'\s*#', header):
            return content

        # Walk forward until the braces opened by the header are balanced.
        depth = 0
        end = None
        for idx in range(start, len(lines)):
            effective = _without_comment_marker(lines[idx])
            depth += effective.count('{') - effective.count('}')
            if depth == 0:
                end = idx
                break
        if end is None:
            # Unbalanced block: refuse to edit rather than run off the end
            # of the file or comment out everything that follows.
            return content

        for k in range(start, end + 1):
            line = lines[k]
            if enable:
                lines[k] = re.sub(r'^(\s*)#', r'\1', line, count=1)
            elif line.strip() and not re.match(r'\s*#', line):
                lines[k] = re.sub(r'^(\s*)', r'\1#', line, count=1)
        return ''.join(lines)

    return content
|
|
|
|
|
|
def set_freeradius_eap(data):
    """Bring the EAP module config in line with radius.eap from config.

    ``tunneled_reply`` drives every ``use_tunneled_reply`` line;
    ``allow_weak_eap`` picks md5 or peap as the first ``default_eap_type``
    and enables or disables the ``md5`` block.

    Returns True if the file was modified, False if unchanged or not found.
    """
    if not RADIUS_EAP_FILE.exists():
        return False

    settings = data.get('radius', {}).get('eap', {})
    tunneled_reply = settings.get('tunneled_reply', False)
    allow_weak_eap = settings.get('allow_weak_eap', False)

    original = RADIUS_EAP_FILE.read_text()

    if tunneled_reply:
        tr_val = 'yes'
    else:
        tr_val = 'no'
    if allow_weak_eap:
        eap_type = 'md5'
    else:
        eap_type = 'peap'

    patched = re.sub(
        r'(?m)^(\s*use_tunneled_reply\s*=\s*)(yes|no)',
        rf'\g<1>{tr_val}',
        original,
    )
    # count=1: only the first default_eap_type (the outer eap{} block) is
    # rewritten; defaults inside inner blocks such as peap stay untouched.
    patched = re.sub(
        r'(?m)^(\s*default_eap_type\s*=\s*)\w+',
        rf'\g<1>{eap_type}',
        patched,
        count=1,
    )
    patched = toggle_freeradius_block(patched, 'md5', allow_weak_eap)

    if patched != original:
        RADIUS_EAP_FILE.write_text(patched)
        print(f"EAP: default_eap_type={eap_type}, tunneled_reply={tr_val}, allow_weak_eap={allow_weak_eap}")
        return True
    return False
|
|
|
|
|
|
# ===================================================================
|
|
# Apply
|
|
# ===================================================================
|
|
|
|
def apply_radius(data):
    """Render all FreeRADIUS files from config and (re)start the service.

    Steps, in order: obtain the shared secret, build clients.conf and users
    content, patch radiusd.conf logging, huntgroups and EAP config, write the
    two generated files if they differ from disk, then restart freeradius if
    it is running and something changed, or enable and start it if it is not
    running.

    Returns error string on failure, None on success. Service start/restart
    failures are reported via shared.service_warning and do not produce an
    error string.
    """
    secret = ensure_radius_secret()

    clients_text = build_radius_clients_conf(data, secret)
    users_text = build_radius_users(data)
    if users_text is None:
        return "No VLAN has radius_default: true. Cannot generate RADIUS users file."

    log_auth = data.get('radius', {}).get('general', {}).get('logging', False)

    # Run every patch step unconditionally; any one reporting a modification
    # marks the configuration as dirty.
    patch_results = [
        set_freeradius_log(log_auth),
        write_huntgroups(data),
        set_freeradius_eap(data),
    ]
    dirty = any(patch_results)

    generated = {
        RADIUS_CLIENTS_CONF: clients_text,
        RADIUS_USERS_FILE: users_text,
    }
    for target, wanted in generated.items():
        on_disk = target.read_text() if target.exists() else None
        if on_disk == wanted:
            print(f"Unchanged: {target}")
            continue
        target.write_text(wanted)
        print(f"Written: {target}")
        dirty = True

    service = "freeradius"
    probe = subprocess.run(
        ["systemctl", "is-active", service], capture_output=True, text=True
    )
    running = probe.stdout.strip() == "active"

    if running and not dirty:
        print("freeradius: running, config unchanged.")
        return None

    if running:
        action, success_msg = "restart", "freeradius restarted."
    else:
        # Not running: make sure it is enabled at boot before starting it.
        subprocess.run(["systemctl", "enable", service], capture_output=True, text=True)
        action, success_msg = "start", "freeradius started."

    outcome = subprocess.run(["systemctl", action, service],
                             capture_output=True, text=True)
    if outcome.returncode == 0:
        print(success_msg)
    else:
        shared.service_warning(action, "freeradius", outcome.stderr)
    return None
|