"""POST action handlers for the account-management page.

Every route in this module accepts POST only and is decorated with
``auth.require_level('manager')``.  Each handler reports its outcome through
``flash`` and then redirects back to ``/<page>``, where ``<page>`` is the name
of the directory containing this file.
"""

import os
import re
import secrets
import sqlite3
import time
import uuid
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path

from flask import Blueprint, request, session, redirect, flash

import auth
import config_utils
import sanitize

_PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)

# Form value for ``access_level`` -> integer stored in ``accounts.access_level``.
VALID_LEVELS = {'viewer': 1, 'administrator': 2, 'manager': 3}

# Loose "something@something.tld" shape check; hoisted so it is built once.
_EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')


def _back():
    """Return a redirect response to this page's index."""
    return redirect(f'/{_PAGE}')


def _fail(message):
    """Flash *message* in the 'error' category and redirect back to the page."""
    flash(message, 'error')
    return _back()


def _parse_row_index():
    """Return the ``row_index`` form field as an int, or None if not an integer."""
    try:
        return int(request.form.get('row_index', ''))
    except (ValueError, TypeError):
        return None


def _account_at(row_index):
    """Return the entry of ``config_utils.list_accounts()`` at *row_index*.

    Returns None when the index is negative or past the end of the list.
    """
    accounts = config_utils.list_accounts()
    if row_index < 0 or row_index >= len(accounts):
        return None
    return accounts[row_index]


@bp.route('/action/accountmanage/email_change_deny', methods=['POST'])
@auth.require_level('manager')
def email_change_deny():
    """Clear ``requested_email`` for the account named by form field ``account_id``."""
    account_id = request.form.get('account_id', '').strip()
    if not account_id:
        return _fail('Invalid request.')
    try:
        # closing() guarantees the connection is released even if execute or
        # commit raises; previously it was only closed on the success path.
        with closing(config_utils.open_accounts_db()) as con:
            con.execute(
                'UPDATE accounts SET requested_email=NULL WHERE account_id=?',
                (account_id,)
            )
            con.commit()
        flash('Email change request denied.', 'success')
    except Exception as exc:
        flash(f'Could not deny request: {exc}', 'error')
    return _back()


@bp.route('/action/accountmanage/email_change_approve', methods=['POST'])
@auth.require_level('manager')
def email_change_approve():
    """Approve the pending email change for form field ``account_id``.

    Steps, in order:

    1. Load the account row; bail out if it is missing or has no
       ``requested_email``.
    2. Generate a six-digit code and send it to the new address.  If sending
       fails nothing is written to the database.
    3. In one transaction: if the account has a session row, upsert a
       ``clients`` row keyed by the most recently seen ``session_id`` carrying
       the new email, the previous password hash, the code and its timestamp;
       then set ``accounts.email`` to the new address, null out
       ``hashed_password`` and ``requested_email``, and delete all of the
       account's sessions.
    """
    # Imported at call time rather than module load.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from pages.accountcreate.action import _send_verification_email, CODE_TTL_SECS

    account_id = request.form.get('account_id', '').strip()
    if not account_id:
        return _fail('Invalid request.')

    try:
        with closing(config_utils.open_accounts_db()) as con:
            row = con.execute(
                'SELECT * FROM accounts WHERE account_id=?', (account_id,)
            ).fetchone()
    except Exception:
        # A lookup failure is reported the same way as "nothing pending".
        row = None

    if not row or not row['requested_email']:
        return _fail('No pending email change found.')

    new_email = row['requested_email']
    old_password = row['hashed_password']
    code = f'{secrets.randbelow(1000000):06d}'
    code_sent_ts = int(time.time())

    try:
        _send_verification_email(new_email, code)
    except Exception as exc:
        return _fail(f'Could not send verification email: {exc}')

    try:
        with closing(config_utils.open_accounts_db()) as con:
            last_cookie = con.execute(
                'SELECT session_id FROM sessions WHERE account_id=? '
                'ORDER BY last_seen_ts DESC LIMIT 1',
                (account_id,)
            ).fetchone()
            cookie = last_cookie['session_id'] if last_cookie else None
            # NOTE(review): when the account has no session row, the code and
            # the previous password hash are not stored anywhere before
            # hashed_password is nulled below - confirm this is intended.
            if cookie:
                con.execute(
                    '''INSERT INTO clients
                           (cookie_unique_token, email, hashed_password,
                            verification_code, code_sent_ts)
                       VALUES (?,?,?,?,?)
                       ON CONFLICT(cookie_unique_token) DO UPDATE SET
                           email=excluded.email,
                           hashed_password=excluded.hashed_password,
                           verification_code=excluded.verification_code,
                           code_sent_ts=excluded.code_sent_ts''',
                    (cookie, new_email, old_password, code, code_sent_ts)
                )
            con.execute(
                'UPDATE accounts SET email=?, hashed_password=NULL, '
                'requested_email=NULL WHERE account_id=?',
                (new_email, account_id)
            )
            con.execute('DELETE FROM sessions WHERE account_id=?', (account_id,))
            # Single commit: if any statement above raises, closing the
            # connection discards the partial changes.
            con.commit()
    except Exception as exc:
        return _fail(f'Could not apply email change: {exc}')

    flash(f'Email changed to {new_email}. A verification email has been sent.', 'success')
    return _back()


@bp.route('/action/accountmanage/session_invalidate', methods=['POST'])
@auth.require_level('manager')
def session_invalidate():
    """Delete the ``sessions`` row whose id is given by form field ``session_id``."""
    sid = request.form.get('session_id', '').strip()
    if not sid:
        return _fail('Invalid request.')
    try:
        with closing(config_utils.open_accounts_db()) as con:
            con.execute('DELETE FROM sessions WHERE session_id=?', (sid,))
            con.commit()
        flash('Session invalidated.', 'success')
    except Exception:
        flash('Failed to invalidate session.', 'error')
    return _back()


@bp.route('/action/accountmanage/accounts_add', methods=['POST'])
@auth.require_level('manager')
def accounts_add():
    """Insert a new account from form fields ``email_address`` and ``access_level``.

    The email is passed through ``sanitize.email`` and must match a basic
    address pattern; ``access_level`` must be a key of ``VALID_LEVELS``; and no
    account may already exist for the email.  The new row gets a fresh UUID4
    ``account_id``, the current Unix time as ``created_ts`` and the session's
    ``email_address`` as ``created_by``.
    """
    email = sanitize.email(request.form.get('email_address', ''))
    access_level = request.form.get('access_level', '').strip()

    if not email:
        return _fail('Email address is required.')
    if not _EMAIL_RE.match(email):
        return _fail('Email address does not appear to be valid.')
    if access_level not in VALID_LEVELS:
        return _fail('Invalid access level.')
    if config_utils.get_account_by_email(email):
        return _fail('An account with that email address already exists.')

    try:
        with closing(config_utils.open_accounts_db()) as con:
            con.execute(
                'INSERT INTO accounts(account_id,email,access_level,created_ts,created_by) '
                'VALUES(?,?,?,?,?)',
                (
                    str(uuid.uuid4()),
                    email,
                    VALID_LEVELS[access_level],
                    int(time.time()),
                    session.get('email_address', ''),
                )
            )
            con.commit()
    except Exception as exc:
        return _fail(f'Could not add account: {exc}')

    flash(f'Authorization added for {email}.', 'success')
    return _back()


@bp.route('/action/accountmanage/accounts_edit', methods=['POST'])
@auth.require_level('manager')
def accounts_edit():
    """Change the access level of the account at form field ``row_index``.

    ``row_index`` indexes into ``config_utils.list_accounts()``.  The request
    is refused when the target's email matches the session's email
    (case-insensitively), so a user cannot change their own level.
    """
    row_index = _parse_row_index()
    if row_index is None:
        return _fail('Invalid request.')

    access_level = request.form.get('access_level', '').strip()
    if access_level not in VALID_LEVELS:
        return _fail('Invalid access level.')

    target = _account_at(row_index)
    if target is None:
        return _fail('Account not found.')

    if target.get('email_address', '').lower() == session.get('email_address', '').lower():
        return _fail('You cannot change your own access level.')

    try:
        with closing(config_utils.open_accounts_db()) as con:
            con.execute(
                'UPDATE accounts SET access_level=? WHERE account_id=?',
                (VALID_LEVELS[access_level], target['account_id'])
            )
            con.commit()
    except Exception as exc:
        return _fail(f'Could not update account: {exc}')

    flash('Account updated.', 'success')
    return _back()


@bp.route('/action/accountmanage/accounts_delete', methods=['POST'])
@auth.require_level('manager')
def accounts_delete():
    """Delete the account at form field ``row_index`` together with its sessions.

    ``row_index`` indexes into ``config_utils.list_accounts()``.  Deleting the
    account whose email matches the session's email is refused, unless that
    email also equals the ``INITIAL_MANAGER_EMAIL`` environment variable.
    """
    row_index = _parse_row_index()
    if row_index is None:
        return _fail('Invalid request.')

    target = _account_at(row_index)
    if target is None:
        return _fail('Account not found.')

    target_email = target.get('email_address', '').lower()
    current_email = session.get('email_address', '').lower()
    initial_email = os.environ.get('INITIAL_MANAGER_EMAIL', '').strip().lower()

    # Self-removal is blocked except for the configured initial manager.
    if target_email == current_email and target_email != initial_email:
        return _fail('You cannot remove your own account.')

    try:
        with closing(config_utils.open_accounts_db()) as con:
            # Sessions go first so none are left pointing at a removed account.
            con.execute('DELETE FROM sessions WHERE account_id=?', (target['account_id'],))
            con.execute('DELETE FROM accounts WHERE account_id=?', (target['account_id'],))
            con.commit()
    except Exception as exc:
        return _fail(f'Could not delete account: {exc}')

    flash(f'Account for {target["email_address"]} has been removed.', 'success')
    return _back()