import copy import gzip import io import os import re from pathlib import Path from flask import Blueprint, request, redirect, flash, send_file, abort, jsonify import auth import config_utils import mod_validation as validate import settings _PAGE = Path(__file__).parent.name PRO_LICENSE = settings.is_pro() bp = Blueprint(_PAGE, __name__) RADIUS_SECRET_FILE = Path(config_utils.CONFIGS_DIR) / '.radius-secret' RADIUS_LOG_FILE = '/var/log/freeradius/radius.log' VALID_MAC_FORMATS = { 'aabbccddeeff', 'aa-bb-cc-dd-ee-ff', 'aa:bb:cc:dd:ee:ff', 'AABBCCDDEEFF', 'AA-BB-CC-DD-EE-FF', 'AA:BB:CC:DD:EE:FF', } @bp.route('/action/radius/regenerate', methods=['POST']) @auth.require_level('administrator') def regenerate(): try: RADIUS_SECRET_FILE.unlink(missing_ok=True) except OSError as ex: flash(f'Could not delete .radius-secret: {ex}', 'error') return redirect(f'/{_PAGE}') flash('Secret deleted. A new secret will be generated when the pending command is applied.', 'success') return redirect(f'/{_PAGE}') @bp.route('/action/radius/options_save', methods=['POST']) @auth.require_level('administrator') def options_save(): mac_format = request.form.get('mac_format', 'aabbccddeeff') if mac_format not in VALID_MAC_FORMATS: flash('Invalid MAC format.', 'error') return redirect(f'/{_PAGE}') cfg = config_utils.load_config() before = copy.deepcopy(cfg.get('radius', {}).get('options', {})) after = {**before, 'mac_format': mac_format} cfg.setdefault('radius', {})['options'] = after changes = config_utils.diff_fields(before, after) flash(config_utils.record_group(cfg, 'radius.options', 'setting', 'radius', changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/radius/auth_mode_save', methods=['POST']) @auth.require_level('administrator') def auth_mode_save(): auth_mode = request.form.get('auth_mode', 'mab') if auth_mode not in ('mab', 'eap_password', 'eap_certificate'): flash('Invalid authentication mode.', 'error') return redirect(f'/{_PAGE}') if auth_mode in ('eap_password', 'eap_certificate') and not PRO_LICENSE: flash('This authentication mode requires a Routlin Pro license.', 'error') return redirect(f'/{_PAGE}') eap_protocol = request.form.get('eap_protocol', 'eap_peap') if eap_protocol not in ('eap_peap', 'eap_ttls', 'eap_md5'): eap_protocol = 'eap_peap' tunneled_reply = 'tunneled_reply' in request.form include_length = 'include_length' in request.form mab_first = 'mab_first' in request.form inner_protocol = request.form.get('inner_protocol', '') _valid_inner = { 'eap_peap': {'mschapv2', 'md5', 'gtc'}, 'eap_ttls': {'md5', 'mschapv2', 'gtc'}, } cfg = config_utils.load_config() before = copy.deepcopy(cfg.get('radius', {}).get('options', {})) after = {**before, 'auth_mode': auth_mode} if auth_mode == 'eap_password': after['eap_protocol'] = eap_protocol after['tunneled_reply'] = tunneled_reply and eap_protocol in ('eap_peap', 'eap_ttls') after['mab_first'] = mab_first if eap_protocol in _valid_inner and inner_protocol in _valid_inner[eap_protocol]: after['inner_protocol'] = inner_protocol else: after.pop('inner_protocol', None) if eap_protocol in ('eap_peap', 'eap_ttls'): after['include_length'] = include_length else: after.pop('include_length', None) try: dur_n = int(request.form.get('default_session_value', '0').strip() or '0') dur_unit = request.form.get('default_session_unit', 'hours') mult = {'hours': 3600, 'days': 86400}.get(dur_unit, 3600) after['default_session_seconds'] = dur_n * mult if dur_n > 0 else 0 except (ValueError, TypeError): after['default_session_seconds'] = 0 try: exp_n = int(request.form.get('default_expiration_value', '0').strip() or '0') exp_unit = request.form.get('default_expiration_unit', 'hours') mult = {'hours': 3600, 'days': 86400}.get(exp_unit, 3600) after['default_expiration_seconds'] = exp_n * mult if exp_n > 0 else 0 except (ValueError, TypeError): after['default_expiration_seconds'] = 0 elif auth_mode == 'eap_certificate': after['include_length'] = include_length after['mab_first'] = mab_first after.pop('eap_protocol', None) after.pop('tunneled_reply', None) after.pop('inner_protocol', None) after.pop('default_session_seconds', None) after.pop('default_expiration_seconds', None) else: # mab after.pop('eap_protocol', None) after.pop('tunneled_reply', None) after.pop('inner_protocol', None) after.pop('include_length', None) after.pop('mab_first', None) after.pop('default_session_seconds', None) after.pop('default_expiration_seconds', None) cfg.setdefault('radius', {})['options'] = after changes = config_utils.diff_fields(before, after) flash(config_utils.record_group(cfg, 'radius.options', 'auth_mode', auth_mode, changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/radius/default_rule_save', methods=['POST']) @auth.require_level('administrator') def default_rule_save(): apply_to = request.form.get('apply_to', 'all') ap_ips = request.form.getlist('ap_ips') if apply_to not in ('all', 'wireless', 'huntgroup'): flash('Invalid apply_to value.', 'error') return redirect(f'/{_PAGE}') cfg = config_utils.load_config() valid_ips = { r['ip'] for r in cfg.get('dhcp_reservations', []) if r.get('radius_client') is True and r.get('ip') and r.get('ip') not in ('', 'dynamic') } for ip in ap_ips: if ip not in valid_ips: flash(f'Invalid AP selection: {ip}', 'error') return redirect(f'/{_PAGE}') before = copy.deepcopy(cfg.get('radius', {}).get('options', {})) after = {**before, 'apply_to': apply_to, 'ap_ips': ap_ips} cfg.setdefault('radius', {})['options'] = after changes = config_utils.diff_fields(before, after) flash(config_utils.record_group(cfg, 'radius.options', 'default_rule', 'radius', changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/radius/default_vlan_save', methods=['POST']) @auth.require_level('administrator') def default_vlan_save(): chosen = request.form.get('default_vlan', '').strip() cfg = config_utils.load_config() vlans = cfg.get('vlans', []) if chosen and not any(v['name'] == chosen for v in vlans): flash('Invalid VLAN selection.', 'error') return redirect(f'/{_PAGE}') old_name = next((v['name'] for v in vlans if v.get('radius_default') is True), '') for v in vlans: v['radius_default'] = (v['name'] == chosen) if chosen else False changes = config_utils.diff_fields({'radius_default': old_name}, {'radius_default': chosen}) flash(config_utils.record_group(cfg, 'radius', 'default_vlan', chosen or 'none', changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/radius/logging_save', methods=['POST']) @auth.require_level('administrator') def logging_save(): log_max_kb = validate.int_range(request.form.get('log_max_kb', '').strip(), 64, None) if log_max_kb is None: flash('Max Log Size must be a number >= 64.', 'error') return redirect(f'/{_PAGE}') logging = 'logging' in request.form cfg = config_utils.load_config() before = copy.deepcopy(cfg.get('radius', {}).get('general', {})) after = {'logging': logging, 'log_max_kb': log_max_kb} cfg.setdefault('radius', {})['general'] = after changes = config_utils.diff_fields(before, after) flash(config_utils.record_group(cfg, 'radius.general', 'setting', 'radius', changes, 'core apply'), 'success') return redirect(f'/{_PAGE}') @bp.route('/action/radius/logging_download', methods=['GET']) @auth.require_level('administrator') def logging_download(): log_dir = os.path.dirname(RADIUS_LOG_FILE) chunks = [] # Collect radius.log.N.gz files, sorted oldest-first (highest N first) gz_files = [] for name in os.listdir(log_dir) if os.path.isdir(log_dir) else []: m = re.fullmatch(r'radius\.log\.(\d+)\.gz', name) if m: gz_files.append((int(m.group(1)), os.path.join(log_dir, name))) for _, path in sorted(gz_files, reverse=True): try: with gzip.open(path, 'rb') as f: chunks.append(f.read()) except OSError: pass # radius.log.1 (plain, older than current) rotated = RADIUS_LOG_FILE + '.1' if os.path.isfile(rotated): try: with open(rotated, 'rb') as f: chunks.append(f.read()) except OSError: pass # radius.log (current) if os.path.isfile(RADIUS_LOG_FILE): try: with open(RADIUS_LOG_FILE, 'rb') as f: chunks.append(f.read()) except OSError: pass if not chunks: abort(404) data = b''.join(chunks) return send_file( io.BytesIO(data), as_attachment=True, download_name='radius.log', mimetype='text/plain', ) @bp.route('/api/radius/log-tail', methods=['GET']) @auth.require_level('administrator') def api_log_tail(): try: cfg = config_utils.load_config() log_max_kb = cfg.get('radius', {}).get('general', {}).get('log_max_kb', 1024) current = [] try: with open(RADIUS_LOG_FILE) as f: current = f.readlines() except FileNotFoundError: pass prev = [] if len(current) < 50: try: with open(RADIUS_LOG_FILE + '.1') as f: prev = f.readlines() except FileNotFoundError: pass need = max(0, 50 - len(current)) lines = (prev[-need:] if need and prev else []) + current if not lines: return jsonify({'log': '(no log entries yet)', 'left': '', 'right': ''}) log_dir = os.path.dirname(RADIUS_LOG_FILE) try: size_kb = sum( os.path.getsize(os.path.join(log_dir, f)) for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) ) / 1024 except OSError: size_kb = 0.0 tail = lines[-50:] pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0 note = ' (includes rotated log)' if (prev and need) else '' left = f'Showing {len(tail)} lines{note}' right = f'Total log size: {size_kb:.1f} KB ({pct}% of max)' return jsonify({'log': ''.join(tail).strip(), 'left': left, 'right': right}) except Exception: return jsonify({'log': '(error reading log)', 'left': '', 'right': ''})