'
- f'{e(uuid[:8])}'
- f'{e(uuid[:8])}'
- f'{e(uuid[:8])}'
+ f''
+ f'{factory.e(uuid[:8])}'
+ f'{factory.e(uuid[:8])}'
+ f'{factory.e(uuid[:8])}'
'
'
)
- snap_user = e(g.get('user', ''))
+ snap_user = factory.e(g.get('user', ''))
cb_attrs = '' if is_manager else ('disabled title="Cannot revert"' if uuid in no_revert else '')
hist_rows += (
- f''
- f' | '
- f'{e(dt_str)} | '
+ f'
'
+ f' | '
+ f'{factory.e(dt_str)} | '
f'{summary} | '
- f'{build_snap_val(changes)} | '
+ f'{factory.build_snap_val(changes)} | '
f'{snap_tag} | '
f'{snap_user} | '
'
'
- f'{snap_expand_row(changes, 6)}'
+ f'{factory.snap_expand_row(changes, 6)}'
)
select_all = (
'= len(items):
flash('Entry not found.', 'error')
@@ -87,13 +87,13 @@ def table_toggle():
return redirect(f'/{_PAGE}')
ip = items[idx]['ip']
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'banned_ips', 'ip', ip, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'banned_ips', 'ip', ip, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/bannedips/table_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_edit():
idx = _row_index()
if idx is None:
@@ -109,7 +109,7 @@ def table_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('banned_ips', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -123,13 +123,13 @@ def table_edit():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'banned_ips', 'ip', ip, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'banned_ips', 'ip', ip, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/bannedips/table_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_delete():
idx = _row_index()
if idx is None:
@@ -138,7 +138,7 @@ def table_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('banned_ips', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -151,6 +151,6 @@ def table_delete():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'banned_ips', 'ip', removed['ip'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'banned_ips', 'ip', removed['ip'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/bannedips/view.py b/docker/routlin-dash/app/pages/bannedips/view.py
index c3529b6..800c759 100644
--- a/docker/routlin-dash/app/pages/bannedips/view.py
+++ b/docker/routlin-dash/app/pages/bannedips/view.py
@@ -1,11 +1,11 @@
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
- content = load_json(f'{PAGES_DIR}/bannedips/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ tokens = config_utils.collect_layout_tokens(cfg)
+ content = factory.load_json(f'{factory.PAGES_DIR}/bannedips/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/clientcredentials/action.py b/docker/routlin-dash/app/pages/clientcredentials/action.py
index 99e0a95..59e83f8 100644
--- a/docker/routlin-dash/app/pages/clientcredentials/action.py
+++ b/docker/routlin-dash/app/pages/clientcredentials/action.py
@@ -5,7 +5,7 @@ from pathlib import Path
import bcrypt
from flask import Blueprint, request, redirect, flash
-from auth import require_level
+import auth
import config_utils
import sanitize
import settings
@@ -130,7 +130,7 @@ def _row_index():
# ===================================================================
@bp.route('/action/clientcredentials/addedit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addedit():
if not PRO_LICENSE:
flash('Client Credentials requires a Routlin Pro license.', 'error')
@@ -259,7 +259,7 @@ def addedit():
@bp.route('/action/clientcredentials/delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def delete():
if not PRO_LICENSE:
flash('Client Credentials requires a Routlin Pro license.', 'error')
@@ -285,7 +285,7 @@ def delete():
@bp.route('/action/clientcredentials/toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def toggle():
if not PRO_LICENSE:
flash('Client Credentials requires a Routlin Pro license.', 'error')
diff --git a/docker/routlin-dash/app/pages/clientcredentials/view.py b/docker/routlin-dash/app/pages/clientcredentials/view.py
index df59179..0806b45 100644
--- a/docker/routlin-dash/app/pages/clientcredentials/view.py
+++ b/docker/routlin-dash/app/pages/clientcredentials/view.py
@@ -3,9 +3,9 @@ import sqlite3
import time
import datetime
-from config_utils import collect_layout_tokens, CREDENTIALS_DB
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
-import settings as settings
+import config_utils
+import factory
+import settings
PRO_LICENSE = settings.is_pro()
@@ -15,7 +15,7 @@ HASH_TYPE_LABELS = {0: 'Cleartext', 1: 'NT-Password', 2: 'Bcrypt'}
def _load_credentials():
try:
- conn = sqlite3.connect(CREDENTIALS_DB)
+ conn = sqlite3.connect(config_utils.CREDENTIALS_DB)
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS credentials (
@@ -61,7 +61,7 @@ def _format_expiry(date_set, valid_for):
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
tokens['PRO_NOTE'] = (
'' if PRO_LICENSE else
@@ -92,10 +92,10 @@ def collect_tokens(cfg):
r['expires_label'] = _format_expiry(r.get('date_set', 0), r.get('valid_for'))
display_rows.append(r)
- content = load_json(f'{PAGES_DIR}/clientcredentials/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/clientcredentials/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
data = display_rows if ds == 'sqlite:client_credentials' else []
- tokens[table_token_key(ds)] = build_table(table_item, tokens, data)
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, data)
return tokens
diff --git a/docker/routlin-dash/app/pages/ddns/action.py b/docker/routlin-dash/app/pages/ddns/action.py
index db109c5..d15bc1d 100644
--- a/docker/routlin-dash/app/pages/ddns/action.py
+++ b/docker/routlin-dash/app/pages/ddns/action.py
@@ -2,8 +2,8 @@ from pathlib import Path
import copy
import os
from flask import Blueprint, request, redirect, flash, send_file, abort
-from auth import require_level
-from config_utils import load_config, verify_config_hash, record_group, diff_fields, CONFIGS_DIR
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -11,11 +11,11 @@ _PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
-LOG_FILE = f'{CONFIGS_DIR}/ddns.log'
+LOG_FILE = f'{config_utils.CONFIGS_DIR}/ddns.log'
@bp.route('/action/ddns/addaccount_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addaccount_add():
provider_type = sanitize.filtervalue(request.form.get('provider', ''), validate.VALID_DDNS_PROVIDERS)
description = sanitize.description(request.form.get('description', ''))
@@ -31,7 +31,7 @@ def addaccount_add():
flash('Unknown provider type.', 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
@@ -47,15 +47,15 @@ def addaccount_add():
else:
entry['api_token'] = request.form.get('api_token', '').strip()
- cfg = load_config()
+ cfg = config_utils.load_config()
cfg.setdefault('ddns', {}).setdefault('providers', []).append(entry)
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'ddns.providers', 'description', description, changes, 'ddns update', queue=False), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'ddns.providers', 'description', description, changes, 'ddns update', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/ddns/accounts_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def accounts_edit():
try:
row_index = int(request.form.get('row_index', -1))
@@ -72,11 +72,11 @@ def accounts_edit():
flash('Unknown provider type.', 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
providers = cfg.setdefault('ddns', {}).setdefault('providers', [])
if row_index < 0 or row_index >= len(providers):
flash('Invalid provider index.', 'error')
@@ -96,13 +96,13 @@ def accounts_edit():
entry['api_token'] = request.form.get('api_token', '').strip()
providers[row_index] = entry
- changes = diff_fields(before, entry)
- flash(record_group(cfg, 'ddns.providers', 'description', description, changes, 'ddns update', queue=False), 'success')
+ changes = config_utils.diff_fields(before, entry)
+ flash(config_utils.record_group(cfg, 'ddns.providers', 'description', description, changes, 'ddns update', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/ddns/accounts_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def accounts_delete():
try:
row_index = int(request.form.get('row_index', -1))
@@ -110,11 +110,11 @@ def accounts_delete():
flash('Invalid row index.', 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
providers = cfg.setdefault('ddns', {}).setdefault('providers', [])
if row_index < 0 or row_index >= len(providers):
flash('Invalid provider index.', 'error')
@@ -123,13 +123,13 @@ def accounts_delete():
before = copy.deepcopy(providers[row_index])
description = before.get('description', str(row_index))
del providers[row_index]
- changes = diff_fields(before, None)
- flash(record_group(cfg, 'ddns.providers', 'description', description, changes, 'ddns update', queue=False), 'success')
+ changes = config_utils.diff_fields(before, None)
+ flash(config_utils.record_group(cfg, 'ddns.providers', 'description', description, changes, 'ddns update', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/ddns/ipcheckinterval_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def ipcheckinterval_save():
raw = request.form.get('timer_interval', '').strip()
try:
@@ -141,22 +141,22 @@ def ipcheckinterval_save():
return redirect(f'/{_PAGE}')
timer_interval = f'{mins}m'
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('ddns', {}).get('general', {}))
cfg.setdefault('ddns', {}).setdefault('general', {})['timer_interval'] = timer_interval
- changes = diff_fields(before, cfg['ddns']['general'])
- flash(record_group(cfg, 'ddns.general', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, cfg['ddns']['general'])
+ flash(config_utils.record_group(cfg, 'ddns.general', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/ddns/ipcheckservices_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def ipcheckservices_save():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
@@ -167,18 +167,18 @@ def ipcheckservices_save():
flash('At least one IP check service is required.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('ddns', {}).get('ip_check_services', []))
services = [{'type': 'http', 'url': u} for u in http_services]
services += [{'type': 'dig', 'url': u} for u in dig_services]
cfg.setdefault('ddns', {})['ip_check_services'] = services
- changes = diff_fields({'ip_check_services': before}, {'ip_check_services': services})
- flash(record_group(cfg, 'ddns', None, None, changes, 'ddns update', queue=False), 'success')
+ changes = config_utils.diff_fields({'ip_check_services': before}, {'ip_check_services': services})
+ flash(config_utils.record_group(cfg, 'ddns', None, None, changes, 'ddns update', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/ddns/logging_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_save():
log_max_kb = validate.int_range(request.form.get('log_max_kb', '').strip(), 64, None)
if log_max_kb is None:
@@ -186,23 +186,23 @@ def logging_save():
return redirect(f'/{_PAGE}')
log_errors_only = 'log_errors_only' in request.form
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('ddns', {}).get('general', {}))
cfg.setdefault('ddns', {}).setdefault('general', {}).update({
'log_max_kb': log_max_kb,
'log_errors_only': log_errors_only,
})
- changes = diff_fields(before, cfg['ddns']['general'])
- flash(record_group(cfg, 'ddns.general', None, None, changes, 'ddns update', queue=False), 'success')
+ changes = config_utils.diff_fields(before, cfg['ddns']['general'])
+ flash(config_utils.record_group(cfg, 'ddns.general', None, None, changes, 'ddns update', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/ddns/logging_clear', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_clear():
try:
open(LOG_FILE, 'w').close()
@@ -213,7 +213,7 @@ def logging_clear():
@bp.route('/action/ddns/logging_download', methods=['GET'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_download():
if not os.path.isfile(LOG_FILE):
abort(404)
diff --git a/docker/routlin-dash/app/pages/ddns/view.py b/docker/routlin-dash/app/pages/ddns/view.py
index f880db9..0ded9c1 100644
--- a/docker/routlin-dash/app/pages/ddns/view.py
+++ b/docker/routlin-dash/app/pages/ddns/view.py
@@ -1,10 +1,8 @@
import json
import re
import os
-from config_utils import (
- collect_layout_tokens, load_datasource, CONFIGS_DIR, relative_time,
-)
-from factory import load_ddns, load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
import mod_validation as validate
@@ -20,9 +18,9 @@ def _parse_interval_to_seconds(s):
def _ddns_log_tail():
- log_path = f'{CONFIGS_DIR}/ddns.log'
+ log_path = f'{config_utils.CONFIGS_DIR}/ddns.log'
try:
- log_max_kb = load_ddns().get('general', {}).get('log_max_kb', 1024)
+ log_max_kb = factory.load_ddns().get('general', {}).get('log_max_kb', 1024)
size_kb = os.path.getsize(log_path) / 1024
with open(log_path) as f:
lines = f.readlines()
@@ -49,9 +47,9 @@ def _ddns_log_tail():
def _read_cached_ip():
try:
best_ip, best_mtime = '', 0.0
- for fname in os.listdir(CONFIGS_DIR):
+ for fname in os.listdir(config_utils.CONFIGS_DIR):
if fname.startswith('.ddns-last-ip-'):
- path = f'{CONFIGS_DIR}/{fname}'
+ path = f'{config_utils.CONFIGS_DIR}/{fname}'
mtime = os.path.getmtime(path)
if mtime > best_mtime:
ip = open(path).read().strip()
@@ -70,7 +68,7 @@ def public_ip_info(ddns_cfg):
all_hosts.extend(p.get('hostnames', p.get('subdomains', [])))
domains_sub = ', '.join(all_hosts)
ip, mtime = _read_cached_ip()
- last_obtained = f'Obtained: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago' if mtime else ''
+ last_obtained = f'Obtained: {config_utils.relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago' if mtime else ''
if ip:
return ip, domains_sub, last_obtained
return 'Offline', domains_sub, ''
@@ -79,15 +77,15 @@ def public_ip_info(ddns_cfg):
def ddns_last_checked():
from datetime import datetime, timezone
try:
- mtime = os.path.getmtime(f'{CONFIGS_DIR}/.ddns-last-service')
- return f'Last checked: {relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago'
+ mtime = os.path.getmtime(f'{config_utils.CONFIGS_DIR}/.ddns-last-service')
+ return f'Last checked: {config_utils.relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago'
except OSError:
return 'Last checked: ---'
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
- ddns = load_ddns()
+ tokens = config_utils.collect_layout_tokens(cfg)
+ ddns = factory.load_ddns()
ddns_gen = ddns.get('general', {})
tokens['DDNS_TIMER_INTERVAL'] = ddns_gen.get('timer_interval', '-')
interval_secs = _parse_interval_to_seconds(ddns_gen.get('timer_interval', '')) or 600
@@ -111,8 +109,8 @@ def collect_tokens(cfg):
tokens['STAT_PUBLIC_IP_LAST_OBTAINED'] = last_obtained
tokens['STAT_PUBLIC_IP_LAST_CHECKED'] = ddns_last_checked()
tokens['DDNS_LOG_TAIL'], tokens['DDNS_LOG_SUMMARY'] = _ddns_log_tail()
- content = load_json(f'{PAGES_DIR}/ddns/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/ddns/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/dhcpleases/view.py b/docker/routlin-dash/app/pages/dhcpleases/view.py
index 266be71..66a8582 100644
--- a/docker/routlin-dash/app/pages/dhcpleases/view.py
+++ b/docker/routlin-dash/app/pages/dhcpleases/view.py
@@ -3,10 +3,8 @@ import json
import os
import glob
from datetime import datetime, timezone
-from config_utils import collect_layout_tokens, load_config, relative_time
-from factory import (
- load_json, build_table, table_token_key, iter_table_items, PAGES_DIR, e,
-)
+import config_utils
+import factory
try:
import manuf as _manuf_mod
@@ -42,8 +40,8 @@ def _vendor_cell(vendor):
if not display:
return '-'
if long:
- return f'{e(display)}'
- return e(display)
+ return f'{factory.e(display)}'
+ return factory.e(display)
def _get_arp_table():
@@ -93,7 +91,7 @@ def _parse_lease_secs(s):
def live_dhcp_leases():
rows = []
now = int(datetime.now(tz=timezone.utc).timestamp())
- cfg = load_config()
+ cfg = config_utils.load_config()
vlans = cfg.get('vlans', [])
arp_table = _get_arp_table()
lease_macs = set()
@@ -130,9 +128,9 @@ def live_dhcp_leases():
if obtained_ts is None:
lease_renewed = '-'
elif obtained_ts <= now:
- lease_renewed = relative_time(obtained_ts, now, short=True) + ' ago'
+ lease_renewed = config_utils.relative_time(obtained_ts, now, short=True) + ' ago'
elif renews_ts and renews_ts > now:
- lease_renewed = 'ETA ' + relative_time(renews_ts, now, short=True)
+ lease_renewed = 'ETA ' + config_utils.relative_time(renews_ts, now, short=True)
else:
lease_renewed = 'ETA soon'
mac_norm = parts[1].lower()
@@ -141,8 +139,8 @@ def live_dhcp_leases():
desc = mac_to_desc.get(mac_norm)
name = res_h or device_h
if name:
- desc_attr = f' data-hostname-desc="{e(desc)}"' if desc else ''
- hostname_html = f'{e(name)}' if desc_attr else e(name)
+ desc_attr = f' data-hostname-desc="{factory.e(desc)}"' if desc else ''
+ hostname_html = f'{factory.e(name)}' if desc_attr else factory.e(name)
else:
hostname_html = '-'
arp_entry = arp_table.get(mac_norm, {})
@@ -154,7 +152,7 @@ def live_dhcp_leases():
'vendor': _vendor_cell(_get_vendor(parts[1])),
'vlan_name': vlan_name,
'lease_renewed': lease_renewed,
- 'renews': 'in ' + relative_time(renews_ts or expiry, now, short=True),
+ 'renews': 'in ' + config_utils.relative_time(renews_ts or expiry, now, short=True),
'status': _status_badge(arp_entry.get('state', '')),
})
except Exception:
@@ -178,16 +176,16 @@ def live_dhcp_leases():
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
filter_opts = '' + ''.join(
f'' for n in vlan_names
)
tokens['VLAN_FILTER_OPTIONS'] = filter_opts
- content = load_json(f'{PAGES_DIR}/dhcpleases/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/dhcpleases/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
rows = live_dhcp_leases() if ds == 'live:dhcp_leases' else []
- tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, rows)
return tokens
diff --git a/docker/routlin-dash/app/pages/dhcpreservations/action.py b/docker/routlin-dash/app/pages/dhcpreservations/action.py
index 1324b72..e9a8601 100644
--- a/docker/routlin-dash/app/pages/dhcpreservations/action.py
+++ b/docker/routlin-dash/app/pages/dhcpreservations/action.py
@@ -3,8 +3,8 @@ import copy
import ipaddress
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -20,7 +20,7 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@@ -59,7 +59,7 @@ def _check_ip_in_vlan_subnet(ip, vlan):
@bp.route('/action/dhcpreservations/addreservation_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addreservation_add():
vlan_name = sanitize.name(request.form.get('vlan_name', ''))
description = sanitize.text(request.form.get('description', ''))
@@ -79,7 +79,7 @@ def addreservation_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlans = cfg.get('vlans', [])
vlan = next((v for v in vlans if v.get('name') == vlan_name), None)
if vlan is None:
@@ -113,13 +113,13 @@ def addreservation_add():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'dhcp_reservations', 'mac', mac, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'dhcp_reservations', 'mac', mac, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dhcpreservations/reservations_toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def reservations_toggle():
idx = _row_index()
if idx is None:
@@ -128,7 +128,7 @@ def reservations_toggle():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('dhcp_reservations', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -144,13 +144,13 @@ def reservations_toggle():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, res)
- flash(record_group(cfg, 'dhcp_reservations', 'mac', res['mac'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, res)
+ flash(config_utils.record_group(cfg, 'dhcp_reservations', 'mac', res['mac'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dhcpreservations/reservations_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def reservations_edit():
idx = _row_index()
if idx is None:
@@ -171,7 +171,7 @@ def reservations_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('dhcp_reservations', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -208,13 +208,13 @@ def reservations_edit():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, res)
- flash(record_group(cfg, 'dhcp_reservations', 'mac', mac, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, res)
+ flash(config_utils.record_group(cfg, 'dhcp_reservations', 'mac', mac, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dhcpreservations/reservations_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def reservations_delete():
idx = _row_index()
if idx is None:
@@ -223,7 +223,7 @@ def reservations_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('dhcp_reservations', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -236,6 +236,6 @@ def reservations_delete():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'dhcp_reservations', 'mac', removed['mac'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'dhcp_reservations', 'mac', removed['mac'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/dhcpreservations/view.py b/docker/routlin-dash/app/pages/dhcpreservations/view.py
index 15f6e99..0c5ee05 100644
--- a/docker/routlin-dash/app/pages/dhcpreservations/view.py
+++ b/docker/routlin-dash/app/pages/dhcpreservations/view.py
@@ -1,10 +1,10 @@
import json
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
res_ips_by_vlan, res_hosts_by_vlan = {}, {}
@@ -26,8 +26,8 @@ def collect_tokens(cfg):
})
tokens['RESERVATION_IPS_BY_VLAN_JSON'] = json.dumps(res_ips_by_vlan)
tokens['RESERVATION_HOSTNAMES_BY_VLAN_JSON'] = json.dumps(res_hosts_by_vlan)
- content = load_json(f'{PAGES_DIR}/dhcpreservations/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/dhcpreservations/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/dnsblocking/action.py b/docker/routlin-dash/app/pages/dnsblocking/action.py
index 37fb76c..9f200d0 100644
--- a/docker/routlin-dash/app/pages/dnsblocking/action.py
+++ b/docker/routlin-dash/app/pages/dnsblocking/action.py
@@ -2,12 +2,12 @@ from pathlib import Path
import copy
import re
from flask import Blueprint, request, redirect, flash, send_file
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash, queued_msg, CONFIGS_DIR
+import auth
+import config_utils
import sanitize
import mod_validation as validate
-DNS_LOG_FILE = Path(CONFIGS_DIR) / 'dns-blocklists.log'
+DNS_LOG_FILE = Path(config_utils.CONFIGS_DIR) / 'dns-blocklists.log'
_PAGE = Path(__file__).parent.name
@@ -24,7 +24,7 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@@ -56,7 +56,7 @@ def _parse_fields():
@bp.route('/action/dnsblocking/blocklists_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def blocklists_delete():
idx = _row_index()
if idx is None:
@@ -66,7 +66,7 @@ def blocklists_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('dns_blocking', {}).get('blocklists', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -80,13 +80,13 @@ def blocklists_delete():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, None)
- flash(record_group(cfg, 'dns_blocking.blocklists', 'name', name, changes, 'core apply', queue=False), 'success')
+ changes = config_utils.diff_fields(before, None)
+ flash(config_utils.record_group(cfg, 'dns_blocking.blocklists', 'name', name, changes, 'core apply', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsblocking/blocklists_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def blocklists_edit():
idx = _row_index()
if idx is None:
@@ -100,7 +100,7 @@ def blocklists_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('dns_blocking', {}).get('blocklists', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -125,13 +125,13 @@ def blocklists_edit():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'dns_blocking.blocklists', 'name', fields['name'], changes, 'core apply', queue=False), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'dns_blocking.blocklists', 'name', fields['name'], changes, 'core apply', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsblocking/addblocklist_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addblocklist_add():
fields, err = _parse_fields()
if err:
@@ -140,7 +140,7 @@ def addblocklist_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
blocklists = cfg.setdefault('dns_blocking', {}).setdefault('blocklists', [])
# Blocklist name must be unique - it is the lookup key for VLAN use_blocklists references
@@ -162,13 +162,13 @@ def addblocklist_add():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'dns_blocking.blocklists', 'name', fields['name'], changes, 'core apply', queue=False), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'dns_blocking.blocklists', 'name', fields['name'], changes, 'core apply', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsblocking/blocklistrefresh_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def blocklistrefresh_save():
daily_execute_time = validate.time_24h(sanitize.time_24h(request.form.get('daily_execute_time_24hr_local', '')))
@@ -176,27 +176,27 @@ def blocklistrefresh_save():
flash('Daily Refresh Time must be a valid 24-hour time (e.g. 02:30).', 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('dns_blocking', {}).get('general', {}))
cfg.setdefault('dns_blocking', {}).setdefault('general', {})['daily_execute_time_24hr_local'] = daily_execute_time
- changes = diff_fields(before, cfg['dns_blocking']['general'])
- flash(record_group(cfg, 'dns_blocking.general', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, cfg['dns_blocking']['general'])
+ flash(config_utils.record_group(cfg, 'dns_blocking.general', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsblocking/blocklistrefresh_refresh', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def blocklistrefresh_refresh():
- flash(queued_msg('core update-blocklists', action_label='Blocklist refresh queued'), 'success')
+ flash(config_utils.queued_msg('core update-blocklists', action_label='Blocklist refresh queued'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsblocking/logging_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_save():
log_max_kb_raw = request.form.get('log_max_kb', '').strip()
log_errors_only = 'log_errors_only' in request.form
@@ -206,11 +206,11 @@ def logging_save():
flash('Max Log Size must be a number >= 64.', 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('dns_blocking', {}).get('general', {}))
cfg.setdefault('dns_blocking', {}).setdefault('general', {}).update({
'log_max_kb': log_max_kb,
@@ -221,13 +221,13 @@ def logging_save():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, cfg['dns_blocking']['general'])
- flash(record_group(cfg, 'dns_blocking.general', None, None, changes, 'core apply', queue=False), 'success')
+ changes = config_utils.diff_fields(before, cfg['dns_blocking']['general'])
+ flash(config_utils.record_group(cfg, 'dns_blocking.general', None, None, changes, 'core apply', queue=False), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsblocking/logging_clear', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_clear():
try:
DNS_LOG_FILE.write_text('')
@@ -238,7 +238,7 @@ def logging_clear():
@bp.route('/action/dnsblocking/logging_download', methods=['GET'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_download():
if not DNS_LOG_FILE.is_file():
flash('Log file not found.', 'error')
diff --git a/docker/routlin-dash/app/pages/dnsblocking/view.py b/docker/routlin-dash/app/pages/dnsblocking/view.py
index 2d437bb..6f3cfb9 100644
--- a/docker/routlin-dash/app/pages/dnsblocking/view.py
+++ b/docker/routlin-dash/app/pages/dnsblocking/view.py
@@ -1,10 +1,10 @@
import json
import os
from datetime import datetime, timezone
-from config_utils import collect_layout_tokens, load_datasource, fmt_bytes, relative_time, BLOCKLISTS_DIR, CONFIGS_DIR
-from factory import e, load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
-DNS_LOG_FILE = f'{CONFIGS_DIR}/dns-blocklists.log'
+DNS_LOG_FILE = f'{config_utils.CONFIGS_DIR}/dns-blocklists.log'
DNS_LOG_MAX = 50
@@ -37,17 +37,17 @@ def _dnsblocking_log_tail(cfg):
def blocklist_stats_html(cfg):
rows = ''
for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
- name = e(bl.get('name', ''))
+ name = factory.e(bl.get('name', ''))
save_as = bl.get('save_as', '')
- bl_path = f'{BLOCKLISTS_DIR}/{save_as}' if save_as else ''
+ bl_path = f'{config_utils.BLOCKLISTS_DIR}/{save_as}' if save_as else ''
try:
with open(bl_path) as f:
entries = sum(1 for _ in f)
mtime = int(os.path.getmtime(bl_path))
- size_str = fmt_bytes(os.path.getsize(bl_path))
+ size_str = config_utils.fmt_bytes(os.path.getsize(bl_path))
last_refreshed = (
f'{datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")}'
- f' ({relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago)'
+ f' ({config_utils.relative_time(mtime, datetime.now(tz=timezone.utc).timestamp())} ago)'
)
except Exception:
entries, size_str, last_refreshed = '-', '-', 'Never'
@@ -56,7 +56,7 @@ def blocklist_stats_html(cfg):
f'{name} | '
f'
{entries} | '
f'
{size_str} | '
- f'
{e(last_refreshed)} | '
+ f'
{factory.e(last_refreshed)} | '
''
)
if not rows:
@@ -73,7 +73,7 @@ def blocklist_stats_html(cfg):
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if dns_blk_gen.get('log_errors_only') else 'false'
@@ -84,8 +84,8 @@ def collect_tokens(cfg):
{'value': 'hosts', 'label': 'hosts (hosts file format)'},
{'value': 'dnsmasq', 'label': 'dnsmasq (local=/ syntax)'},
])
- content = load_json(f'{PAGES_DIR}/dnsblocking/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/dnsblocking/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/dnsserver/action.py b/docker/routlin-dash/app/pages/dnsserver/action.py
index 0ff5bfc..cbabfff 100644
--- a/docker/routlin-dash/app/pages/dnsserver/action.py
+++ b/docker/routlin-dash/app/pages/dnsserver/action.py
@@ -1,8 +1,8 @@
from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -11,7 +11,7 @@ _PAGE = Path(__file__).parent.name
bp = Blueprint(_PAGE, __name__)
@bp.route('/action/dnsserver/upstreamdns_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def upstreamdns_save():
strict_order = 'strict_order' in request.form
submitted = request.form.getlist('upstream_servers')
@@ -29,11 +29,11 @@ def upstreamdns_save():
return redirect(f'/{_PAGE}')
upstream_servers.append(clean)
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('upstream_dns', {}))
current = cfg.get('upstream_dns', {})
if (strict_order == bool(current.get('strict_order', False)) and
@@ -50,24 +50,24 @@ def upstreamdns_save():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, cfg['upstream_dns'])
- flash(record_group(cfg, 'upstream_dns', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, cfg['upstream_dns'])
+ flash(config_utils.record_group(cfg, 'upstream_dns', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/dnsserver/dnsforwarding_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def dnsforwarding_save():
cache_size = validate.int_range(request.form.get('cache_size', '').strip(), 0, None)
if cache_size is None:
flash('Cache Size must be a non-negative integer.', 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('upstream_dns', {}))
current = cfg.get('upstream_dns', {})
if cache_size == int(current.get('cache_size', 0)):
@@ -80,6 +80,6 @@ def dnsforwarding_save():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, cfg['upstream_dns'])
- flash(record_group(cfg, 'upstream_dns', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, cfg['upstream_dns'])
+ flash(config_utils.record_group(cfg, 'upstream_dns', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/dnsserver/view.py b/docker/routlin-dash/app/pages/dnsserver/view.py
index 46090dd..a841f8f 100644
--- a/docker/routlin-dash/app/pages/dnsserver/view.py
+++ b/docker/routlin-dash/app/pages/dnsserver/view.py
@@ -1,9 +1,9 @@
import json
-from config_utils import collect_layout_tokens
+import config_utils
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
dns = cfg.get('upstream_dns', {})
servers = dns.get('upstream_servers', [])
tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
diff --git a/docker/routlin-dash/app/pages/hostoverrides/action.py b/docker/routlin-dash/app/pages/hostoverrides/action.py
index b080499..fbff9d6 100644
--- a/docker/routlin-dash/app/pages/hostoverrides/action.py
+++ b/docker/routlin-dash/app/pages/hostoverrides/action.py
@@ -2,8 +2,8 @@ from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -19,14 +19,14 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@bp.route('/action/hostoverrides/addoverride_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addoverride_add():
description = sanitize.text(request.form.get('description', ''))
host = validate.domainname(request.form.get('host', ''))
@@ -38,7 +38,7 @@ def addoverride_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
err = validate.check_host_override_ip_in_vlans(ip, cfg)
if err:
flash(err, 'error')
@@ -52,13 +52,13 @@ def addoverride_add():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'host_overrides', 'host', host, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'host_overrides', 'host', host, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/hostoverrides/table_toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_toggle():
idx = _row_index()
if idx is None:
@@ -67,7 +67,7 @@ def table_toggle():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('host_overrides', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -83,13 +83,13 @@ def table_toggle():
return redirect(f'/{_PAGE}')
host = items[idx]['host']
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'host_overrides', 'host', host, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'host_overrides', 'host', host, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/hostoverrides/table_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_edit():
idx = _row_index()
if idx is None:
@@ -107,7 +107,7 @@ def table_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
err = validate.check_host_override_ip_in_vlans(ip, cfg)
if err:
flash(err, 'error')
@@ -126,13 +126,13 @@ def table_edit():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'host_overrides', 'host', host, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'host_overrides', 'host', host, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/hostoverrides/table_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_delete():
idx = _row_index()
if idx is None:
@@ -141,7 +141,7 @@ def table_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('host_overrides', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -154,6 +154,6 @@ def table_delete():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'host_overrides', 'host', removed['host'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'host_overrides', 'host', removed['host'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/hostoverrides/view.py b/docker/routlin-dash/app/pages/hostoverrides/view.py
index e852160..8d18353 100644
--- a/docker/routlin-dash/app/pages/hostoverrides/view.py
+++ b/docker/routlin-dash/app/pages/hostoverrides/view.py
@@ -1,11 +1,11 @@
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
- content = load_json(f'{PAGES_DIR}/hostoverrides/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ tokens = config_utils.collect_layout_tokens(cfg)
+ content = factory.load_json(f'{factory.PAGES_DIR}/hostoverrides/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/intervlan/action.py b/docker/routlin-dash/app/pages/intervlan/action.py
index 521022f..8bc3497 100644
--- a/docker/routlin-dash/app/pages/intervlan/action.py
+++ b/docker/routlin-dash/app/pages/intervlan/action.py
@@ -2,8 +2,8 @@ from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -22,7 +22,7 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@@ -88,7 +88,7 @@ def _parse_entry():
@bp.route('/action/intervlan/addexception_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addexception_add():
entry, err = _parse_entry()
if err:
@@ -96,7 +96,7 @@ def addexception_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
cfg.setdefault('inter_vlan_exceptions', []).append(entry)
errors = validate.validate_config(cfg)
if errors:
@@ -105,13 +105,13 @@ def addexception_add():
return redirect(f'/{_PAGE}')
src = entry.get('src_ip_or_subnet', '')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/intervlan/table_toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_toggle():
idx = _row_index()
if idx is None:
@@ -120,7 +120,7 @@ def table_toggle():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('inter_vlan_exceptions', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -136,13 +136,13 @@ def table_toggle():
return redirect(f'/{_PAGE}')
src = items[idx].get('src_ip_or_subnet', '')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/intervlan/table_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_edit():
idx = _row_index()
if idx is None:
@@ -155,7 +155,7 @@ def table_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('inter_vlan_exceptions', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -171,13 +171,13 @@ def table_edit():
return redirect(f'/{_PAGE}')
src = items[idx].get('src_ip_or_subnet', '')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/intervlan/table_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def table_delete():
idx = _row_index()
if idx is None:
@@ -186,7 +186,7 @@ def table_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('inter_vlan_exceptions', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -200,6 +200,6 @@ def table_delete():
return redirect(f'/{_PAGE}')
src = removed.get('src_ip_or_subnet', '')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'inter_vlan_exceptions', 'src_ip_or_subnet', src, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/intervlan/view.py b/docker/routlin-dash/app/pages/intervlan/view.py
index ed3875d..e0e2527 100644
--- a/docker/routlin-dash/app/pages/intervlan/view.py
+++ b/docker/routlin-dash/app/pages/intervlan/view.py
@@ -1,17 +1,17 @@
import json
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
tokens['PROTOCOL_OPTIONS'] = json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
])
- content = load_json(f'{PAGES_DIR}/intervlan/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/intervlan/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/mdns/action.py b/docker/routlin-dash/app/pages/mdns/action.py
index 4e1c270..1bd1c4e 100644
--- a/docker/routlin-dash/app/pages/mdns/action.py
+++ b/docker/routlin-dash/app/pages/mdns/action.py
@@ -2,8 +2,8 @@ from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -13,15 +13,15 @@ bp = Blueprint(_PAGE, __name__)
@bp.route('/action/mdns/settings_apply', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def settings_apply():
mdns_enabled = 'mdns_enabled' in request.form
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
mdns_reflect_vlans = sanitize.filterlist(
request.form.getlist('mdns_reflect_vlans'),
{v.get('name') for v in cfg.get('vlans', [])},
@@ -38,6 +38,6 @@ def settings_apply():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, cfg['mdns_reflection'])
- flash(record_group(cfg, 'mdns_reflection', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, cfg['mdns_reflection'])
+ flash(config_utils.record_group(cfg, 'mdns_reflection', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/mdns/view.py b/docker/routlin-dash/app/pages/mdns/view.py
index 0e98c4e..b940d94 100644
--- a/docker/routlin-dash/app/pages/mdns/view.py
+++ b/docker/routlin-dash/app/pages/mdns/view.py
@@ -1,5 +1,5 @@
-from config_utils import collect_layout_tokens
+import config_utils
def collect_tokens(cfg):
- return collect_layout_tokens(cfg)
+ return config_utils.collect_layout_tokens(cfg)
diff --git a/docker/routlin-dash/app/pages/networklayout/action.py b/docker/routlin-dash/app/pages/networklayout/action.py
index ab1d401..2d77d5b 100644
--- a/docker/routlin-dash/app/pages/networklayout/action.py
+++ b/docker/routlin-dash/app/pages/networklayout/action.py
@@ -4,11 +4,11 @@ import ipaddress
import json
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
-import settings as settings
+import settings
_PAGE = Path(__file__).parent.name
@@ -25,14 +25,14 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@bp.route('/action/networklayout/vlans_addedit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def vlans_addedit():
_ri = request.form.get('row_index', '').strip()
try:
@@ -53,7 +53,7 @@ def vlans_addedit():
restricted_vlan = restricted_vlan_raw if restricted_vlan_raw in ('q', 'c') else ''
use_blocklists = sanitize.filterlist(
request.form.getlist('use_blocklists'),
- {b.get('name') for b in load_config().get('dns_blocking', {}).get('blocklists', [])},
+ {b.get('name') for b in config_utils.load_config().get('dns_blocking', {}).get('blocklists', [])},
)
if restricted_vlan and not PRO_LICENSE:
@@ -216,7 +216,7 @@ def vlans_addedit():
if dhcp_overrides:
dhcp_info['explicit_overrides'] = dhcp_overrides
- cfg = load_config()
+ cfg = config_utils.load_config()
vlans = cfg.setdefault('vlans', [])
if is_edit:
@@ -284,11 +284,11 @@ def vlans_addedit():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, existing)
+ changes = config_utils.diff_fields(before, existing)
if not changes:
flash('No changes were made.', 'info')
return redirect(f'/{_PAGE}')
- flash(record_group(cfg, 'vlans', 'name', name, changes, 'core apply'), 'success')
+ flash(config_utils.record_group(cfg, 'vlans', 'name', name, changes, 'core apply'), 'success')
else:
is_vpn = 'is_vpn' in request.form
@@ -347,14 +347,14 @@ def vlans_addedit():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'vlans', 'name', name, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'vlans', 'name', name, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/networklayout/vlans_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def vlans_delete():
idx = _row_index()
if idx is None:
@@ -363,7 +363,7 @@ def vlans_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlans = cfg.get('vlans', [])
if idx < 0 or idx >= len(vlans):
flash('VLAN not found.', 'error')
@@ -376,6 +376,6 @@ def vlans_delete():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'vlans', 'name', removed['name'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'vlans', 'name', removed['name'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/networklayout/view.py b/docker/routlin-dash/app/pages/networklayout/view.py
index 0391a0d..f791862 100644
--- a/docker/routlin-dash/app/pages/networklayout/view.py
+++ b/docker/routlin-dash/app/pages/networklayout/view.py
@@ -1,13 +1,13 @@
import json
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
-import settings as settings
+import config_utils
+import factory
+import settings
PRO_LICENSE = settings.is_pro()
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
dv = next((v for v in vlans if v.get('radius_default')), None)
tokens['EXISTING_VLAN_IDS_JSON'] = json.dumps([v.get('vlan_id') for v in vlans])
@@ -30,8 +30,8 @@ def collect_tokens(cfg):
{'value': bl.get('name', ''), 'label': bl.get('description', bl.get('name', ''))}
for bl in cfg.get('dns_blocking', {}).get('blocklists', [])
])
- content = load_json(f'{PAGES_DIR}/networklayout/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/networklayout/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/overview/view.py b/docker/routlin-dash/app/pages/overview/view.py
index 561109d..d692282 100644
--- a/docker/routlin-dash/app/pages/overview/view.py
+++ b/docker/routlin-dash/app/pages/overview/view.py
@@ -1,14 +1,14 @@
import re
import os
-from config_utils import collect_layout_tokens, fmt_timestamp, BLOCKLISTS_DIR
-from factory import run, load_ddns
+import config_utils
+import factory
from pages.ddns.view import public_ip_info
from pages.dhcpleases.view import live_dhcp_leases
def get_dnsmasq_stats():
stats = {'queries': '-', 'hits': '-', 'hit_rate': '-', 'forwarded': '-', 'auth': '-', 'tcp_peak': '-'}
- out = run('journalctl -u dnsmasq -n 200 --no-pager 2>/dev/null')
+ out = factory.run('journalctl -u dnsmasq -n 200 --no-pager 2>/dev/null')
for line in reversed(out.splitlines()):
if 'queries forwarded' in line:
m = re.search(r'queries forwarded (\d+)', line)
@@ -36,15 +36,15 @@ def get_dnsmasq_stats():
def count_blocked_today():
- out = run("journalctl -u dnsmasq --since today --no-pager 2>/dev/null | grep -c 'is NXDOMAIN'")
+ out = factory.run("journalctl -u dnsmasq --since today --no-pager 2>/dev/null | grep -c 'is NXDOMAIN'")
return out or '0'
def count_blocked_domains():
try:
total = sum(
- int(run(f'wc -l < "{BLOCKLISTS_DIR}/{f}"') or 0)
- for f in os.listdir(BLOCKLISTS_DIR) if f.endswith('.con')
+ int(factory.run(f'wc -l < "{config_utils.BLOCKLISTS_DIR}/{f}"') or 0)
+ for f in os.listdir(config_utils.BLOCKLISTS_DIR) if f.endswith('.con')
)
return str(total)
except Exception:
@@ -54,23 +54,23 @@ def count_blocked_domains():
def bl_last_update():
try:
mtime = max(
- os.path.getmtime(f'{BLOCKLISTS_DIR}/{f}')
- for f in os.listdir(BLOCKLISTS_DIR) if f.endswith('.con')
+ os.path.getmtime(f'{config_utils.BLOCKLISTS_DIR}/{f}')
+ for f in os.listdir(config_utils.BLOCKLISTS_DIR) if f.endswith('.con')
)
- return fmt_timestamp(int(mtime))
+ return config_utils.fmt_timestamp(int(mtime))
except Exception:
return '-'
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
vlan_names = [v.get('name', '') for v in vlans]
net = cfg.get('network_interfaces', {})
dns = cfg.get('upstream_dns', {})
dns_stats = get_dnsmasq_stats()
- ddns = load_ddns()
+ ddns = factory.load_ddns()
ip_str, domains_sub, last_obtained = public_ip_info(ddns)
tokens['GENERAL_WAN_INTERFACE'] = str(net.get('wan_interface', '-'))
@@ -82,8 +82,8 @@ def collect_tokens(cfg):
tokens['STAT_BLOCKED_TODAY'] = count_blocked_today()
tokens['STAT_BLOCKED_DOMAINS'] = count_blocked_domains()
tokens['STAT_BL_LAST_UPDATE'] = bl_last_update()
- tokens['STAT_UPTIME'] = run('uptime -p') or '-'
- tokens['STAT_NFTABLES_STATUS'] = 'Active' if run('nft list tables 2>/dev/null').strip() else 'Inactive'
+ tokens['STAT_UPTIME'] = factory.run('uptime -p') or '-'
+ tokens['STAT_NFTABLES_STATUS'] = 'Active' if factory.run('nft list tables 2>/dev/null').strip() else 'Inactive'
tokens['STAT_PUBLIC_IP'] = ip_str
tokens['STAT_DDNS_HOSTNAME'] = domains_sub
tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
diff --git a/docker/routlin-dash/app/pages/physicalinterfaces/action.py b/docker/routlin-dash/app/pages/physicalinterfaces/action.py
index f9a8471..3cdc8ab 100644
--- a/docker/routlin-dash/app/pages/physicalinterfaces/action.py
+++ b/docker/routlin-dash/app/pages/physicalinterfaces/action.py
@@ -3,8 +3,8 @@ import copy
import os
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash, queued_msg, queue_command
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -33,7 +33,7 @@ def _valid_interface(name):
@bp.route('/action/physicalinterfaces/physicalinterface_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def physicalinterface_save():
wan = sanitize.interface_name(request.form.get('wan_interface', ''))
lan = sanitize.interface_name(request.form.get('lan_interface', ''))
@@ -48,7 +48,7 @@ def physicalinterface_save():
flash(err, 'error')
return redirect(f'/{_PAGE}')
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
@@ -60,7 +60,7 @@ def physicalinterface_save():
flash(err, 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('network_interfaces', {}))
gen = cfg.setdefault('network_interfaces', {})
gen['wan_interface'] = wan
@@ -70,15 +70,15 @@ def physicalinterface_save():
for msg in errors:
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, cfg['network_interfaces'])
- flash(record_group(cfg, 'network_interfaces', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, cfg['network_interfaces'])
+ flash(config_utils.record_group(cfg, 'network_interfaces', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/physicalinterfaces/ifaceconfig_apply', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def ifaceconfig_apply():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return redirect(f'/{_PAGE}')
@@ -114,15 +114,15 @@ def ifaceconfig_apply():
queued = False
if mtu_int and str(mtu_int) != original_mtu:
- queue_command(f'mtu {iface} {mtu_int}')
+ config_utils.queue_command(f'mtu {iface} {mtu_int}')
queued = True
if mac and mac != original_mac:
- queue_command(f'mac {iface} {mac}')
+ config_utils.queue_command(f'mac {iface} {mac}')
queued = True
if not queued:
flash('No changes detected.', 'info')
return redirect(f'/{_PAGE}')
- flash(queued_msg(action_label='Changes queued'), 'success')
+ flash(config_utils.queued_msg(action_label='Changes queued'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/physicalinterfaces/view.py b/docker/routlin-dash/app/pages/physicalinterfaces/view.py
index 16576ea..80aa4cf 100644
--- a/docker/routlin-dash/app/pages/physicalinterfaces/view.py
+++ b/docker/routlin-dash/app/pages/physicalinterfaces/view.py
@@ -1,6 +1,6 @@
import json
import os
-from config_utils import collect_layout_tokens
+import config_utils
_EXCLUDE_PREFIXES = ('lo', 'wg', 'docker', 'br-', 'veth', 'tun', 'tap', 'ppp', 'virbr', 'podman', 'vnet', 'macvtap', 'fc-')
@@ -68,7 +68,7 @@ def iface_info(iface):
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
net = cfg.get('network_interfaces', {})
wan = net.get('wan_interface', '')
lan = net.get('lan_interface', '')
diff --git a/docker/routlin-dash/app/pages/portforwarding/action.py b/docker/routlin-dash/app/pages/portforwarding/action.py
index 109e2bc..a91aff5 100644
--- a/docker/routlin-dash/app/pages/portforwarding/action.py
+++ b/docker/routlin-dash/app/pages/portforwarding/action.py
@@ -2,8 +2,8 @@ from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -22,7 +22,7 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@@ -75,7 +75,7 @@ def _parse_entry():
@bp.route('/action/portforwarding/addrule_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addrule_add():
entry, err = _parse_entry()
if err:
@@ -83,7 +83,7 @@ def addrule_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
err = validate.check_portfwd_restricted_vlan(entry['nat_ip'], cfg.get('vlans', []))
if err:
flash(err, 'error')
@@ -98,13 +98,13 @@ def addrule_add():
return redirect(f'/{_PAGE}')
dest_port = entry.get('dest_port', '')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portforwarding/rules_toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def rules_toggle():
idx = _row_index()
if idx is None:
@@ -113,7 +113,7 @@ def rules_toggle():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('port_forwarding', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -137,13 +137,13 @@ def rules_toggle():
return redirect(f'/{_PAGE}')
dest_port = items[idx].get('dest_port', '')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portforwarding/rules_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def rules_edit():
idx = _row_index()
if idx is None:
@@ -156,7 +156,7 @@ def rules_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('port_forwarding', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -179,13 +179,13 @@ def rules_edit():
return redirect(f'/{_PAGE}')
dest_port = items[idx].get('dest_port', '')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portforwarding/rules_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def rules_delete():
idx = _row_index()
if idx is None:
@@ -194,7 +194,7 @@ def rules_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('port_forwarding', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -208,6 +208,6 @@ def rules_delete():
return redirect(f'/{_PAGE}')
dest_port = removed.get('dest_port', '')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'port_forwarding', 'dest_port', dest_port, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/portforwarding/view.py b/docker/routlin-dash/app/pages/portforwarding/view.py
index ce773fb..22bf8f4 100644
--- a/docker/routlin-dash/app/pages/portforwarding/view.py
+++ b/docker/routlin-dash/app/pages/portforwarding/view.py
@@ -1,17 +1,17 @@
import json
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
tokens['PROTOCOL_OPTIONS'] = json.dumps([
{'value': 'tcp', 'label': 'TCP'},
{'value': 'udp', 'label': 'UDP'},
{'value': 'both', 'label': 'TCP/UDP'},
])
- content = load_json(f'{PAGES_DIR}/portforwarding/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/portforwarding/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/portwrangling/action.py b/docker/routlin-dash/app/pages/portwrangling/action.py
index 6561330..a49cc81 100644
--- a/docker/routlin-dash/app/pages/portwrangling/action.py
+++ b/docker/routlin-dash/app/pages/portwrangling/action.py
@@ -2,8 +2,8 @@ from pathlib import Path
import copy
from flask import Blueprint, request, redirect, flash
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -22,7 +22,7 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@@ -65,7 +65,7 @@ def _parse_entry():
@bp.route('/action/portwrangling/addrule_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addrule_add():
vlan_name = sanitize.name(request.form.get('vlan_name', ''))
entry, err = _parse_entry()
@@ -77,7 +77,7 @@ def addrule_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlan = next((v for v in cfg.get('vlans', []) if v.get('name') == vlan_name), None)
if vlan is None:
flash(f'The configuration has not been saved because VLAN "{vlan_name}" was not found.', 'error')
@@ -91,13 +91,13 @@ def addrule_add():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(None, entry)
- flash(record_group(cfg, 'port_wrangling', 'dest_port', entry['dest_port'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(None, entry)
+ flash(config_utils.record_group(cfg, 'port_wrangling', 'dest_port', entry['dest_port'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portwrangling/rules_toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def rules_toggle():
idx = _row_index()
if idx is None:
@@ -106,7 +106,7 @@ def rules_toggle():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('port_wrangling', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -121,13 +121,13 @@ def rules_toggle():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'port_wrangling', 'dest_port', items[idx].get('dest_port', ''), changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'port_wrangling', 'dest_port', items[idx].get('dest_port', ''), changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portwrangling/rules_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def rules_edit():
idx = _row_index()
if idx is None:
@@ -140,7 +140,7 @@ def rules_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('port_wrangling', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -156,13 +156,13 @@ def rules_edit():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(before, items[idx])
- flash(record_group(cfg, 'port_wrangling', 'dest_port', items[idx].get('dest_port', ''), changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, items[idx])
+ flash(config_utils.record_group(cfg, 'port_wrangling', 'dest_port', items[idx].get('dest_port', ''), changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/portwrangling/rules_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def rules_delete():
idx = _row_index()
if idx is None:
@@ -171,7 +171,7 @@ def rules_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
items = cfg.get('port_wrangling', [])
if idx < 0 or idx >= len(items):
flash('Entry not found.', 'error')
@@ -184,6 +184,6 @@ def rules_delete():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(removed, None)
- flash(record_group(cfg, 'port_wrangling', 'dest_port', removed.get('dest_port', ''), changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, 'port_wrangling', 'dest_port', removed.get('dest_port', ''), changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
diff --git a/docker/routlin-dash/app/pages/portwrangling/view.py b/docker/routlin-dash/app/pages/portwrangling/view.py
index b0d3ed5..d3222a4 100644
--- a/docker/routlin-dash/app/pages/portwrangling/view.py
+++ b/docker/routlin-dash/app/pages/portwrangling/view.py
@@ -1,10 +1,10 @@
import json
-from config_utils import collect_layout_tokens, load_datasource
-from factory import load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
vlan_names = [v.get('name', '') for v in vlans]
filter_opts = '
' + ''.join(
@@ -21,8 +21,8 @@ def collect_tokens(cfg):
v.get('name', ''): {'subnet': v.get('subnet', ''), 'prefix': v.get('subnet_mask', 0)}
for v in vlans if v.get('name') and v.get('subnet')
})
- content = load_json(f'{PAGES_DIR}/portwrangling/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/portwrangling/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- tokens[table_token_key(ds)] = build_table(table_item, tokens, load_datasource(ds))
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens
diff --git a/docker/routlin-dash/app/pages/preferences/action.py b/docker/routlin-dash/app/pages/preferences/action.py
index d3f9265..2ed63a0 100644
--- a/docker/routlin-dash/app/pages/preferences/action.py
+++ b/docker/routlin-dash/app/pages/preferences/action.py
@@ -1,8 +1,8 @@
from pathlib import Path
from flask import Blueprint, request, session, redirect, flash
import json, bcrypt
-from auth import require_level
-from config_utils import ACCOUNTS_FILE
+import auth
+import config_utils
import sanitize
_PAGE = Path(__file__).parent.name
@@ -13,18 +13,18 @@ bp = Blueprint(_PAGE, __name__)
def _load_accounts():
try:
- with open(ACCOUNTS_FILE) as f:
+ with open(config_utils.ACCOUNTS_FILE) as f:
return json.load(f)
except Exception:
return {'accounts': []}
def _save_accounts(data):
- with open(ACCOUNTS_FILE, 'w') as f:
+ with open(config_utils.ACCOUNTS_FILE, 'w') as f:
json.dump(data, f, indent=2)
@bp.route('/action/preferences/accountdetails_save', methods=['POST'])
-@require_level('viewer')
+@auth.require_level('viewer')
def accountdetails_save():
tz = sanitize.timezone(request.form.get('timezone', '').strip())
@@ -51,7 +51,7 @@ def accountdetails_save():
@bp.route('/action/preferences/changepassword_save', methods=['POST'])
-@require_level('viewer')
+@auth.require_level('viewer')
def changepassword_save():
current_password = request.form.get('current_password', '')
new_password = request.form.get('new_password', '')
diff --git a/docker/routlin-dash/app/pages/preferences/view.py b/docker/routlin-dash/app/pages/preferences/view.py
index fccd95e..94fe6d5 100644
--- a/docker/routlin-dash/app/pages/preferences/view.py
+++ b/docker/routlin-dash/app/pages/preferences/view.py
@@ -1,11 +1,11 @@
import json
from flask import session
import sanitize
-from config_utils import collect_layout_tokens
+import config_utils
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
blank = [{'value': '', 'label': '-- Select timezone --'}]
tokens['PREF_EMAIL'] = session.get('email_address', '')
tokens['PREF_TIMEZONE'] = session.get('timezone', '')
diff --git a/docker/routlin-dash/app/pages/radius/action.py b/docker/routlin-dash/app/pages/radius/action.py
index 23a1eea..4a7c760 100644
--- a/docker/routlin-dash/app/pages/radius/action.py
+++ b/docker/routlin-dash/app/pages/radius/action.py
@@ -5,10 +5,10 @@ import os
import re
from pathlib import Path
from flask import Blueprint, request, redirect, flash, send_file, abort, jsonify
-from auth import require_level
-from config_utils import CONFIGS_DIR, load_config, record_group, diff_fields
+import auth
+import config_utils
import mod_validation as validate
-import settings as settings
+import settings
_PAGE = Path(__file__).parent.name
@@ -16,7 +16,7 @@ PRO_LICENSE = settings.is_pro()
bp = Blueprint(_PAGE, __name__)
-RADIUS_SECRET_FILE = Path(CONFIGS_DIR) / '.radius-secret'
+RADIUS_SECRET_FILE = Path(config_utils.CONFIGS_DIR) / '.radius-secret'
RADIUS_LOG_FILE = '/var/log/freeradius/radius.log'
VALID_MAC_FORMATS = {
@@ -26,7 +26,7 @@ VALID_MAC_FORMATS = {
@bp.route('/action/radius/regenerate', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def regenerate():
try:
RADIUS_SECRET_FILE.unlink(missing_ok=True)
@@ -38,25 +38,25 @@ def regenerate():
@bp.route('/action/radius/options_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def options_save():
mac_format = request.form.get('mac_format', 'aabbccddeeff')
if mac_format not in VALID_MAC_FORMATS:
flash('Invalid MAC format.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('radius', {}).get('options', {}))
after = {**before, 'mac_format': mac_format}
cfg.setdefault('radius', {})['options'] = after
- changes = diff_fields(before, after)
- flash(record_group(cfg, 'radius.options', 'setting', 'radius', changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, after)
+ flash(config_utils.record_group(cfg, 'radius.options', 'setting', 'radius', changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/radius/auth_mode_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def auth_mode_save():
auth_mode = request.form.get('auth_mode', 'mab')
if auth_mode not in ('mab', 'eap_password', 'eap_credential'):
@@ -78,7 +78,7 @@ def auth_mode_save():
'eap_ttls': {'md5', 'mschapv2', 'gtc'},
}
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('radius', {}).get('options', {}))
after = {**before, 'auth_mode': auth_mode}
if auth_mode == 'eap_password':
@@ -104,13 +104,13 @@ def auth_mode_save():
after.pop('include_length', None)
cfg.setdefault('radius', {})['options'] = after
- changes = diff_fields(before, after)
- flash(record_group(cfg, 'radius.options', 'auth_mode', auth_mode, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, after)
+ flash(config_utils.record_group(cfg, 'radius.options', 'auth_mode', auth_mode, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/radius/default_rule_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def default_rule_save():
apply_to = request.form.get('apply_to', 'all')
ap_ips = request.form.getlist('ap_ips')
@@ -119,7 +119,7 @@ def default_rule_save():
flash('Invalid apply_to value.', 'error')
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
valid_ips = {
r['ip'] for r in cfg.get('dhcp_reservations', [])
if r.get('radius_client') is True
@@ -134,16 +134,16 @@ def default_rule_save():
after = {**before, 'apply_to': apply_to, 'ap_ips': ap_ips}
cfg.setdefault('radius', {})['options'] = after
- changes = diff_fields(before, after)
- flash(record_group(cfg, 'radius.options', 'default_rule', 'radius', changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, after)
+ flash(config_utils.record_group(cfg, 'radius.options', 'default_rule', 'radius', changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/radius/default_vlan_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def default_vlan_save():
chosen = request.form.get('default_vlan', '').strip()
- cfg = load_config()
+ cfg = config_utils.load_config()
vlans = cfg.get('vlans', [])
if chosen and not any(v['name'] == chosen for v in vlans):
@@ -154,14 +154,14 @@ def default_vlan_save():
for v in vlans:
v['radius_default'] = (v['name'] == chosen) if chosen else False
- changes = diff_fields({'radius_default': old_name}, {'radius_default': chosen})
- flash(record_group(cfg, 'radius', 'default_vlan', chosen or 'none', changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields({'radius_default': old_name}, {'radius_default': chosen})
+ flash(config_utils.record_group(cfg, 'radius', 'default_vlan', chosen or 'none', changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/radius/logging_save', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_save():
log_max_kb = validate.int_range(request.form.get('log_max_kb', '').strip(), 64, None)
if log_max_kb is None:
@@ -169,18 +169,18 @@ def logging_save():
return redirect(f'/{_PAGE}')
logging = 'logging' in request.form
- cfg = load_config()
+ cfg = config_utils.load_config()
before = copy.deepcopy(cfg.get('radius', {}).get('general', {}))
after = {'logging': logging, 'log_max_kb': log_max_kb}
cfg.setdefault('radius', {})['general'] = after
- changes = diff_fields(before, after)
- flash(record_group(cfg, 'radius.general', 'setting', 'radius', changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, after)
+ flash(config_utils.record_group(cfg, 'radius.general', 'setting', 'radius', changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/radius/logging_download', methods=['GET'])
-@require_level('administrator')
+@auth.require_level('administrator')
def logging_download():
log_dir = os.path.dirname(RADIUS_LOG_FILE)
chunks = []
@@ -228,10 +228,10 @@ def logging_download():
@bp.route('/api/radius/log-tail', methods=['GET'])
-@require_level('administrator')
+@auth.require_level('administrator')
def api_log_tail():
try:
- cfg = load_config()
+ cfg = config_utils.load_config()
log_max_kb = cfg.get('radius', {}).get('general', {}).get('log_max_kb', 1024)
current = []
diff --git a/docker/routlin-dash/app/pages/radius/view.py b/docker/routlin-dash/app/pages/radius/view.py
index 54d9e51..036a863 100644
--- a/docker/routlin-dash/app/pages/radius/view.py
+++ b/docker/routlin-dash/app/pages/radius/view.py
@@ -1,7 +1,7 @@
import json
import os
-from config_utils import collect_layout_tokens, CONFIGS_DIR
-import settings as settings
+import config_utils
+import settings
PRO_LICENSE = settings.is_pro()
@@ -59,9 +59,9 @@ def radius_log_tail(cfg):
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
try:
- tokens['RADIUS_SECRET'] = open(f'{CONFIGS_DIR}/.radius-secret').read().strip()
+ tokens['RADIUS_SECRET'] = open(f'{config_utils.CONFIGS_DIR}/.radius-secret').read().strip()
except OSError:
tokens['RADIUS_SECRET'] = '(Generation is pending - visit Actions to apply generation command)'
fr = cfg.get('radius', {})
diff --git a/docker/routlin-dash/app/pages/vpn/action.py b/docker/routlin-dash/app/pages/vpn/action.py
index b277a6a..3ec9c6a 100644
--- a/docker/routlin-dash/app/pages/vpn/action.py
+++ b/docker/routlin-dash/app/pages/vpn/action.py
@@ -5,8 +5,8 @@ import ipaddress
import re
from flask import Blueprint, make_response, redirect, flash, request
-from auth import require_level
-from config_utils import load_config, record_group, diff_fields, verify_config_hash, CONFIGS_DIR, WEB_APP_DISPLAY_NAME
+import auth
+import config_utils
import sanitize
import mod_validation as validate
@@ -53,7 +53,7 @@ def _row_index():
def _hash_ok():
- if not verify_config_hash(request.form.get('config_hash', '')):
+ if not config_utils.verify_config_hash(request.form.get('config_hash', '')):
flash('Configuration was modified by another session. Please refresh and try again.', 'error')
return False
return True
@@ -72,7 +72,7 @@ def _generate_wg_keypair():
def _server_pubkey(iface):
try:
- with open(f'{CONFIGS_DIR}/.{iface}.pub') as f:
+ with open(f'{config_utils.CONFIGS_DIR}/.{iface}.pub') as f:
return f.read().strip()
except OSError:
return None
@@ -101,7 +101,7 @@ def _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pubkey):
allowed_ips = f'{subnet}/{prefix}' if split_tunnel else '0.0.0.0/0'
lines = [
- f'# Generated by {WEB_APP_DISPLAY_NAME}', '',
+ f'# Generated by {config_utils.WEB_APP_DISPLAY_NAME}', '',
'[Interface]',
f'PrivateKey = {private_key}',
f'Address = {peer_ip}/{prefix}',
@@ -117,7 +117,7 @@ def _build_client_conf(vlan, peer_name, peer_ip, private_key, server_pubkey):
def _conf_response(vlan, peer_name, peer_ip, private_key):
- cfg = load_config()
+ cfg = config_utils.load_config()
iface = _wg_iface(vlan, cfg)
server_pub = _server_pubkey(iface)
if not server_pub:
@@ -133,7 +133,7 @@ def _conf_response(vlan, peer_name, peer_ip, private_key):
@bp.route('/action/vpn/wireguard_apply', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def wireguard_apply():
listen_port_raw = request.form.get('vpn_listen_port', '').strip()
server_endpoint = validate.domainname(request.form.get('vpn_server_endpoint', ''))
@@ -166,7 +166,7 @@ def wireguard_apply():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vpn_vlan = _wg_vlan(cfg)
if vpn_vlan is None:
flash('No WireGuard VLAN found in configuration.', 'error')
@@ -201,13 +201,13 @@ def wireguard_apply():
return redirect(f'/{_PAGE}')
vlan_name = vpn_vlan['name']
- changes = diff_fields(before_info, info)
- flash(record_group(cfg, f'vlans[name={vlan_name}].vpn_information', None, None, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before_info, info)
+ flash(config_utils.record_group(cfg, f'vlans[name={vlan_name}].vpn_information', None, None, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/vpn/addpeer_add', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def addpeer_add():
peer_name = sanitize.name(request.form.get('peer_name', ''))
peer_vlan_nm = request.form.get('peer_vlan', '').strip()
@@ -228,7 +228,7 @@ def addpeer_add():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vpn_vlan = _wg_vlan_by_name(cfg, peer_vlan_nm)
if vpn_vlan is None:
flash(f'VPN VLAN "{peer_vlan_nm}" not found.', 'error')
@@ -269,13 +269,13 @@ def addpeer_add():
flash(msg, 'error')
return redirect(f'/{_PAGE}')
- changes = diff_fields(None, entry)
- record_group(cfg, f'vlans[name={peer_vlan_nm}].peers', 'name', peer_name, changes, 'core apply')
+ changes = config_utils.diff_fields(None, entry)
+ config_utils.record_group(cfg, f'vlans[name={peer_vlan_nm}].peers', 'name', peer_name, changes, 'core apply')
return _conf_response(vpn_vlan, peer_name, peer_ip, private_key)
@bp.route('/action/vpn/peers_edit', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def peers_edit():
flat_idx = _row_index()
if flat_idx is None:
@@ -292,7 +292,7 @@ def peers_edit():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
@@ -313,13 +313,13 @@ def peers_edit():
return redirect(f'/{_PAGE}')
vlan_name = vlan['name']
- changes = diff_fields(before, {'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled})
- flash(record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer_name, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, {'name': peer_name, 'split_tunnel': split_tunnel, 'enabled': enabled})
+ flash(config_utils.record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer_name, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/vpn/peers_toggle', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def peers_toggle():
flat_idx = _row_index()
if flat_idx is None:
@@ -328,7 +328,7 @@ def peers_toggle():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
@@ -346,13 +346,13 @@ def peers_toggle():
peer_name = peers[peer_idx]['name']
vlan_name = vlan['name']
- changes = diff_fields(before, peers[peer_idx])
- flash(record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer_name, changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(before, peers[peer_idx])
+ flash(config_utils.record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer_name, changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/vpn/peers_delete', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def peers_delete():
flat_idx = _row_index()
if flat_idx is None:
@@ -361,7 +361,7 @@ def peers_delete():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
@@ -376,13 +376,13 @@ def peers_delete():
return redirect(f'/{_PAGE}')
vlan_name = vlan['name']
- changes = diff_fields(removed, None)
- flash(record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', removed['name'], changes, 'core apply'), 'success')
+ changes = config_utils.diff_fields(removed, None)
+ flash(config_utils.record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', removed['name'], changes, 'core apply'), 'success')
return redirect(f'/{_PAGE}')
@bp.route('/action/vpn/peers_regenerate', methods=['POST'])
-@require_level('administrator')
+@auth.require_level('administrator')
def peers_regenerate():
flat_idx = _row_index()
if flat_idx is None:
@@ -391,7 +391,7 @@ def peers_regenerate():
if not _hash_ok():
return redirect(f'/{_PAGE}')
- cfg = load_config()
+ cfg = config_utils.load_config()
vlan, peer_idx = _find_peer_by_flat_idx(cfg, flat_idx)
if vlan is None:
flash('Peer not found.', 'error')
@@ -408,6 +408,6 @@ def peers_regenerate():
return redirect(f'/{_PAGE}')
vlan_name = vlan['name']
- changes = diff_fields({'public_key': old_pub_key}, {'public_key': public_key})
- record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer['name'], changes, 'core apply')
+ changes = config_utils.diff_fields({'public_key': old_pub_key}, {'public_key': public_key})
+ config_utils.record_group(cfg, f'vlans[name={vlan_name}].peers', 'name', peer['name'], changes, 'core apply')
return _conf_response(vlan, peer['name'], peer['ip'], private_key)
diff --git a/docker/routlin-dash/app/pages/vpn/view.py b/docker/routlin-dash/app/pages/vpn/view.py
index 935d88b..bdac1f4 100644
--- a/docker/routlin-dash/app/pages/vpn/view.py
+++ b/docker/routlin-dash/app/pages/vpn/view.py
@@ -1,11 +1,11 @@
import json
-from config_utils import collect_layout_tokens, load_datasource, fmt_timestamp, fmt_bytes
-from factory import run, load_json, build_table, table_token_key, iter_table_items, PAGES_DIR
+import config_utils
+import factory
def live_vpn_sessions():
rows = []
- out = run('wg show all dump 2>/dev/null')
+ out = factory.run('wg show all dump 2>/dev/null')
for line in out.splitlines():
parts = line.split('\t')
if len(parts) == 9:
@@ -15,15 +15,15 @@ def live_vpn_sessions():
'interface': interface,
'tunnel_ip': allowed_ips.split(',')[0].split('/')[0] if allowed_ips else '-',
'endpoint': endpoint if endpoint != '(none)' else '-',
- 'last_handshake': fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
- 'rx_bytes': fmt_bytes(int(rx)) if rx.isdigit() else '-',
- 'tx_bytes': fmt_bytes(int(tx)) if tx.isdigit() else '-',
+ 'last_handshake': config_utils.fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
+ 'rx_bytes': config_utils.fmt_bytes(int(rx)) if rx.isdigit() else '-',
+ 'tx_bytes': config_utils.fmt_bytes(int(tx)) if tx.isdigit() else '-',
})
return rows
def collect_tokens(cfg):
- tokens = collect_layout_tokens(cfg)
+ tokens = config_utils.collect_layout_tokens(cfg)
vlans = cfg.get('vlans', [])
wg_vlans_list = sorted(
[v for v in vlans if v.get('is_vpn')],
@@ -55,9 +55,9 @@ def collect_tokens(cfg):
tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_servers', ''))
tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
tokens['VPN_GATEWAY'] = vpn_gateway
- content = load_json(f'{PAGES_DIR}/vpn/content.json')
- for table_item in iter_table_items(content.get('items', [])):
+ content = factory.load_json(f'{factory.PAGES_DIR}/vpn/content.json')
+ for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
- rows = live_vpn_sessions() if ds == 'live:vpn_sessions' else load_datasource(ds)
- tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
+ rows = live_vpn_sessions() if ds == 'live:vpn_sessions' else config_utils.load_datasource(ds)
+ tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, rows)
return tokens