Development

This commit is contained in:
Matthew Grotke 2026-06-08 00:28:33 -04:00
parent 1a473296b7
commit 43c4cf380d
5 changed files with 71 additions and 62 deletions

View file

@ -71,7 +71,8 @@ def _db_conn():
vlan TEXT NOT NULL DEFAULT '',
enabled INTEGER NOT NULL DEFAULT 1,
date_set INTEGER NOT NULL,
valid_for INTEGER DEFAULT NULL
session_seconds INTEGER NOT NULL DEFAULT 0,
expires_seconds INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("""
@ -103,33 +104,33 @@ def _hash_password(plaintext, digest_type):
raise ValueError(f"Unknown digest_type: {digest_type}")
def _parse_valid_for(value_str, unit_str):
"""Return valid_for in seconds (int) or None for no expiry. Value of 0 means no expiry."""
def _parse_session_seconds(value_str, unit_str):
"""Return session_seconds (int). 0 means no limit."""
unit_str = (unit_str or '').strip()
if not value_str or not value_str.strip():
return None
return 0
try:
n = int(value_str.strip())
if n <= 0:
return None
return 0
except (ValueError, TypeError):
return None
return 0
multipliers = {'hours': 3600, 'days': 86400}
mult = multipliers.get(unit_str)
if mult is None:
return None
return 0
return n * mult
def _valid_for_to_display(valid_for):
def _session_seconds_to_display(session_seconds):
"""Return (value_str, unit_str) for form pre-population."""
if valid_for is None:
if not session_seconds:
return '0', 'hours'
if valid_for % 86400 == 0:
return str(valid_for // 86400), 'days'
if valid_for % 3600 == 0:
return str(valid_for // 3600), 'hours'
return str(valid_for // 3600 or 1), 'hours'
if session_seconds % 86400 == 0:
return str(session_seconds // 86400), 'days'
if session_seconds % 3600 == 0:
return str(session_seconds // 3600), 'hours'
return str(session_seconds // 3600 or 1), 'hours'
def _row_index():
@ -185,9 +186,9 @@ def addedit():
return redirect(f'/{_PAGE}')
enabled = 'enabled' in request.form
valid_for = _parse_valid_for(
request.form.get('valid_for_value', ''),
request.form.get('valid_for_unit', 'never'),
session_seconds = _parse_session_seconds(
request.form.get('session_seconds_value', ''),
request.form.get('session_seconds_unit', 'hours'),
)
if not username:
@ -222,10 +223,10 @@ def addedit():
conn.execute(
"""UPDATE credentials
SET username=?, password=?, description=?, user_type=?, digest_type=?,
vlan=?, enabled=?, date_set=?, valid_for=?
vlan=?, enabled=?, date_set=?, session_seconds=?
WHERE id=?""",
(username, stored_password, description, user_type, stored_digest_type,
vlan, int(enabled), date_set, valid_for, existing['id']),
vlan, int(enabled), date_set, session_seconds, existing['id']),
)
conn.commit()
except sqlite3.IntegrityError:
@ -251,10 +252,10 @@ def addedit():
try:
conn.execute(
"""INSERT INTO credentials
(username, password, description, user_type, digest_type, vlan, enabled, date_set, valid_for)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(username, password, description, user_type, digest_type, vlan, enabled, date_set, session_seconds, expires_seconds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(username, hashed, description, user_type, digest_type,
vlan, int(enabled), int(time.time()), valid_for),
vlan, int(enabled), int(time.time()), session_seconds, 0),
)
conn.commit()
except sqlite3.IntegrityError:

View file

@ -51,7 +51,7 @@
"client_requirement": "client_is_administrator+"
},
{
"label": "Expires",
"label": "Session",
"field": "expires_label",
"class": "col-narrow"
}
@ -171,7 +171,7 @@
{
"type": "field",
"label": "Valid For",
"name": "valid_for_value",
"name": "session_seconds_value",
"input_type": "number",
"min": 0,
"value": "0",
@ -180,7 +180,7 @@
{
"type": "field",
"label": "Unit",
"name": "valid_for_unit",
"name": "session_seconds_unit",
"input_type": "select",
"options": [
{"value": "hours", "label": "Hours"},

View file

@ -1,7 +1,6 @@
import json
import sqlite3
import time
import datetime
import config_utils
import factory
@ -27,7 +26,8 @@ def _load_credentials():
vlan TEXT NOT NULL DEFAULT '',
enabled INTEGER NOT NULL DEFAULT 1,
date_set INTEGER NOT NULL,
valid_for INTEGER DEFAULT NULL
session_seconds INTEGER NOT NULL DEFAULT 0,
expires_seconds INTEGER NOT NULL DEFAULT 0
)
""")
conn.execute("""
@ -48,15 +48,14 @@ def _load_credentials():
return []
def _format_expiry(date_set, valid_for):
if valid_for is None:
return 'Never'
expires_ts = date_set + valid_for
now = int(time.time())
if expires_ts <= now:
return 'Expired'
dt = datetime.datetime.fromtimestamp(expires_ts)
return dt.strftime('%Y-%m-%d %H:%M')
def _format_session(session_seconds):
if not session_seconds:
return 'No limit'
if session_seconds % 86400 == 0:
n = session_seconds // 86400
return f"{n} day{'s' if n != 1 else ''}"
hours = session_seconds / 3600
return f"{hours:g} h"
def collect_tokens(cfg):
@ -100,7 +99,7 @@ def collect_tokens(cfg):
r = dict(row)
r.pop('password', None)
r['user_type_label'] = USER_TYPE_LABELS.get(r.get('user_type'), str(r.get('user_type', '')))
r['expires_label'] = _format_expiry(r.get('date_set', 0), r.get('valid_for'))
r['expires_label'] = _format_session(r.get('session_seconds', 0))
display_rows.append(r)
content = factory.load_json(f'{factory.PAGES_DIR}/clientcredentials/content.json')

View file

@ -80,7 +80,7 @@ def _verify_credential(username, password, vlan_name):
return False
if row is None:
return False
if row['valid_for'] is not None and (row['date_set'] + row['valid_for']) < int(time.time()):
if row['session_seconds'] > 0 and (row['date_set'] + row['session_seconds']) < int(time.time()):
return False
if row['digest_type'] == DIGEST_HASH_BCRYPT:
try:

View file

@ -52,7 +52,7 @@ def _load_supplicant_credentials():
conn = sqlite3.connect(str(CREDENTIALS_DB_FILE))
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT username, password, digest_type, vlan FROM credentials"
"SELECT username, password, digest_type, vlan, session_seconds, expires_seconds FROM credentials"
" WHERE user_type=? AND enabled=1",
(USER_TYPE_SUPPLICANT,)
).fetchall()
@ -132,6 +132,24 @@ def build_radius_clients_conf(data, secret):
return "\n".join(lines)
def _supplicant_reply_attrs(cred, default_session_seconds, vlan_id, vlan):
"""Return reply attribute lines for a supplicant entry (no trailing blank line)."""
import datetime
attrs = [
f" Tunnel-Type = VLAN,",
f" Tunnel-Medium-Type = IEEE-802,",
f" Tunnel-Private-Group-Id = \"{vlan_id}\"",
]
session = cred.get('session_seconds') or default_session_seconds
if session:
attrs.append(f" Session-Timeout = {session}")
expires = cred.get('expires_seconds') or 0
if expires:
dt = datetime.datetime.fromtimestamp(expires)
attrs.append(f" Expiration := \"{dt.strftime('%b %d %Y %H:%M:%S')}\"")
return attrs
def fmt_mac(raw, fmt):
c = raw.replace(':', '').replace('-', '').lower()
pairs = [c[i:i+2] for i in range(0, 12, 2)]
@ -165,6 +183,7 @@ def build_radius_users(data):
apply_to = fr_opts.get('apply_to', 'all')
auth_mode = fr_opts.get('auth_mode', 'mab')
mab_first = fr_opts.get('mab_first', True)
default_session_seconds = fr_opts.get('default_session_seconds', 0) or 0
emit_mac_entries = (auth_mode == 'mab') or mab_first
lines = [
@ -216,23 +235,13 @@ def build_radius_users(data):
except Exception:
print(f"WARNING: Skipping '{username}' - decryption failed")
continue
lines += [
f"# {username} -> VLAN {vlan_id} ({vlan['name']})",
f"{username} Cleartext-Password := \"{plaintext}\"",
f" Tunnel-Type = VLAN,",
f" Tunnel-Medium-Type = IEEE-802,",
f" Tunnel-Private-Group-Id = \"{vlan_id}\"",
"",
]
check_line = f"{username} Cleartext-Password := \"{plaintext}\""
reply = _supplicant_reply_attrs(cred, default_session_seconds, vlan_id, vlan)
lines += [f"# {username} -> VLAN {vlan_id} ({vlan['name']})", check_line] + reply + [""]
else: # eap_certificate - cert verified by TLS stack, entry provides VLAN reply attrs
lines += [
f"# {username} -> VLAN {vlan_id} ({vlan['name']})",
f"{username} Auth-Type := EAP",
f" Tunnel-Type = VLAN,",
f" Tunnel-Medium-Type = IEEE-802,",
f" Tunnel-Private-Group-Id = \"{vlan_id}\"",
"",
]
check_line = f"{username} Auth-Type := EAP"
reply = _supplicant_reply_attrs(cred, 0, vlan_id, vlan)
lines += [f"# {username} -> VLAN {vlan_id} ({vlan['name']})", check_line] + reply + [""]
default_id = default_vlan.get('vlan_id')
ap_ips = fr_opts.get('ap_ips', [])