linuxrouter/docker/routlin-dash/app/pages/clientcredentials/action.py
2026-06-08 01:38:18 -04:00

328 lines
11 KiB
Python

import sqlite3
import time
import bcrypt
from cryptography.fernet import Fernet
from flask import Blueprint, request, redirect, flash
import auth
import config_utils
import sanitize
import settings
_PAGE = 'clientcredentials'
PRO_LICENSE = settings.is_pro()
bp = Blueprint(_PAGE, __name__)
# Enum values stored in the database
USER_TYPE_CAPTIVE = 0
USER_TYPE_SUPPLICANT = 1
DIGEST_CYPHERTEXT_FERNET = 0
DIGEST_HASH_BCRYPT = 2
VALID_USER_TYPES = {USER_TYPE_CAPTIVE, USER_TYPE_SUPPLICANT}
HASH_FOR_USER_TYPE = {
USER_TYPE_CAPTIVE: DIGEST_HASH_BCRYPT,
USER_TYPE_SUPPLICANT: DIGEST_CYPHERTEXT_FERNET,
}
# ===================================================================
# Encryption helpers (cleartext passwords only)
# ===================================================================
_credentials_key = settings.get_credentials_key()
_FERNET = Fernet(_credentials_key) if _credentials_key else None
def encrypt_password(plaintext):
if _FERNET is None:
return plaintext
return _FERNET.encrypt(plaintext.encode()).decode()
def decrypt_password(stored):
if _FERNET is None:
return stored
try:
return _FERNET.decrypt(stored.encode()).decode()
except Exception:
return stored
# ===================================================================
# DB helpers
# ===================================================================
def _db_conn():
conn = sqlite3.connect(config_utils.CREDENTIALS_DB)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("""
CREATE TABLE IF NOT EXISTS credentials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE COLLATE NOCASE,
password TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
user_type INTEGER NOT NULL,
digest_type INTEGER NOT NULL,
vlan TEXT NOT NULL DEFAULT '',
enabled INTEGER NOT NULL DEFAULT 1,
date_set INTEGER NOT NULL,
session_seconds INTEGER NOT NULL DEFAULT 0,
expires_seconds INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL,
credential_id INTEGER REFERENCES credentials(id) ON DELETE CASCADE,
started_at INTEGER NOT NULL,
expires_at INTEGER,
mac TEXT NOT NULL DEFAULT ''
)
""")
conn.commit()
return conn
def _get_by_index(conn, row_index):
rows = conn.execute("SELECT * FROM credentials ORDER BY id").fetchall()
if row_index < 0 or row_index >= len(rows):
return None
return rows[row_index]
def _hash_password(plaintext, digest_type):
if digest_type == DIGEST_CYPHERTEXT_FERNET:
return encrypt_password(plaintext)
if digest_type == DIGEST_HASH_BCRYPT:
return bcrypt.hashpw(plaintext.encode(), bcrypt.gensalt()).decode()
raise ValueError(f"Unknown digest_type: {digest_type}")
def _parse_session_seconds(value_str, unit_str):
"""Return session_seconds (int). 0 means no limit."""
unit_str = (unit_str or '').strip()
if not value_str or not value_str.strip():
return 0
try:
n = int(value_str.strip())
if n <= 0:
return 0
except (ValueError, TypeError):
return 0
multipliers = {'hours': 3600, 'days': 86400}
mult = multipliers.get(unit_str)
if mult is None:
return 0
return n * mult
def _session_seconds_to_display(session_seconds):
"""Return (value_str, unit_str) for form pre-population."""
if not session_seconds:
return '0', 'hours'
if session_seconds % 86400 == 0:
return str(session_seconds // 86400), 'days'
if session_seconds % 3600 == 0:
return str(session_seconds // 3600), 'hours'
return str(session_seconds // 3600 or 1), 'hours'
def _row_index():
try:
return int(request.form.get('row_index', ''))
except (ValueError, TypeError):
return None
# ===================================================================
# Routes
# ===================================================================
@bp.route('/action/clientcredentials/addedit', methods=['POST'])
@auth.require_level('administrator')
def addedit():
if not PRO_LICENSE:
flash('Client Credentials requires a Routlin Pro license.', 'error')
return redirect(f'/{_PAGE}')
ri = _row_index()
is_edit = ri is not None
username = sanitize.name(request.form.get('username', ''))
password = request.form.get('password', '').strip()
description = sanitize.description(request.form.get('description', ''))
try:
user_type = int(request.form.get('user_type', ''))
except (ValueError, TypeError):
flash('Invalid user type.', 'error')
return redirect(f'/{_PAGE}')
if user_type not in VALID_USER_TYPES:
flash('Invalid user type.', 'error')
return redirect(f'/{_PAGE}')
digest_type = HASH_FOR_USER_TYPE[user_type]
vlan = sanitize.name(request.form.get('vlan', ''))
if not vlan:
flash('VLAN is required.', 'error')
return redirect(f'/{_PAGE}')
cfg = config_utils.load_config()
vlans_by_name = {v['name']: v for v in cfg.get('vlans', [])}
if vlan not in vlans_by_name:
flash('Selected VLAN does not exist.', 'error')
return redirect(f'/{_PAGE}')
if user_type == USER_TYPE_CAPTIVE and vlans_by_name[vlan].get('restricted_vlan') != 'c':
flash('Captive portal credentials must be assigned to a captive portal VLAN.', 'error')
return redirect(f'/{_PAGE}')
if user_type == USER_TYPE_SUPPLICANT and vlans_by_name[vlan].get('is_vpn'):
flash('802.1X credentials cannot be assigned to a VPN VLAN.', 'error')
return redirect(f'/{_PAGE}')
enabled = 'enabled' in request.form
session_seconds = _parse_session_seconds(
request.form.get('session_duration_value', ''),
request.form.get('session_duration_unit', 'hours'),
)
expires_seconds = _parse_session_seconds(
request.form.get('expiration_duration_value', ''),
request.form.get('expiration_duration_unit', 'hours'),
)
if not username:
flash('Username is required.', 'error')
return redirect(f'/{_PAGE}')
conn = _db_conn()
if is_edit:
existing = _get_by_index(conn, ri)
if not existing:
conn.close()
flash('Credential not found.', 'error')
return redirect(f'/{_PAGE}')
if password:
try:
hashed = _hash_password(password, digest_type)
except ValueError as exc:
conn.close()
flash(str(exc), 'error')
return redirect(f'/{_PAGE}')
stored_password = hashed
stored_digest_type = digest_type
date_set = int(time.time())
else:
stored_password = existing['password']
stored_digest_type = existing['digest_type']
date_set = existing['date_set']
try:
conn.execute(
"""UPDATE credentials
SET username=?, password=?, description=?, user_type=?, digest_type=?,
vlan=?, enabled=?, date_set=?, session_seconds=?, expires_seconds=?
WHERE id=?""",
(username, stored_password, description, user_type, stored_digest_type,
vlan, int(enabled), date_set, session_seconds, expires_seconds, existing['id']),
)
conn.commit()
except sqlite3.IntegrityError:
conn.close()
flash(f"Username '{username}' is already taken.", 'error')
return redirect(f'/{_PAGE}')
conn.close()
if password:
flash(f'User account "{username}" updated with password "{password}".', 'success')
else:
flash(f'User account "{username}" updated.', 'success')
else:
if not password:
import secrets, string
password = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(8))
try:
hashed = _hash_password(password, digest_type)
except ValueError as exc:
conn.close()
flash(str(exc), 'error')
return redirect(f'/{_PAGE}')
try:
conn.execute(
"""INSERT INTO credentials
(username, password, description, user_type, digest_type, vlan, enabled, date_set, session_seconds, expires_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(username, hashed, description, user_type, digest_type,
vlan, int(enabled), int(time.time()), session_seconds, expires_seconds),
)
conn.commit()
except sqlite3.IntegrityError:
conn.close()
flash(f"Username '{username}' already exists.", 'error')
return redirect(f'/{_PAGE}')
conn.close()
flash(f'User account "{username}" created with password "{password}".', 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/clientcredentials/delete', methods=['POST'])
@auth.require_level('administrator')
def delete():
if not PRO_LICENSE:
flash('Client Credentials requires a Routlin Pro license.', 'error')
return redirect(f'/{_PAGE}')
ri = _row_index()
if ri is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
conn = _db_conn()
row = _get_by_index(conn, ri)
if not row:
conn.close()
flash('Credential not found.', 'error')
return redirect(f'/{_PAGE}')
conn.execute("DELETE FROM credentials WHERE id=?", (row['id'],))
conn.commit()
conn.close()
flash(f"Credential '{row['username']}' deleted.", 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/clientcredentials/toggle', methods=['POST'])
@auth.require_level('administrator')
def toggle():
if not PRO_LICENSE:
flash('Client Credentials requires a Routlin Pro license.', 'error')
return redirect(f'/{_PAGE}')
ri = _row_index()
if ri is None:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
conn = _db_conn()
row = _get_by_index(conn, ri)
if not row:
conn.close()
flash('Credential not found.', 'error')
return redirect(f'/{_PAGE}')
new_enabled = 0 if row['enabled'] else 1
conn.execute("UPDATE credentials SET enabled=? WHERE id=?", (new_enabled, row['id']))
conn.commit()
conn.close()
state = 'enabled' if new_enabled else 'disabled'
flash(f"Credential '{row['username']}' {state}.", 'success')
return redirect(f'/{_PAGE}')