linuxrouter/docker/routlin-dash/app/pages/preferences/action.py
2026-06-13 09:25:57 -04:00

176 lines
6.3 KiB
Python

from pathlib import Path
from flask import Blueprint, request, session, redirect, flash
import bcrypt
import auth
import config_utils
import sanitize
import settings
# Name of the directory that contains this file. It is used below as the
# blueprint name and as the path segment every handler redirects back to.
_PAGE = Path(__file__).parent.name
# Flask blueprint on which the route handlers in this module are registered.
bp = Blueprint(_PAGE, __name__)
@bp.route('/action/preferences/accountdetails_save', methods=['POST'])
@auth.require_level('viewer')
def accountdetails_save():
    """Store the submitted timezone preference in the current session.

    The ``timezone`` form field is stripped and passed through
    ``sanitize.timezone``. A falsy result is reported as a missing timezone;
    otherwise the sanitized value is written to ``session['timezone']``.

    Returns:
        A redirect back to ``/{_PAGE}`` with the outcome flashed.
    """
    submitted = request.form.get('timezone', '').strip()
    timezone = sanitize.timezone(submitted)

    if timezone:
        session['timezone'] = timezone
        flash('Preferences saved.', 'success')
    else:
        flash('Timezone is required.', 'error')

    return redirect(f'/{_PAGE}')
@bp.route('/action/preferences/email_change_direct', methods=['POST'])
@auth.require_level('viewer')
def email_change_direct():
    """Immediately change the signed-in initial manager's email address.

    Only a session whose lower-cased ``email_address`` equals
    ``settings.get_initial_manager_email()`` may use this action. Outside
    single-user mode the caller's row in the ``accounts`` table is updated
    (and its ``requested_email`` cleared). In every mode the
    ``initial_manager_email`` key of the app config file is rewritten and
    ``session['email_address']`` is refreshed.

    Returns:
        A redirect back to ``/{_PAGE}`` with the outcome flashed.
    """
    import json as _json

    page_url = f'/{_PAGE}'

    current_email = session.get('email_address', '').lower()
    if current_email != settings.get_initial_manager_email():
        flash('Not authorised.', 'error')
        return redirect(page_url)

    new_email = sanitize.email(request.form.get('new_email', '').strip())
    if not new_email:
        flash('A valid email address is required.', 'error')
        return redirect(page_url)

    if new_email == current_email:
        flash('That is already your current email address.', 'error')
        return redirect(page_url)

    multi_user = not settings.is_single_user()
    if multi_user and config_utils.get_account_by_email(new_email):
        flash('That email address is already in use.', 'error')
        return redirect(page_url)

    if multi_user:
        try:
            con = config_utils.open_accounts_db()
            try:
                con.execute(
                    'UPDATE accounts SET email=?, requested_email=NULL WHERE account_id=?',
                    (new_email, session.get('account_id', ''))
                )
                con.commit()
            finally:
                # Release the connection even when execute/commit raises;
                # closing only on the success path leaked it on errors.
                con.close()
        except Exception as exc:
            flash(f'Could not update account: {exc}', 'error')
            return redirect(page_url)

    # NOTE(review): the accounts row is committed before the config file is
    # written, so a failure below leaves the two stores out of sync.
    try:
        cfg_path = Path(settings._APP_CONFIG_PATH)
        cfg = _json.loads(cfg_path.read_text()) if cfg_path.exists() else {}
        cfg['initial_manager_email'] = new_email
        cfg_path.write_text(_json.dumps(cfg, indent=2) + '\n')
        # Reset the cached config; presumably this makes `settings` re-read
        # the file on next access -- confirm against the settings module.
        settings._app_config_cache = None
    except Exception as exc:
        flash(f'Could not update app_config.json: {exc}', 'error')
        return redirect(page_url)

    session['email_address'] = new_email
    flash('Email address updated.', 'success')
    return redirect(page_url)
@bp.route('/action/preferences/email_change_request', methods=['POST'])
@auth.require_level('viewer')
def email_change_request():
    """Record a pending email-change request for the signed-in account.

    Rejected in single-user mode. Otherwise the sanitized ``new_email`` form
    value is written to the ``requested_email`` column of the caller's row
    in the ``accounts`` table; the account's actual ``email`` is not touched
    here.

    Returns:
        A redirect back to ``/{_PAGE}`` with the outcome flashed.
    """
    page_url = f'/{_PAGE}'

    if settings.is_single_user():
        flash('Not available in single-user mode.', 'error')
        return redirect(page_url)

    new_email = sanitize.email(request.form.get('new_email', '').strip())
    if not new_email:
        flash('A valid email address is required.', 'error')
        return redirect(page_url)

    current_email = session.get('email_address', '').lower()
    if new_email == current_email:
        flash('That is already your current email address.', 'error')
        return redirect(page_url)

    if config_utils.get_account_by_email(new_email):
        flash('That email address is already in use.', 'error')
        return redirect(page_url)

    try:
        con = config_utils.open_accounts_db()
        try:
            con.execute(
                'UPDATE accounts SET requested_email=? WHERE account_id=?',
                (new_email, session.get('account_id', ''))
            )
            con.commit()
        finally:
            # Release the connection even when execute/commit raises;
            # closing only on the success path leaked it on errors.
            con.close()
    except Exception as exc:
        flash(f'Could not submit request: {exc}', 'error')
        return redirect(page_url)

    flash('Email change request submitted. A manager will review it.', 'success')
    return redirect(page_url)
@bp.route('/action/preferences/changepassword_save', methods=['POST'])
@auth.require_level('viewer')
def changepassword_save():
    """Change the signed-in user's password after verifying the current one.

    Validates that all three form fields are present, that the new password
    and its confirmation match, and that the new password has at least 8
    characters. In single-user mode the current password is checked against
    ``settings.get_initial_manager_password_hash()`` and the new bcrypt hash
    is written to the ``initial_manager_password`` key of the app config
    file. Otherwise it is checked against the account row looked up by
    ``session['account_id']`` and the new hash is stored in that row's
    ``hashed_password`` column.

    Returns:
        A redirect back to ``/{_PAGE}`` with the outcome flashed, or to
        ``/accountlogin`` when the session's account no longer exists.
    """
    import json as _json

    page_url = f'/{_PAGE}'

    current_password = request.form.get('current_password', '')
    new_password = request.form.get('new_password', '')
    confirm_password = request.form.get('confirm_password', '')

    if not current_password or not new_password or not confirm_password:
        flash('All fields are required.', 'error')
        return redirect(page_url)
    if new_password != confirm_password:
        flash('New passwords do not match.', 'error')
        return redirect(page_url)
    if len(new_password) < 8:
        flash('New password must be at least 8 characters.', 'error')
        return redirect(page_url)

    current_bytes = current_password.encode('utf-8')
    new_bytes = new_password.encode('utf-8')

    if settings.is_single_user():
        stored_hash = settings.get_initial_manager_password_hash()
        if not stored_hash or not bcrypt.checkpw(current_bytes, stored_hash.encode('utf-8')):
            flash('Current password is incorrect.', 'error')
            return redirect(page_url)
        try:
            # Hash only after the current password has been verified, and
            # inside the try so a hashing failure is reported to the user
            # instead of escaping as an unhandled error.
            hashed = bcrypt.hashpw(new_bytes, bcrypt.gensalt()).decode('utf-8')
            cfg_path = Path(settings._APP_CONFIG_PATH)
            cfg = _json.loads(cfg_path.read_text()) if cfg_path.exists() else {}
            cfg['initial_manager_password'] = hashed
            cfg_path.write_text(_json.dumps(cfg, indent=2) + '\n')
            # Reset the cached config; presumably this makes `settings`
            # re-read the file on next access -- confirm against settings.
            settings._app_config_cache = None
        except Exception as exc:
            flash(f'Could not update password: {exc}', 'error')
            return redirect(page_url)
        flash('Password changed successfully.', 'success')
        return redirect(page_url)

    account = config_utils.get_account_by_id(session.get('account_id', ''))
    if account is None:
        flash('Account not found. Please log in again.', 'error')
        return redirect('/accountlogin')

    if not bcrypt.checkpw(current_bytes, account['hashed_password'].encode('utf-8')):
        flash('Current password is incorrect.', 'error')
        return redirect(page_url)

    try:
        # Same ordering as the single-user branch: verify first, then hash.
        hashed = bcrypt.hashpw(new_bytes, bcrypt.gensalt()).decode('utf-8')
        con = config_utils.open_accounts_db()
        try:
            con.execute(
                'UPDATE accounts SET hashed_password=? WHERE account_id=?',
                (hashed, account['account_id'])
            )
            con.commit()
        finally:
            # Release the connection even when execute/commit raises;
            # closing only on the success path leaked it on errors.
            con.close()
    except Exception as exc:
        flash(f'Could not update password: {exc}', 'error')
        return redirect(page_url)

    flash('Password changed successfully.', 'success')
    return redirect(page_url)