218 lines
7.3 KiB
Python
218 lines
7.3 KiB
Python
import copy
|
|
import gzip
|
|
import io
|
|
import ipaddress
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from flask import Blueprint, request, redirect, flash, send_file, abort, jsonify
|
|
from auth import require_level
|
|
from config_utils import CONFIGS_DIR, load_config, record_group, diff_fields
|
|
import validation as validate
|
|
|
|
# Name of the directory that contains this module. It is used both as the
# blueprint name and as the URL segment the action handlers redirect back to.
_PAGE = Path(__file__).parent.name

bp = Blueprint(_PAGE, __name__)

# Secret file lives alongside the other configs; removed by `regenerate`.
RADIUS_SECRET_FILE = Path(CONFIGS_DIR, '.radius-secret')

# Current log file; rotated siblings are looked up next to it with
# `.1` / `.N.gz` suffixes.
RADIUS_LOG_FILE = '/var/log/freeradius/radius.log'

# Accepted values of the `mac_format` form field: bare, dash-separated and
# colon-separated layouts, each in lower and upper case.
VALID_MAC_FORMATS = {
    to_case(layout)
    for layout in ('aabbccddeeff', 'aa-bb-cc-dd-ee-ff', 'aa:bb:cc:dd:ee:ff')
    for to_case in (str.lower, str.upper)
}
|
|
|
|
|
|
@bp.route('/action/radius/regenerate', methods=['POST'])
@require_level('administrator')
def regenerate():
    """Delete the RADIUS secret file and redirect back to the page.

    A file that is already absent is not treated as an error
    (``missing_ok=True``). Any other ``OSError`` is reported through an
    error flash; otherwise a success flash is shown. Both outcomes end in
    the same redirect.
    """
    try:
        RADIUS_SECRET_FILE.unlink(missing_ok=True)
    except OSError as ex:
        flash(f'Could not delete .radius-secret: {ex}', 'error')
    else:
        flash('Secret deleted. A new secret will be generated when the pending command is applied.', 'success')
    return redirect(f'/{_PAGE}')
|
|
|
|
|
|
@bp.route('/action/radius/options_save', methods=['POST'])
@require_level('administrator')
def options_save():
    """Validate and store the RADIUS ``options`` section from the posted form.

    Form fields read:
      * ``mac_format`` - must be one of ``VALID_MAC_FORMATS``
        (default ``'aabbccddeeff'``).
      * ``apply_to``   - ``'all'``, ``'wireless'`` or ``'huntgroup'``
        (default ``'all'``).
      * ``ap_ips``     - one IPv4 address per line; blank lines are ignored.

    The first failing check flashes an error and redirects without touching
    the config. On success the new options replace ``cfg['radius']['options']``,
    the field differences are handed to ``record_group`` and the message it
    returns is flashed.
    """
    form = request.form
    mac_format = form.get('mac_format', 'aabbccddeeff')
    apply_to = form.get('apply_to', 'all')

    if mac_format not in VALID_MAC_FORMATS:
        flash('Invalid MAC format.', 'error')
        return redirect(f'/{_PAGE}')

    if apply_to not in ('all', 'wireless', 'huntgroup'):
        flash('Invalid apply_to value.', 'error')
        return redirect(f'/{_PAGE}')

    # Normalise the textarea: trim each line and drop the empty ones.
    stripped_lines = (line.strip() for line in form.get('ap_ips', '').splitlines())
    ap_ips = [entry for entry in stripped_lines if entry]

    for ip in ap_ips:
        try:
            ipaddress.IPv4Address(ip)
        except ValueError:
            flash(f'Invalid IP address: {ip}', 'error')
            return redirect(f'/{_PAGE}')

    cfg = load_config()
    radius_section = cfg.setdefault('radius', {})

    # Snapshot the old values before overwriting so the diff is meaningful.
    before = copy.deepcopy(radius_section.get('options', {}))
    after = {'mac_format': mac_format, 'apply_to': apply_to, 'ap_ips': ap_ips}
    radius_section['options'] = after

    changes = diff_fields(before, after)
    message = record_group(cfg, 'radius.options', 'setting', 'radius', changes, 'core apply')
    flash(message, 'success')
    return redirect(f'/{_PAGE}')
|
|
|
|
|
|
@bp.route('/action/radius/default_vlan_save', methods=['POST'])
@require_level('administrator')
def default_vlan_save():
    """Mark one VLAN (or none) as the RADIUS default and record the change.

    The ``default_vlan`` form field names the VLAN to select; an empty value
    clears the flag on every VLAN. A non-empty value that matches no entry in
    ``cfg['vlans']`` flashes an error and leaves the config untouched.
    """
    chosen = request.form.get('default_vlan', '').strip()
    cfg = load_config()
    vlans = cfg.get('vlans', [])

    if chosen:
        # Stop at the first match; fall into `else` only if none was found.
        for vlan in vlans:
            if vlan['name'] == chosen:
                break
        else:
            flash('Invalid VLAN selection.', 'error')
            return redirect(f'/{_PAGE}')

    # Name of the VLAN currently flagged (first one wins), '' when none is.
    old_name = ''
    for vlan in vlans:
        if vlan.get('radius_default') is True:
            old_name = vlan['name']
            break

    # Exactly the chosen VLAN keeps the flag; with no choice every flag is False.
    for vlan in vlans:
        vlan['radius_default'] = bool(chosen) and vlan['name'] == chosen

    changes = diff_fields({'radius_default': old_name}, {'radius_default': chosen})
    message = record_group(cfg, 'radius', 'fallback_vlan', chosen or 'none', changes, 'core apply')
    flash(message, 'success')
    return redirect(f'/{_PAGE}')
|
|
|
|
|
|
@bp.route('/action/radius/eap_save', methods=['POST'])
@require_level('administrator')
def eap_save():
    """Store the two EAP checkbox settings from the posted form.

    ``allow_weak_eap`` and ``tunneled_reply`` are each True when a field of
    that name is present in the form and False otherwise. The resulting dict
    replaces ``cfg['radius']['eap']``; the differences against the previous
    values go to ``record_group`` and its returned message is flashed.
    """
    submitted = request.form
    after = {
        'allow_weak_eap': 'allow_weak_eap' in submitted,
        'tunneled_reply': 'tunneled_reply' in submitted,
    }

    cfg = load_config()
    radius_section = cfg.setdefault('radius', {})

    # Copy the previous values before they are replaced.
    before = copy.deepcopy(radius_section.get('eap', {}))
    radius_section['eap'] = after

    changes = diff_fields(before, after)
    message = record_group(cfg, 'radius.eap', 'setting', 'radius', changes, 'core apply')
    flash(message, 'success')
    return redirect(f'/{_PAGE}')
|
|
|
|
|
|
@bp.route('/action/radius/logging_save', methods=['POST'])
@require_level('administrator')
def logging_save():
    """Store the RADIUS logging switch and maximum log size.

    ``log_max_kb`` is checked with ``validate.int_range(value, 64, None)``;
    a ``None`` result is treated as a rejection and flashes an error without
    changing the config. The ``logging`` flag is True when a field of that
    name is present in the form. The new values replace
    ``cfg['radius']['general']`` and the diff is passed to ``record_group``.
    """
    size_text = request.form.get('log_max_kb', '').strip()
    log_max_kb = validate.int_range(size_text, 64, None)
    if log_max_kb is None:
        flash('Max Log Size must be a number >= 64.', 'error')
        return redirect(f'/{_PAGE}')

    logging_enabled = 'logging' in request.form

    cfg = load_config()
    radius_section = cfg.setdefault('radius', {})

    # Copy the previous values before they are replaced.
    before = copy.deepcopy(radius_section.get('general', {}))
    after = {'logging': logging_enabled, 'log_max_kb': log_max_kb}
    radius_section['general'] = after

    changes = diff_fields(before, after)
    message = record_group(cfg, 'radius.general', 'setting', 'radius', changes, 'core apply')
    flash(message, 'success')
    return redirect(f'/{_PAGE}')
|
|
|
|
|
|
@bp.route('/action/radius/logging_download', methods=['GET'])
@require_level('administrator')
def logging_download():
    """Send all available RADIUS logs as one ``radius.log`` attachment.

    Sources are concatenated oldest-first:
      1. ``radius.log.N.gz`` archives (decompressed), highest ``N`` first,
      2. ``radius.log.1`` (plain rotated file),
      3. ``radius.log`` (current file).

    Reading is best-effort: a source that is missing, unreadable or a
    damaged archive is skipped rather than failing the download. If nothing
    at all could be read the request is aborted with 404.
    """
    log_dir = os.path.dirname(RADIUS_LOG_FILE)

    # The directory may be absent or unreadable; treat that as "no archives"
    # instead of letting the listing error escape as a 500.
    try:
        dir_entries = os.listdir(log_dir)
    except OSError:
        dir_entries = []

    gz_files = []
    for name in dir_entries:
        m = re.fullmatch(r'radius\.log\.(\d+)\.gz', name)
        if m:
            gz_files.append((int(m.group(1)), os.path.join(log_dir, name)))

    # Higher rotation numbers are older, so a descending sort is oldest-first.
    sources = [(path, True) for _, path in sorted(gz_files, reverse=True)]
    sources.append((RADIUS_LOG_FILE + '.1', False))
    sources.append((RADIUS_LOG_FILE, False))

    chunks = []
    for path, compressed in sources:
        payload = _read_log_bytes(path, compressed)
        # An empty file yields b'' and still counts as a readable source.
        if payload is not None:
            chunks.append(payload)

    if not chunks:
        abort(404)

    data = b''.join(chunks)
    return send_file(
        io.BytesIO(data),
        as_attachment=True,
        download_name='radius.log',
        mimetype='text/plain',
    )


def _read_log_bytes(path, compressed=False):
    """Return the full contents of *path* as bytes, or ``None`` if unreadable.

    With ``compressed=True`` the file is read through ``gzip``. Besides
    ``OSError`` (missing file, permissions, bad gzip header) this also
    swallows ``EOFError`` and ``zlib.error``, which ``gzip`` raises for a
    truncated or corrupt archive - e.g. one caught in the middle of rotation.
    """
    # Function-scope import: zlib is not among this module's top-level imports.
    import zlib

    if not os.path.isfile(path):
        return None
    opener = gzip.open if compressed else open
    try:
        with opener(path, 'rb') as fh:
            return fh.read()
    except (OSError, EOFError, zlib.error):
        return None
|
|
|
|
|
|
@bp.route('/api/radius/log-tail', methods=['GET'])
@require_level('administrator')
def api_log_tail():
    """Return the last 50 RADIUS log lines plus size information as JSON.

    The response always has the keys ``log``, ``left`` and ``right``:
      * ``log``   - the tail text, ``'(log is empty)'`` when no lines exist,
                    or ``'(error reading log)'`` on any unexpected failure.
      * ``left``  - how many lines are shown, with a note when lines from
                    ``radius.log.1`` were used to fill up to 50.
      * ``right`` - combined size of all regular files in the log directory
                    and its percentage of the configured ``log_max_kb``
                    (default 1024, capped at 100%).
    """
    try:
        cfg = load_config()
        log_max_kb = cfg.get('radius', {}).get('general', {}).get('log_max_kb', 1024)

        current = _tail_lines(RADIUS_LOG_FILE, 50)

        # Top up from the rotated file only when the current log is short.
        need = 50 - len(current)
        prev = _tail_lines(RADIUS_LOG_FILE + '.1', need) if need else []

        tail = prev + current
        if not tail:
            return jsonify({'log': '(log is empty)', 'left': '', 'right': ''})

        log_dir = os.path.dirname(RADIUS_LOG_FILE)
        try:
            size_kb = sum(
                os.path.getsize(os.path.join(log_dir, entry))
                for entry in os.listdir(log_dir)
                if os.path.isfile(os.path.join(log_dir, entry))
            ) / 1024
        except OSError:
            size_kb = 0.0

        # Guard against a zero or missing limit to avoid dividing by zero.
        pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0
        note = ' (includes rotated log)' if prev else ''
        left = f'Showing {len(tail)} lines{note}'
        right = f'Total log size: {size_kb:.1f} KB ({pct}% of max)'
        return jsonify({'log': ''.join(tail).strip(), 'left': left, 'right': right})
    except Exception:
        # Endpoint boundary: the page polls this API, so any failure is
        # reported in-band instead of surfacing as an HTTP error.
        return jsonify({'log': '(error reading log)', 'left': '', 'right': ''})


def _tail_lines(path, count):
    """Return up to the last *count* lines of the text file at *path*.

    A missing file yields an empty list. Only *count* lines are held in
    memory at a time, and undecodable bytes are replaced rather than raising,
    so a single stray byte in the log cannot make the whole tail unavailable.
    """
    # Function-scope import: collections is not among this module's
    # top-level imports.
    from collections import deque

    try:
        with open(path, encoding='utf-8', errors='replace') as fh:
            return list(deque(fh, maxlen=count))
    except FileNotFoundError:
        return []
|