linuxrouter/docker/routlin-dash/app/config_utils.py
2026-05-30 14:57:33 -04:00

580 lines
18 KiB
Python

import copy, json, subprocess, hashlib, os, uuid
import os as _os
from datetime import datetime, timezone
from flask import session
APP_DIR = _os.path.dirname(_os.path.abspath(__file__))
CONFIGS_DIR = '/routlin_location'
DATA_DIR = '/data'
WWW_DIR = '/www'
ACCOUNTS_FILE = f'{APP_DIR}/authorized_accounts.json'
CONFIG_FILE = f'{CONFIGS_DIR}/config.json'
DASHBOARD_QUEUE = f'{CONFIGS_DIR}/.dashboard-queue'
DASHBOARD_DONE = f'{CONFIGS_DIR}/.dashboard-done'
DASHBOARD_LAST_RUN = f'{CONFIGS_DIR}/.dashboard-last-run'
DASHBOARD_LOCK = f'{CONFIGS_DIR}/.dashboard-lock'
DASHBOARD_PENDING = f'{CONFIGS_DIR}/.dashboard-pending'
DASHBOARD_DB = f'{CONFIGS_DIR}/.dashboard-snapshots'
HEALTH_FILE = f'{CONFIGS_DIR}/.health'
PRODUCT_NAME = os.environ.get('PRODUCT_NAME', 'routlin')
DASHB_TIMER_NAME = f'{PRODUCT_NAME}-dashboard-queue'
DDNS_TIMER_NAME = f'{PRODUCT_NAME}-ddns-update'
WEB_APP_DISPLAY_NAME = os.environ.get('WEB_APP_DISPLAY_NAME', f'{PRODUCT_NAME.capitalize()} Dashboard')
DASHB_INTERVAL_SECS = 60
QUEUE_MAX_LINES = 50
def load_config():
try:
with open(CONFIG_FILE) as f:
return json.load(f)
except Exception:
return {}
def save_config(data):
with open(CONFIG_FILE, 'w') as f:
json.dump(data, f, indent=2)
def config_hash():
try:
with open(CONFIG_FILE, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
except Exception:
return ''
def verify_config_hash(submitted):
if not submitted:
return True
return submitted == config_hash()
def _load_done_set():
try:
done = set()
for line in open(DASHBOARD_DONE).read().splitlines():
parts = line.split()
if parts:
done.add(parts[0])
return done
except Exception:
return set()
def _read_pending(done_set):
pending = []
try:
lines = open(DASHBOARD_QUEUE).read().splitlines()
except Exception:
return pending
for line in lines:
try:
parts = line.split(None, 2)
if len(parts) == 3:
entry_uuid, entry_ts, rest = parts
cmd_user = rest.rsplit(' (', 1)
entry_cmd = cmd_user[0].strip('[]')
entry_user = cmd_user[1].rstrip(')') if len(cmd_user) == 2 else ''
if entry_uuid not in done_set:
pending.append((entry_uuid, int(entry_ts), entry_cmd, entry_user))
except Exception:
pass
return pending
def get_pending_entries():
return _read_pending(_load_done_set())
def _format_timing(secs):
if secs is None:
return None
if secs <= 5:
return 'momentarily'
if secs < 60:
return f'in about {secs} seconds'
mins = round(secs / 60)
return f'in about {mins} minute{"s" if mins != 1 else ""}'
def _trim_if_needed():
try:
lines = [l for l in open(DASHBOARD_QUEUE).read().splitlines() if l]
if len(lines) <= QUEUE_MAX_LINES:
return
done_set = _load_done_set()
pending = [l for l in lines if l.split()[0] not in done_set]
with open(DASHBOARD_QUEUE, 'w') as f:
f.write('\n'.join(pending) + ('\n' if pending else ''))
open(DASHBOARD_DONE, 'w').close()
except Exception:
pass
def _apply_changes_immediately():
try:
return session.get('apply_changes_immediately', False)
except Exception:
return False
def _read_dashboard_pending():
"""Return list of (uuid, ts, cmd, user) from .dashboard-pending."""
items = []
try:
lines = open(DASHBOARD_PENDING).read().splitlines()
except Exception:
return items
for line in lines:
if not line.strip():
continue
try:
parts = line.split(None, 2)
if len(parts) == 3:
entry_uuid, entry_ts, rest = parts
cmd_user = rest.rsplit(' (', 1)
entry_cmd = cmd_user[0].strip('[]')
entry_user = cmd_user[1].rstrip(')') if len(cmd_user) == 2 else ''
items.append((entry_uuid, int(entry_ts), entry_cmd, entry_user))
except Exception:
pass
return items
def get_dashboard_pending():
return _read_dashboard_pending()
def get_dashboard_done():
"""Return list of (uuid, applied_ts) from .dashboard-done, newest first."""
items = []
try:
lines = open(DASHBOARD_DONE).read().splitlines()
except Exception:
return items
for line in lines:
if not line.strip():
continue
try:
parts = line.split(None, 1)
if len(parts) >= 2:
items.append((parts[0], int(parts[1])))
elif len(parts) == 1:
items.append((parts[0], None))
except Exception:
pass
items.reverse()
return items
def get_done_timestamps():
"""Return dict of {uuid: applied_ts} from .dashboard-done."""
result = {}
try:
for line in open(DASHBOARD_DONE).read().splitlines():
if not line.strip():
continue
parts = line.split(None, 1)
if len(parts) >= 2:
result[parts[0]] = int(parts[1])
elif len(parts) == 1:
result[parts[0]] = None
except Exception:
pass
return result
def load_all_snapshots():
"""Return all snapshot dicts from .snapshots/, sorted newest first."""
snaps = []
try:
for fname in sorted(os.listdir(SNAPSHOTS_DIR), reverse=True):
if not fname.endswith('.json'):
continue
try:
with open(os.path.join(SNAPSHOTS_DIR, fname)) as f:
snaps.append(json.load(f))
except Exception:
pass
except Exception:
pass
return snaps
def flush_pending_to_queue():
"""Move all entries from .dashboard-pending to .dashboard-queue and clear pending."""
items = _read_dashboard_pending()
if not items:
return
done_set = _load_done_set()
existing_ids = {uu for uu, *_ in _read_pending(done_set)}
with open(DASHBOARD_QUEUE, 'a') as f:
for entry_uuid, entry_ts, entry_cmd, entry_user in items:
if entry_uuid not in existing_ids:
f.write(f'{entry_uuid} {entry_ts} [{entry_cmd}] ({entry_user})\n')
open(DASHBOARD_PENDING, 'w').close()
_trim_if_needed()
def _queue_pending_command(cmd, user=None):
"""Append cmd to .dashboard-pending if not already present for this cmd+user."""
existing = _read_dashboard_pending()
current_user = user or session.get('email_address', 'unknown')
for entry_uuid, entry_ts, entry_cmd, entry_user in existing:
if entry_cmd == cmd and entry_user == current_user:
return entry_uuid, entry_ts
entry_uuid = str(uuid.uuid4())
entry_ts = int(datetime.now().timestamp())
with open(DASHBOARD_PENDING, 'a') as f:
f.write(f'{entry_uuid} {entry_ts} [{cmd}] ({current_user})\n')
return entry_uuid, entry_ts
def _queue_pending_presigned(cmd, entry_uuid, entry_ts):
"""Write a pre-generated entry to .dashboard-pending without dedup."""
current_user = session.get('email_address', 'unknown')
with open(DASHBOARD_PENDING, 'a') as f:
f.write(f'{entry_uuid} {entry_ts} [{cmd}] ({current_user})\n')
def _queue_command(cmd, user=None):
if not _apply_changes_immediately():
return _queue_pending_command(cmd, user=user)
done_set = _load_done_set()
pending = _read_pending(done_set)
current_user = user or session.get('email_address', 'unknown')
for entry_uuid, entry_ts, entry_cmd, entry_user in pending:
if entry_cmd == cmd and entry_user == current_user:
return entry_uuid, entry_ts
entry_uuid = str(uuid.uuid4())
entry_ts = int(datetime.now().timestamp())
with open(DASHBOARD_QUEUE, 'a') as f:
f.write(f'{entry_uuid} {entry_ts} [{cmd}] ({current_user})\n')
_trim_if_needed()
return entry_uuid, entry_ts
def _find_cmd_in_queues(cmd):
"""Return (uuid, ts) of first matching entry in .dashboard-pending or .dashboard-queue, or (None, None)."""
for entry_uuid, entry_ts, entry_cmd, entry_user in _read_dashboard_pending():
if entry_cmd == cmd:
return entry_uuid, entry_ts
done_set = _load_done_set()
for entry_uuid, entry_ts, entry_cmd, entry_user in _read_pending(done_set):
if entry_cmd == cmd:
return entry_uuid, entry_ts
return None, None
def _entry_ts_from_queue(entry_uuid):
try:
for line in open(DASHBOARD_QUEUE).read().splitlines():
parts = line.split(None, 2)
if len(parts) >= 2 and parts[0] == entry_uuid:
return int(parts[1])
except Exception:
pass
return None
def _seconds_until_next_run():
try:
last_run = float(open(DASHBOARD_LAST_RUN).read().strip())
elapsed = datetime.now(timezone.utc).timestamp() - last_run
return int(max(0, DASHB_INTERVAL_SECS - elapsed))
except Exception:
return None
def _is_locked():
try:
return os.path.getsize(DASHBOARD_LOCK) > 0
except Exception:
return False
def _lock_mtime():
try:
return os.path.getmtime(DASHBOARD_LOCK)
except Exception:
return None
def _timing_status_msg(entry_ts, action_label):
"""Return a flash message for a command already written to the queue."""
if _is_locked():
mtime = _lock_mtime()
if entry_ts is not None and mtime and entry_ts < mtime:
return f'{action_label}. Your changes are being applied now...'
return f'{action_label}. Your changes will be applied on the next run.'
timing = _format_timing(_seconds_until_next_run())
if timing:
return f'{action_label}. Your changes will be applied {timing}.'
return f'{action_label}. The processing service is not running.'
def _build_timing_msg(entry_ts, action_label='Configuration saved'):
if not _apply_changes_immediately():
from markupsafe import Markup
return Markup(f'{action_label}. Visit the <strong>Actions</strong> page to apply your changes.')
return _timing_status_msg(entry_ts, action_label)
def queue_command(cmd, description='', user=None):
"""Queue a command without generating a flash message. description is ignored (kept for compat)."""
return _queue_command(cmd, user=user)
def queued_msg(cmd=None, description='', action_label='Configuration saved'):
"""Queue cmd if given, then return a timing message. description is ignored."""
entry_ts = None
if cmd is not None:
_entry_uuid, entry_ts = queue_command(cmd)
return _build_timing_msg(entry_ts, action_label)
# Snapshot system ===================================================
import re as _re
import sqlite3 as _sqlite3
def _db():
conn = _sqlite3.connect(DASHBOARD_DB)
conn.row_factory = _sqlite3.Row
conn.execute('PRAGMA journal_mode=WAL')
conn.executescript('''
CREATE TABLE IF NOT EXISTS groups (
uuid TEXT PRIMARY KEY,
ts INTEGER NOT NULL,
cmd TEXT,
user TEXT,
parent_path TEXT NOT NULL,
item_key TEXT,
item_value TEXT,
reverts_group TEXT
);
CREATE TABLE IF NOT EXISTS changes (
group_id TEXT NOT NULL REFERENCES groups(uuid),
field TEXT NOT NULL,
before TEXT,
after TEXT,
value_type TEXT NOT NULL,
PRIMARY KEY (group_id, field)
);
CREATE INDEX IF NOT EXISTS idx_changes_group ON changes(group_id);
''')
return conn
def _py_value_type(val):
if val is None: return 'null'
if isinstance(val, bool): return 'bool'
if isinstance(val, int): return 'int'
if isinstance(val, float): return 'float'
if isinstance(val, (dict, list)): return 'json'
return 'str'
def _serialize_value(val):
if val is None:
return None
if isinstance(val, bool):
return 'true' if val else 'false'
if isinstance(val, (dict, list)):
return json.dumps(val, separators=(',', ':'))
return str(val)
def _deserialize_value(text, value_type):
if text is None:
return None
if value_type == 'int': return int(text)
if value_type == 'float': return float(text)
if value_type == 'bool': return text == 'true'
if value_type in ('json', 'null'): return json.loads(text)
return text
def diff_fields(before_dict, after_dict):
"""Return list of (field, before_text, after_text, value_type) for changed fields."""
bd = before_dict or {}
ad = after_dict or {}
result = []
for key in sorted(set(bd) | set(ad)):
bval = bd.get(key)
aval = ad.get(key)
if bval == aval:
continue
ref = aval if aval is not None else bval
result.append((
key,
_serialize_value(bval),
_serialize_value(aval),
_py_value_type(ref),
))
return result
_PATH_SEG = _re.compile(r'([^\.\[]+)(?:\[([^\]=]+)=([^\]]+)\])?')
def _parse_path(path):
"""Parse 'vlans[name=trusted].field' into [(field, sel_key, sel_val), ...]."""
return [(m.group(1), m.group(2), m.group(3)) for m in _PATH_SEG.finditer(path)]
def _nav_get(cfg, path):
"""Navigate config to the value at path."""
for field, sel_key, sel_val in _parse_path(path):
cfg = cfg[field]
if sel_key:
cfg = next(x for x in cfg if str(x.get(sel_key, '')) == str(sel_val))
return cfg
def _nav_parent(cfg, path):
"""Return (parent_obj, final_key) for setting/deleting the last path segment."""
segs = _parse_path(path)
for field, sel_key, sel_val in segs[:-1]:
cfg = cfg[field]
if sel_key:
cfg = next(x for x in cfg if str(x.get(sel_key, '')) == str(sel_val))
return cfg, segs[-1][0]
def record_group(cfg, parent_path, item_key, item_value, changes, cmd,
reverts_group=None, queue=True):
"""Insert a group + changes into sqlite, save config, and queue the command.
Returns a flash message string.
"""
group_uuid = str(uuid.uuid4())
entry_ts = int(datetime.now().timestamp())
current_user = session.get('email_address', 'unknown')
conn = _db()
try:
conn.execute(
'INSERT INTO groups '
'(uuid,ts,cmd,user,parent_path,item_key,item_value,reverts_group) '
'VALUES (?,?,?,?,?,?,?,?)',
(group_uuid, entry_ts, cmd, current_user,
parent_path, item_key, item_value, reverts_group)
)
for field, before, after, value_type in changes:
conn.execute(
'INSERT INTO changes (group_id,field,before,after,value_type) '
'VALUES (?,?,?,?,?)',
(group_uuid, field, before, after, value_type)
)
conn.commit()
finally:
conn.close()
save_config(cfg)
if not queue:
with open(DASHBOARD_DONE, 'a') as f:
f.write(f'{group_uuid} {entry_ts}\n')
return 'Saved.'
if _apply_changes_immediately():
with open(DASHBOARD_QUEUE, 'a') as f:
f.write(f'{group_uuid} {entry_ts} [{cmd}] ({current_user})\n')
_trim_if_needed()
else:
_queue_pending_presigned(cmd, group_uuid, entry_ts)
return _build_timing_msg(entry_ts)
def load_all_groups():
"""Return list of (group_dict, [change_dicts]) sorted newest first."""
conn = _db()
try:
gs = conn.execute('SELECT * FROM groups ORDER BY ts DESC').fetchall()
result = []
for g in gs:
cs = conn.execute(
'SELECT * FROM changes WHERE group_id=? ORDER BY field', (g['uuid'],)
).fetchall()
result.append((dict(g), [dict(c) for c in cs]))
return result
finally:
conn.close()
def revert_group(group_uuid):
"""Revert a change group. Returns (flash_message, success_bool)."""
conn = _db()
try:
g = conn.execute('SELECT * FROM groups WHERE uuid=?', (group_uuid,)).fetchone()
if not g:
return f'Snapshot not found for {group_uuid[:8]}.', False
g = dict(g)
changes = [dict(c) for c in conn.execute(
'SELECT * FROM changes WHERE group_id=?', (group_uuid,)
).fetchall()]
finally:
conn.close()
if g['reverts_group']:
return 'Cannot revert a revert.', False
cfg = load_config()
parent_path = g['parent_path']
item_key = g['item_key']
item_value = g['item_value']
all_before_null = all(c['before'] is None for c in changes)
all_after_null = all(c['after'] is None for c in changes)
if all_before_null:
parent_obj, lst_key = _nav_parent(cfg, parent_path)
parent_obj[lst_key] = [
x for x in parent_obj[lst_key]
if str(x.get(item_key, '')) != str(item_value)
]
elif all_after_null:
item = {c['field']: _deserialize_value(c['before'], c['value_type']) for c in changes}
_nav_get(cfg, parent_path).append(item)
else:
item_path = f'{parent_path}[{item_key}={item_value}]' if item_key else parent_path
for c in changes:
parent_obj, field = _nav_parent(cfg, f'{item_path}.{c["field"]}')
if c['before'] is None:
parent_obj.pop(field, None)
else:
parent_obj[field] = _deserialize_value(c['before'], c['value_type'])
inv = [(c['field'], c['after'], c['before'], c['value_type']) for c in changes]
msg = record_group(cfg, parent_path, item_key, item_value, inv,
g['cmd'], reverts_group=group_uuid)
return msg, True
# Misc ==============================================================
def run_apply():
try:
subprocess.run(
['python3', f'{CONFIGS_DIR}/core.py', '--apply'],
capture_output=True, timeout=30
)
except Exception:
pass
def run_update_blocklists():
try:
subprocess.run(
['python3', f'{CONFIGS_DIR}/core.py', '--update-blocklists'],
capture_output=True, timeout=120
)
except Exception:
pass