linuxrouter/docker/routlin-dash/app/pages/accountmanage/action.py
2026-06-13 00:03:11 -04:00

236 lines
8.2 KiB
Python

from pathlib import Path
from flask import Blueprint, request, session, redirect, flash
import os, re, secrets, sqlite3, time
import settings
from datetime import datetime, timezone
import auth
import config_utils
import sanitize
_PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
VALID_LEVELS = {'viewer': 1, 'administrator': 2, 'manager': 3}
@bp.route('/action/accountmanage/email_change_deny', methods=['POST'])
@auth.require_level('manager')
def email_change_deny():
account_id = request.form.get('account_id', '').strip()
if not account_id:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
con.execute('UPDATE accounts SET requested_email=NULL WHERE account_id=?', (account_id,))
con.commit()
con.close()
flash('Email change request denied.', 'success')
except Exception as exc:
flash(f'Could not deny request: {exc}', 'error')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/email_change_approve', methods=['POST'])
@auth.require_level('manager')
def email_change_approve():
from pages.accountcreate.action import _send_verification_email, CODE_TTL_SECS
account_id = request.form.get('account_id', '').strip()
if not account_id:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
row = con.execute(
'SELECT * FROM accounts WHERE account_id=?', (account_id,)
).fetchone()
con.close()
except Exception:
row = None
if not row or not row['requested_email']:
flash('No pending email change found.', 'error')
return redirect(f'/{_PAGE}')
new_email = row['requested_email']
old_password = row['hashed_password']
code = f'{secrets.randbelow(1000000):06d}'
code_sent_ts = int(time.time())
try:
_send_verification_email(new_email, code)
except Exception as exc:
flash(f'Could not send verification email: {exc}', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
last_cookie = con.execute(
'SELECT session_id FROM sessions WHERE account_id=? ORDER BY last_seen_ts DESC LIMIT 1',
(account_id,)
).fetchone()
cookie = last_cookie['session_id'] if last_cookie else None
if cookie:
con.execute(
'''INSERT INTO clients (cookie_unique_token, email, hashed_password, verification_code, code_sent_ts)
VALUES (?,?,?,?,?)
ON CONFLICT(cookie_unique_token) DO UPDATE SET
email=excluded.email,
hashed_password=excluded.hashed_password,
verification_code=excluded.verification_code,
code_sent_ts=excluded.code_sent_ts''',
(cookie, new_email, old_password, code, code_sent_ts)
)
con.execute(
'UPDATE accounts SET email=?, hashed_password=NULL, requested_email=NULL WHERE account_id=?',
(new_email, account_id)
)
con.execute('DELETE FROM sessions WHERE account_id=?', (account_id,))
con.commit()
con.close()
except Exception as exc:
flash(f'Could not apply email change: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash(f'Email changed to {new_email}. A verification email has been sent.', 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/session_invalidate', methods=['POST'])
@auth.require_level('manager')
def session_invalidate():
sid = request.form.get('session_id', '').strip()
if not sid:
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
con.execute('DELETE FROM sessions WHERE session_id=?', (sid,))
con.commit()
con.close()
flash('Session invalidated.', 'success')
except Exception:
flash('Failed to invalidate session.', 'error')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/accounts_add', methods=['POST'])
@auth.require_level('manager')
def accounts_add():
import uuid as _uuid, time as _t
email = sanitize.email(request.form.get('email_address', ''))
access_level = request.form.get('access_level', '').strip()
if not email:
flash('Email address is required.', 'error')
return redirect(f'/{_PAGE}')
if not re.match(r'^[^@\s]+@[^@\s]+\.[^@\s]+$', email):
flash('Email address does not appear to be valid.', 'error')
return redirect(f'/{_PAGE}')
if access_level not in VALID_LEVELS:
flash('Invalid access level.', 'error')
return redirect(f'/{_PAGE}')
if config_utils.get_account_by_email(email):
flash('An account with that email address already exists.', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
con.execute(
'INSERT INTO accounts(account_id,email,access_level,created_ts,created_by) VALUES(?,?,?,?,?)',
(str(_uuid.uuid4()), email, VALID_LEVELS[access_level], int(_t.time()),
session.get('email_address', ''))
)
con.commit()
con.close()
except Exception as exc:
flash(f'Could not add account: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash(f'Authorization added for {email}.', 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/accounts_edit', methods=['POST'])
@auth.require_level('manager')
def accounts_edit():
try:
row_index = int(request.form.get('row_index', ''))
except (ValueError, TypeError):
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
access_level = request.form.get('access_level', '').strip()
if access_level not in VALID_LEVELS:
flash('Invalid access level.', 'error')
return redirect(f'/{_PAGE}')
accounts = config_utils.list_accounts()
if row_index < 0 or row_index >= len(accounts):
flash('Account not found.', 'error')
return redirect(f'/{_PAGE}')
target = accounts[row_index]
if target.get('email_address', '').lower() == session.get('email_address', '').lower():
flash('You cannot change your own access level.', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
con.execute(
'UPDATE accounts SET access_level=? WHERE account_id=?',
(VALID_LEVELS[access_level], target['account_id'])
)
con.commit()
con.close()
except Exception as exc:
flash(f'Could not update account: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash('Account updated.', 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/accountmanage/accounts_delete', methods=['POST'])
@auth.require_level('manager')
def accounts_delete():
try:
row_index = int(request.form.get('row_index', ''))
except (ValueError, TypeError):
flash('Invalid request.', 'error')
return redirect(f'/{_PAGE}')
accounts = config_utils.list_accounts()
if row_index < 0 or row_index >= len(accounts):
flash('Account not found.', 'error')
return redirect(f'/{_PAGE}')
target = accounts[row_index]
target_email = target.get('email_address', '').lower()
current_email = session.get('email_address', '').lower()
initial_email = settings.get_initial_manager_email()
if target_email == current_email and target_email != initial_email:
flash('You cannot remove your own account.', 'error')
return redirect(f'/{_PAGE}')
try:
con = config_utils.open_accounts_db()
con.execute('DELETE FROM sessions WHERE account_id=?', (target['account_id'],))
con.execute('DELETE FROM accounts WHERE account_id=?', (target['account_id'],))
con.commit()
con.close()
except Exception as exc:
flash(f'Could not delete account: {exc}', 'error')
return redirect(f'/{_PAGE}')
flash(f'Account for {target["email_address"]} has been removed.', 'success')
return redirect(f'/{_PAGE}')