linuxrouter/docker/routlin-portal/app/page.py
2026-06-08 01:19:29 -04:00

159 lines
5.6 KiB
Python

import ipaddress
import sqlite3
import time
import bcrypt
from flask import Blueprint, request, redirect
import config_utils
CREDENTIALS_DB = f'{config_utils.CONFIGS_DIR}/.client-credentials'
USER_TYPE_CAPTIVE = 0
DIGEST_HASH_BCRYPT = 2
bp = Blueprint('portal', __name__)
PORTAL_HTML = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{title}</title>
<style>
body {{ font-family: sans-serif; max-width: 480px; margin: 60px auto; padding: 0 1rem; }}
h1 {{ font-size: 1.5rem; margin-bottom: .5rem; }}
.err {{ color: #c00; margin: .75rem 0; }}
.terms label {{ display: block; margin: .4rem 0; cursor: pointer; }}
.creds {{ margin: 1rem 0; }}
.creds label {{ display: block; margin-bottom: .2rem; font-size: .9rem; }}
.creds input {{ width: 100%; box-sizing: border-box; padding: .4rem .5rem; margin-bottom: .75rem; font-size: 1rem; }}
button {{ margin-top: 1rem; padding: .55rem 1.4rem; }}
</style>
</head>
<body>
<h1>{title}</h1>
{splash_html}
{error_html}
<form method="post">
<input type="hidden" name="next" value="{next_url}">
{credentials_html}
<div class="terms">{terms_html}</div>
<button type="submit">Continue</button>
</form>
</body>
</html>"""
def _vlan_for_ip(client_ip):
cfg = config_utils.load_config()
try:
addr = ipaddress.ip_address(client_ip)
except ValueError:
return None
for vlan in cfg.get('vlans', []):
if vlan.get('restricted_vlan') != 'c':
continue
try:
net = ipaddress.ip_network(f"{vlan['ip']}/{vlan['subnet']}", strict=False)
if addr in net:
return vlan
except (KeyError, ValueError):
continue
return None
def _cp(vlan):
return vlan.get('captive_portal', {})
def _verify_credential(username, password, vlan_name):
try:
conn = sqlite3.connect(CREDENTIALS_DB)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT * FROM credentials WHERE username=? COLLATE NOCASE"
" AND user_type=? AND vlan=? AND enabled=1",
(username, USER_TYPE_CAPTIVE, vlan_name),
).fetchone()
conn.close()
except Exception:
return False
if row is None:
return False
now = int(time.time())
if row['session_seconds'] > 0 and (row['date_set'] + row['session_seconds']) < now:
return False
if row['expires_seconds'] > 0 and (row['date_set'] + row['expires_seconds']) < now:
return False
if row['digest_type'] == DIGEST_HASH_BCRYPT:
try:
return bcrypt.checkpw(password.encode(), row['password'].encode())
except Exception:
return False
return False
def _render(vlan, error=None, next_url=''):
cp = _cp(vlan)
terms = cp.get('portal_terms', vlan.get('portal_terms', []))
terms_html = ''.join(
f'<label><input type="checkbox" name="term_{i}" required> {t}</label>'
for i, t in enumerate(terms)
) or '<p>No terms required.</p>'
require_upw = cp.get('require_username_password', vlan.get('require_username_password', False))
if require_upw:
credentials_html = (
'<div class="creds">'
'<label for="portal_username">Username</label>'
'<input type="text" id="portal_username" name="portal_username" autocomplete="username" required>'
'<label for="portal_password">Password</label>'
'<input type="password" id="portal_password" name="portal_password" autocomplete="current-password" required>'
'</div>'
)
else:
credentials_html = ''
title = cp.get('portal_splash_title', vlan.get('portal_splash_title', 'Guest Portal'))
splash_text = cp.get('portal_splash_text', vlan.get('portal_splash_text', ''))
return PORTAL_HTML.format(
title=title,
splash_html=f'<p>{splash_text}</p>' if splash_text else '',
error_html=f'<p class="err">{error}</p>' if error else '',
credentials_html=credentials_html,
terms_html=terms_html,
next_url=next_url,
)
@bp.route('/', defaults={'path': ''}, methods=['GET', 'POST'])
@bp.route('/<path:path>', methods=['GET', 'POST'])
def portal(path):
vlan = _vlan_for_ip(request.remote_addr)
if vlan is None:
return 'Portal unavailable.', 404
if request.method == 'POST':
cp = _cp(vlan)
terms = cp.get('portal_terms', vlan.get('portal_terms', []))
next_url = request.form.get('next', '')
require_upw = cp.get('require_username_password', vlan.get('require_username_password', False))
if require_upw:
username = request.form.get('portal_username', '').strip()
password = request.form.get('portal_password', '')
if not username:
return _render(vlan, error='Username is required.', next_url=next_url), 200
if not _verify_credential(username, password, vlan['name']):
return _render(vlan, error='Invalid username or password.', next_url=next_url), 200
for i in range(len(terms)):
if not request.form.get(f'term_{i}'):
return _render(vlan, error='You must accept all terms to continue.',
next_url=next_url), 200
try:
with open(config_utils.CAPTIVE_QUEUE, 'a') as f:
f.write(f'allow {request.remote_addr}\n')
except OSError:
pass
return redirect(request.form.get('next') or 'http://routlin.local/', 302)
return _render(vlan, next_url=request.args.get('next', '')), 200