"""Captive-portal blueprint.

Serves a splash/login page to clients whose address falls inside a VLAN
marked as captive (``restricted_vlan == 'c'``) and, once the configured
credentials and terms are satisfied, appends an ``allow <ip>`` line to the
captive queue file before redirecting the client onward.
"""
import html
import ipaddress
import logging
import sqlite3
import time
from contextlib import closing

import bcrypt
from flask import Blueprint, request, redirect

import config_utils

CREDENTIALS_DB = f'{config_utils.CONFIGS_DIR}/.client-credentials'
USER_TYPE_CAPTIVE = 0
HASH_BCRYPT = 2

logger = logging.getLogger(__name__)

bp = Blueprint('portal', __name__)

# NOTE(review): only the text content of this template (and of the HTML
# fragments built in _render) was visible when this file was reviewed; the
# tags themselves were not. The markup here is a minimal reconstruction that
# uses the form field names the POST handler reads ('next',
# 'portal_username', 'portal_password', 'term_<i>'). Confirm against the
# deployed template before merging.
PORTAL_HTML = """\
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{title}</title>
</head>
<body>
<h1>{title}</h1>
{splash_html} {error_html}
<form method="post">
<input type="hidden" name="next" value="{next_url}">
{credentials_html}
{terms_html}
<button type="submit">Continue</button>
</form>
</body>
</html>
"""


def _vlan_for_ip(client_ip):
    """Return the captive VLAN config dict containing *client_ip*, or None.

    Only VLANs whose ``restricted_vlan`` is ``'c'`` are considered. VLAN
    entries with a missing or malformed ``ip``/``subnet`` are skipped.
    """
    try:
        addr = ipaddress.ip_address(client_ip)
    except ValueError:
        # Covers unparsable strings as well as None.
        return None

    cfg = config_utils.load_config()
    for vlan in cfg.get('vlans', []):
        if vlan.get('restricted_vlan') != 'c':
            continue
        try:
            net = ipaddress.ip_network(
                f"{vlan['ip']}/{vlan['subnet']}", strict=False
            )
        except (KeyError, ValueError):
            continue
        if addr in net:
            return vlan
    return None


def _cp(vlan):
    """Return the VLAN's ``captive_portal`` sub-dict (empty if absent/null)."""
    return vlan.get('captive_portal') or {}


def _setting(vlan, key, default):
    """Look up *key* in the captive_portal section, then the VLAN, then *default*."""
    return _cp(vlan).get(key, vlan.get(key, default))


def _verify_credential(username, password, vlan_name):
    """Return True if *username*/*password* is a valid captive login for *vlan_name*.

    The lookup is case-insensitive on the username and restricted to enabled
    captive-type rows for the given VLAN. Rows with a ``valid_for`` lifetime
    are rejected once ``date_set + valid_for`` is in the past. Only bcrypt
    hashes are accepted. Every failure path returns False (fail closed).
    """
    try:
        # closing() guarantees the connection is released even if the
        # query raises.
        with closing(sqlite3.connect(CREDENTIALS_DB)) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM credentials WHERE username=? COLLATE NOCASE"
                " AND user_type=? AND vlan=? AND enabled=1",
                (username, USER_TYPE_CAPTIVE, vlan_name),
            ).fetchone()
    except sqlite3.Error:
        logger.exception('Credential lookup failed')
        return False

    if row is None:
        return False

    valid_for = row['valid_for']
    if valid_for is not None:
        date_set = row['date_set']
        # A lifetime without a start time cannot be evaluated; deny.
        if date_set is None or (date_set + valid_for) < int(time.time()):
            return False

    if row['hash_type'] != HASH_BCRYPT:
        return False

    stored = row['password']
    try:
        if isinstance(stored, str):
            stored = stored.encode()
        return bcrypt.checkpw(password.encode(), stored)
    except Exception:
        # Malformed hash or unusable input: deny, but leave a trace.
        logger.exception('bcrypt verification failed')
        return False


def _render(vlan, error=None, next_url=''):
    """Render the portal page for *vlan*.

    *error*, when given, is shown to the user. *next_url* is round-tripped
    through a hidden form field; it originates from the client, so it is
    HTML-escaped here. Title, splash text and terms come from the
    configuration and are inserted as-is.
    """
    terms = _setting(vlan, 'portal_terms', []) or []
    terms_html = ''.join(
        f'<p><label><input type="checkbox" name="term_{i}" value="1"> {t}</label></p>'
        for i, t in enumerate(terms)
    ) or '<p>No terms required.</p>'

    if _setting(vlan, 'require_username_password', False):
        credentials_html = (
            '<div>'
            '<input type="text" name="portal_username" placeholder="Username">'
            '<input type="password" name="portal_password" placeholder="Password">'
            '</div>'
        )
    else:
        credentials_html = ''

    title = _setting(vlan, 'portal_splash_title', 'Guest Portal')
    splash_text = _setting(vlan, 'portal_splash_text', '')

    return PORTAL_HTML.format(
        title=title,
        splash_html=f'<p>{splash_text}</p>' if splash_text else '',
        error_html=f'<p class="error">{html.escape(error)}</p>' if error else '',
        credentials_html=credentials_html,
        terms_html=terms_html,
        # Client-controlled value placed inside an attribute: must be escaped.
        next_url=html.escape(next_url or '', quote=True),
    )


# The second rule is a catch-all so that any URL a captive client requests
# lands on the portal; portal() requires a 'path' argument from every rule.
@bp.route('/', defaults={'path': ''}, methods=['GET', 'POST'])
@bp.route('/<path:path>', methods=['GET', 'POST'])
def portal(path):
    """Show the portal (GET) or validate the submission and grant access (POST).

    Returns 404 when the client address is not inside a captive VLAN. On a
    successful POST the client's address is appended to the captive queue and
    the client is redirected to the submitted ``next`` URL, or to the default
    landing page when none was supplied.
    """
    vlan = _vlan_for_ip(request.remote_addr)
    if vlan is None:
        return 'Portal unavailable.', 404

    if request.method != 'POST':
        return _render(vlan, next_url=request.args.get('next', '')), 200

    next_url = request.form.get('next', '')

    if _setting(vlan, 'require_username_password', False):
        username = request.form.get('portal_username', '').strip()
        password = request.form.get('portal_password', '')
        if not username:
            return _render(vlan, error='Username is required.', next_url=next_url), 200
        if not _verify_credential(username, password, vlan['name']):
            return _render(vlan, error='Invalid username or password.', next_url=next_url), 200

    terms = _setting(vlan, 'portal_terms', []) or []
    for i, _term in enumerate(terms):
        if not request.form.get(f'term_{i}'):
            return _render(
                vlan,
                error='You must accept all terms to continue.',
                next_url=next_url,
            ), 200

    try:
        with open(config_utils.CAPTIVE_QUEUE, 'a') as f:
            f.write(f'allow {request.remote_addr}\n')
    except OSError:
        # Best effort: still redirect, but record why access was not queued.
        logger.exception(
            'Could not queue allow entry for %s', request.remote_addr
        )

    # The target is client-supplied by design (the URL the client was
    # originally trying to reach).
    return redirect(next_url or 'http://routlin.local/', 302)