linuxrouter/docker/routlin-dash/app/pages/clientcredentials/action.py
2026-06-07 01:43:04 -04:00

321 lines
10 KiB
Python

import sqlite3
import time
import bcrypt
from cryptography.fernet import Fernet
from flask import Blueprint, request, redirect, flash
import auth
import config_utils
import sanitize
import settings
# Page slug: names the blueprint and is the redirect target used by the routes below.
_PAGE = 'clientcredentials'
# License flag, evaluated once when this module is imported; the routes below
# refuse to act when it is falsy.
PRO_LICENSE = settings.is_pro()
bp = Blueprint(_PAGE, __name__)
# Enum values stored in the database
# user_type column: captive-portal login vs. 802.1X credential.
USER_TYPE_CAPTIVE = 0
USER_TYPE_SUPPLICANT = 1
# hash_type column: HASH_CLEARTEXT passwords go through encrypt_password() and
# stay recoverable; HASH_BCRYPT passwords are stored as bcrypt hashes.
HASH_CLEARTEXT = 0
HASH_BCRYPT = 2
# user_type values accepted from the add/edit form.
VALID_USER_TYPES = {USER_TYPE_CAPTIVE, USER_TYPE_SUPPLICANT}
# Storage scheme applied to a newly set password, keyed by user_type.
HASH_FOR_USER_TYPE = {
USER_TYPE_CAPTIVE: HASH_BCRYPT,
USER_TYPE_SUPPLICANT: HASH_CLEARTEXT,
}
# ===================================================================
# Encryption helpers (cleartext passwords only)
# ===================================================================
# The key is read once, when this module is imported. If no key is returned,
# _FERNET stays None and encrypt_password()/decrypt_password() pass values
# through unchanged.
_credentials_key = settings.get_credentials_key()
_FERNET = Fernet(_credentials_key) if _credentials_key else None
def encrypt_password(plaintext):
    """Return *plaintext* encrypted with the module Fernet key, as text.

    When no Fernet instance is configured (``_FERNET`` is ``None``) the
    value is returned unchanged.
    """
    if _FERNET is not None:
        token = _FERNET.encrypt(plaintext.encode())
        return token.decode()
    return plaintext
def decrypt_password(stored):
    """Return the plaintext for a value produced by ``encrypt_password``.

    The value is returned unchanged when no Fernet instance is configured,
    or when decryption fails for any reason (for example, the value was
    stored before a key was configured and is not a valid token).
    """
    if _FERNET is None:
        return stored
    try:
        token = stored.encode()
        plaintext = _FERNET.decrypt(token).decode()
    except Exception:
        # Deliberate best-effort: fall back to the stored value as-is.
        plaintext = stored
    return plaintext
# ===================================================================
# DB helpers
# ===================================================================
def _db_conn():
    """Open the credentials database and make sure its schema exists.

    Returns:
        An open ``sqlite3.Connection`` with ``sqlite3.Row`` as row factory
        and foreign-key enforcement switched on. The caller is responsible
        for closing it.

    Raises:
        sqlite3.Error: if connecting or preparing the schema fails. The
            connection is closed before the error propagates so a failed
            setup does not leak a handle.
    """
    conn = sqlite3.connect(config_utils.CREDENTIALS_DB)
    try:
        conn.row_factory = sqlite3.Row
        # Foreign keys are off by default in SQLite; the sessions table
        # relies on ON DELETE CASCADE.
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS credentials (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE COLLATE NOCASE,
                password TEXT NOT NULL,
                description TEXT NOT NULL DEFAULT '',
                user_type INTEGER NOT NULL,
                hash_type INTEGER NOT NULL,
                vlan TEXT NOT NULL DEFAULT '',
                enabled INTEGER NOT NULL DEFAULT 1,
                date_set INTEGER NOT NULL,
                valid_for INTEGER DEFAULT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT NOT NULL,
                credential_id INTEGER REFERENCES credentials(id) ON DELETE CASCADE,
                started_at INTEGER NOT NULL,
                expires_at INTEGER,
                mac TEXT NOT NULL DEFAULT ''
            )
        """)
        conn.commit()
    except Exception:
        # Do not leak the handle when setup fails (e.g. database is locked).
        conn.close()
        raise
    return conn
def _get_by_index(conn, row_index):
    """Return the credential at position *row_index* in id order, or None.

    Args:
        conn: open SQLite connection to the credentials database.
        row_index: zero-based position of the row when credentials are
            ordered by ``id``.

    Returns:
        The matching row, or ``None`` when *row_index* is negative or past
        the last row.

    NOTE(review): positions shift when other rows are added or deleted
    between page render and form submit; callers address rows by position,
    so this helper cannot guard against that on its own.
    """
    if row_index < 0:
        # SQLite treats a negative OFFSET as 0, so reject it explicitly.
        return None
    try:
        # Fetch only the requested row instead of loading the whole table
        # (password column included) into memory.
        cursor = conn.execute(
            "SELECT * FROM credentials ORDER BY id LIMIT 1 OFFSET ?",
            (row_index,),
        )
    except OverflowError:
        # Index does not fit in a 64-bit SQLite integer: no such row.
        return None
    return cursor.fetchone()
def _hash_password(plaintext, hash_type):
    """Convert *plaintext* into the value stored for the given *hash_type*.

    ``HASH_BCRYPT`` yields a freshly salted bcrypt hash as text;
    ``HASH_CLEARTEXT`` yields the result of ``encrypt_password``.

    Raises:
        ValueError: if *hash_type* is neither of the known values.
    """
    if hash_type == HASH_BCRYPT:
        secret = plaintext.encode()
        salt = bcrypt.gensalt()
        return bcrypt.hashpw(secret, salt).decode()
    if hash_type == HASH_CLEARTEXT:
        return encrypt_password(plaintext)
    raise ValueError(f"Unknown hash_type: {hash_type}")
def _parse_valid_for(value_str, unit_str):
    """Convert a form value/unit pair into a lifetime in seconds.

    Returns ``None`` (no expiry) when the value is missing, blank, not an
    integer, zero or negative, or when the unit is not ``'hours'`` or
    ``'days'``.
    """
    unit = (unit_str or '').strip()
    if not value_str or not value_str.strip():
        return None
    try:
        amount = int(value_str.strip())
    except (ValueError, TypeError):
        return None
    seconds_per_unit = {'hours': 3600, 'days': 86400}.get(unit)
    if amount <= 0 or seconds_per_unit is None:
        return None
    return amount * seconds_per_unit
def _valid_for_to_display(valid_for):
    """Split a lifetime in seconds into ``(value_str, unit_str)`` for the form.

    ``None`` maps to ``('0', 'hours')``. Whole multiples of a day are shown
    in days; everything else is shown in whole hours, rounded down but
    never below 1.
    """
    if valid_for is None:
        return '0', 'hours'
    days, leftover = divmod(valid_for, 86400)
    if leftover == 0:
        return str(days), 'days'
    # A non-zero multiple of 3600 always has a non-zero quotient, so the
    # "or 1" fallback only affects lifetimes shorter than one hour.
    hours = valid_for // 3600
    return str(hours or 1), 'hours'
def _row_index():
    """Read ``row_index`` from the submitted form as an int.

    Returns ``None`` when the field is absent or is not a valid integer.
    """
    try:
        raw = request.form.get('row_index', '')
        index = int(raw)
    except (ValueError, TypeError):
        index = None
    return index
# ===================================================================
# Routes
# ===================================================================
@bp.route('/action/clientcredentials/addedit', methods=['POST'])
@auth.require_level('administrator')
def addedit():
    """Create a credential, or update the one at the submitted ``row_index``.

    The request is an edit when the form carries a valid integer
    ``row_index`` and an add otherwise. Every outcome flashes a message and
    redirects back to the page.

    Form fields read: ``row_index``, ``username``, ``password``,
    ``description``, ``user_type``, ``vlan``, ``enabled``,
    ``valid_for_value`` and ``valid_for_unit``.

    Password handling:
      * add with a blank password: an 8-character password is generated
        and shown to the administrator once the row has been saved;
      * edit with a blank password: the stored password is kept, which is
        only allowed when the storage scheme required by the chosen user
        type matches the scheme the stored password already uses;
      * otherwise the password is stored via ``_hash_password`` and
        ``date_set`` is reset to the current time.
    """
    if not PRO_LICENSE:
        flash('Client Credentials requires a Routlin Pro license.', 'error')
        return redirect(f'/{_PAGE}')

    ri = _row_index()
    is_edit = ri is not None

    username = sanitize.name(request.form.get('username', ''))
    password = request.form.get('password', '').strip()
    description = sanitize.description(request.form.get('description', ''))

    try:
        user_type = int(request.form.get('user_type', ''))
    except (ValueError, TypeError):
        flash('Invalid user type.', 'error')
        return redirect(f'/{_PAGE}')
    if user_type not in VALID_USER_TYPES:
        flash('Invalid user type.', 'error')
        return redirect(f'/{_PAGE}')
    hash_type = HASH_FOR_USER_TYPE[user_type]

    vlan = sanitize.name(request.form.get('vlan', ''))
    if not vlan:
        flash('VLAN is required.', 'error')
        return redirect(f'/{_PAGE}')
    cfg = config_utils.load_config()
    vlans_by_name = {v['name']: v for v in cfg.get('vlans', [])}
    if vlan not in vlans_by_name:
        flash('Selected VLAN does not exist.', 'error')
        return redirect(f'/{_PAGE}')
    if user_type == USER_TYPE_CAPTIVE and vlans_by_name[vlan].get('restricted_vlan') != 'c':
        flash('Captive portal credentials must be assigned to a captive portal VLAN.', 'error')
        return redirect(f'/{_PAGE}')
    if user_type == USER_TYPE_SUPPLICANT and vlans_by_name[vlan].get('is_vpn'):
        flash('802.1X credentials cannot be assigned to a VPN VLAN.', 'error')
        return redirect(f'/{_PAGE}')

    enabled = 'enabled' in request.form
    valid_for = _parse_valid_for(
        request.form.get('valid_for_value', ''),
        request.form.get('valid_for_unit', 'never'),
    )

    if not username:
        flash('Username is required.', 'error')
        return redirect(f'/{_PAGE}')

    conn = _db_conn()
    try:
        if is_edit:
            existing = _get_by_index(conn, ri)
            if existing is None:
                flash('Credential not found.', 'error')
                return redirect(f'/{_PAGE}')

            if password:
                try:
                    stored_password = _hash_password(password, hash_type)
                except ValueError as exc:
                    flash(str(exc), 'error')
                    return redirect(f'/{_PAGE}')
                stored_hash_type = hash_type
                date_set = int(time.time())
            elif existing['hash_type'] != hash_type:
                # The stored password uses a different storage scheme than
                # the chosen user type requires, so keeping it would save a
                # user_type/hash_type pair that HASH_FOR_USER_TYPE never
                # produces.
                flash('A new password is required when changing the user type.', 'error')
                return redirect(f'/{_PAGE}')
            else:
                stored_password = existing['password']
                stored_hash_type = existing['hash_type']
                date_set = existing['date_set']

            try:
                conn.execute(
                    """UPDATE credentials
                       SET username=?, password=?, description=?, user_type=?, hash_type=?,
                           vlan=?, enabled=?, date_set=?, valid_for=?
                       WHERE id=?""",
                    (username, stored_password, description, user_type, stored_hash_type,
                     vlan, int(enabled), date_set, valid_for, existing['id']),
                )
                conn.commit()
            except sqlite3.IntegrityError:
                flash(f"Username '{username}' is already taken.", 'error')
                return redirect(f'/{_PAGE}')
            flash(f"Credential '{username}' updated.", 'success')
        else:
            generated = not password
            if generated:
                import secrets
                import string
                alphabet = string.ascii_lowercase + string.digits
                password = ''.join(secrets.choice(alphabet) for _ in range(8))

            try:
                hashed = _hash_password(password, hash_type)
            except ValueError as exc:
                flash(str(exc), 'error')
                return redirect(f'/{_PAGE}')

            try:
                conn.execute(
                    """INSERT INTO credentials
                       (username, password, description, user_type, hash_type, vlan, enabled, date_set, valid_for)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (username, hashed, description, user_type, hash_type,
                     vlan, int(enabled), int(time.time()), valid_for),
                )
                conn.commit()
            except sqlite3.IntegrityError:
                flash(f"Username '{username}' already exists.", 'error')
                return redirect(f'/{_PAGE}')

            # Reveal the generated password only after the row is saved, so
            # a failed insert never reports a password that does not exist.
            if generated:
                flash(f"Auto-generated password for '{username}': {password}", 'success')
            flash(f"Credential '{username}' added.", 'success')
    finally:
        # Single exit point for the connection, including unexpected errors.
        conn.close()
    return redirect(f'/{_PAGE}')
@bp.route('/action/clientcredentials/delete', methods=['POST'])
@auth.require_level('administrator')
def delete():
    """Delete the credential at the submitted ``row_index``.

    Flashes the outcome and redirects back to the page. Rows in
    ``sessions`` that reference the credential are declared with
    ``ON DELETE CASCADE`` and ``_db_conn`` enables foreign keys, so they
    are removed together with it.
    """
    if not PRO_LICENSE:
        flash('Client Credentials requires a Routlin Pro license.', 'error')
        return redirect(f'/{_PAGE}')

    ri = _row_index()
    if ri is None:
        flash('Invalid request.', 'error')
        return redirect(f'/{_PAGE}')

    conn = _db_conn()
    try:
        row = _get_by_index(conn, ri)
        if row is None:
            flash('Credential not found.', 'error')
            return redirect(f'/{_PAGE}')
        conn.execute("DELETE FROM credentials WHERE id=?", (row['id'],))
        conn.commit()
        username = row['username']
    finally:
        # Close the connection even if the DELETE or commit raises.
        conn.close()

    flash(f"Credential '{username}' deleted.", 'success')
    return redirect(f'/{_PAGE}')
@bp.route('/action/clientcredentials/toggle', methods=['POST'])
@auth.require_level('administrator')
def toggle():
    """Flip the ``enabled`` flag of the credential at the submitted ``row_index``.

    Flashes the resulting state and redirects back to the page.
    """
    if not PRO_LICENSE:
        flash('Client Credentials requires a Routlin Pro license.', 'error')
        return redirect(f'/{_PAGE}')

    ri = _row_index()
    if ri is None:
        flash('Invalid request.', 'error')
        return redirect(f'/{_PAGE}')

    conn = _db_conn()
    try:
        row = _get_by_index(conn, ri)
        if row is None:
            flash('Credential not found.', 'error')
            return redirect(f'/{_PAGE}')
        new_enabled = 0 if row['enabled'] else 1
        conn.execute(
            "UPDATE credentials SET enabled=? WHERE id=?",
            (new_enabled, row['id']),
        )
        conn.commit()
        username = row['username']
    finally:
        # Close the connection even if the UPDATE or commit raises.
        conn.close()

    state = 'enabled' if new_enabled else 'disabled'
    flash(f"Credential '{username}' {state}.", 'success')
    return redirect(f'/{_PAGE}')