linuxrouter/docker/routlin-dash/app/pages/dnsblocking/view.py
2026-06-09 13:28:59 -04:00

176 lines
7.6 KiB
Python

import json
import os
from collections import deque
from datetime import datetime, timezone

import config_utils
import factory
# Path of the DNS-blocking log file, located inside the blocklists directory.
DNS_LOG_FILE = f'{config_utils.BLOCKLISTS_DIR}/.log'
# Maximum number of trailing log lines returned by _dnsblocking_log_tail().
DNS_LOG_MAX = 50
# Display labels keyed by a blocklist's 'bl_type' value.
# NOTE(review): not referenced in the visible part of this module -- presumably
# used by another module or a template; confirm before removing.
BL_TYPE_LABELS = {'community': 'Community', 'local': 'Local'}
def _dnsblocking_log_tail(cfg):
    """Return the tail of the DNS-blocking log plus an HTML summary line.

    Args:
        cfg: Configuration mapping. ``dns_blocking.general.log_max_kb``
            (default 1024) is the size the log file is compared against when
            computing the "% of max" figure.

    Returns:
        A ``(tail_text, summary_html)`` tuple. ``tail_text`` holds the last
        ``DNS_LOG_MAX`` lines of ``DNS_LOG_FILE`` joined and stripped, and
        ``summary_html`` is a ``<div>`` with the line counts and file size.
        A missing or empty log yields ``('(no log entries yet)', '')``; any
        other failure yields ``('(error reading log)', '')``.
    """
    try:
        log_max_kb = cfg.get('dns_blocking', {}).get('general', {}).get('log_max_kb', 1024)
        size_kb = os.path.getsize(DNS_LOG_FILE) / 1024

        # Stream the file through a bounded deque so only the lines that are
        # actually displayed are kept in memory, while still counting them all.
        total = 0
        tail = deque(maxlen=DNS_LOG_MAX)
        # errors='replace' keeps one undecodable byte from turning the whole
        # log view into "(error reading log)".
        with open(DNS_LOG_FILE, errors='replace') as f:
            for line in f:
                total += 1
                tail.append(line)

        if not total:
            return '(no log entries yet)', ''

        shown = len(tail)
        hidden = total - shown
        # A falsy log_max_kb (0 / None) means no limit to compare against.
        pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0
        if hidden > 0:
            left = f'Showing {shown} of {total} lines ({hidden} not shown)'
        else:
            left = f'Showing {shown} of {total} lines'
        right = f'Log file size: {size_kb:.1f} KB ({pct}% of max)'
        summary = (
            '<div class="text-muted" style="display:flex;justify-content:space-between;margin-top:0.5em;">'
            f'<span>{left}</span><span>{right}</span></div>'
        )
        return ''.join(tail).strip(), summary
    except FileNotFoundError:
        return '(no log entries yet)', ''
    except Exception:
        # Deliberate best-effort: a broken log must not break page rendering.
        return '(error reading log)', ''
# HTML snippet (warning icon with tooltip) appended to the "Last Refreshed"
# cell when a non-local blocklist file is older than the last download run.
WARN_ICON = '&nbsp;<span class="tooltip-wrap" data-tooltip="Stale blocklist used. Verify URL is correct." style="display:inline-flex;align-items:center;"><img src="/www/icons/warning.svg" style="width:1em;height:1em;"></span>'
# HTML snippet (error icon with tooltip) appended to the "Last Refreshed"
# cell when a non-local blocklist file cannot be stat'ed on disk.
ERROR_ICON = '&nbsp;<span class="tooltip-wrap" data-tooltip="Blocklist not found. No cached version available." style="display:inline-flex;align-items:center;"><img src="/www/icons/error.svg" style="width:1em;height:1em;"></span>'
def _last_dl_time():
    """Return the integer stored in the ``.last-dl`` marker file.

    The value is compared against blocklist file mtimes by
    ``blocklist_stats_html``, so it is presumably an epoch timestamp in
    seconds -- confirm against whatever writes the marker.

    Returns:
        The file content parsed as an ``int``, or ``None`` when the file is
        missing, unreadable, or does not contain an integer.
    """
    path = f'{config_utils.BLOCKLISTS_DIR}/.last-dl'
    try:
        # Context manager closes the handle deterministically.
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: non-integer content
        # (also covers UnicodeDecodeError, which subclasses ValueError).
        return None
def blocklist_stats_html(cfg):
    """Render the blocklist statistics table as an HTML string.

    One row is produced per entry in ``cfg['dns_blocking']['blocklists']``
    with the list name, domain count, on-disk size and refresh status.
    Non-local lists also get an error icon when their file cannot be stat'ed
    and a warning icon when the file is older than the last download run.

    Returns:
        The ``<table>`` markup, or ``''`` when no blocklists are configured.
    """
    db_lookup = config_utils._bl_db_rows()
    last_run = _last_dl_time()
    body = []

    for entry in cfg.get('dns_blocking', {}).get('blocklists', []):
        name = entry.get('name', '')
        record = db_lookup.get(name, {})
        domain_count = record.get('domain_count')
        entries = '-' if domain_count is None else f'{domain_count:,}'

        # Both list types resolve their file the same way.
        save_as = entry.get('save_as', '')
        path = f'{config_utils.BLOCKLISTS_DIR}/{save_as}' if save_as else ''

        if entry.get('bl_type') == 'local':
            try:
                size_str = config_utils.fmt_bytes(os.path.getsize(path))
            except Exception:
                size_str = '-'
            status = 'Local file'
            icon = ''
        else:
            fetched_at = record.get('fetched_at')
            if not fetched_at:
                status = 'Never downloaded'
            else:
                stamp = datetime.fromtimestamp(fetched_at).strftime("%Y-%m-%d %H:%M")
                now_ts = datetime.now(tz=timezone.utc).timestamp()
                age = config_utils.relative_time(fetched_at, now_ts)
                status = f'Downloaded {stamp} ({age} ago)'

            try:
                size_str = config_utils.fmt_bytes(os.path.getsize(path))
                mtime = int(os.path.getmtime(path))
            except Exception:
                size_str, mtime = '-', None

            if mtime is None:
                # No file on disk at all: nothing cached to fall back on.
                icon = ERROR_ICON
            else:
                # File predates the last download run: a stale copy is in use.
                icon = WARN_ICON if (last_run and mtime < last_run) else ''

        body.append(
            '<tr>'
            f'<td class="table-cell">{factory.e(name)}</td>'
            f'<td class="table-cell">{entries}</td>'
            f'<td class="table-cell">{size_str}</td>'
            f'<td class="table-cell">{factory.e(status)}{icon}</td>'
            '</tr>'
        )

    if not body:
        return ''

    header = (
        '<table class="data-table"><thead><tr>'
        '<th class="table-header">Blocklist</th>'
        '<th class="table-header">Entries</th>'
        '<th class="table-header">Size</th>'
        '<th class="table-header">Last Refreshed</th>'
        '</tr></thead>'
    )
    return header + f'<tbody>{"".join(body)}</tbody></table>'
def _blocklist_search_html(blocklists):
    """Build the blocklist search form, with one <option> per named blocklist."""
    options = ''.join([
        f'<option value="{factory.e(item["name"])}">{factory.e(item["name"])}</option>'
        for item in blocklists
        if item.get('name')
    ])
    return (
        '<div style="display:flex;gap:0.75rem;align-items:flex-end;flex-wrap:wrap;">'
        '<div>'
        '<label class="form-label">Match</label>'
        '<select id="bl-search-match" class="form-input" style="width:auto;">'
        '<option value="partial">Partial</option>'
        '<option value="exact">Exact</option>'
        '<option value="starts_with">Starts With</option>'
        '<option value="ends_with">Ends With</option>'
        '</select>'
        '</div>'
        '<div>'
        '<label class="form-label">Search Where</label>'
        '<select id="bl-search-list" class="form-input" style="width:auto;">'
        '<option value="">-- All Blocklists --</option>'
        f'{options}'
        '</select>'
        '</div>'
        '<div>'
        '<label class="form-label">Limit</label>'
        '<input id="bl-search-limit" type="number" class="form-input" min="1" max="5000" value="500" style="width:6rem;">'
        '</div>'
        '<div style="flex:1 1 200px;">'
        '<label class="form-label">Search term</label>'
        '<input id="bl-search-term" type="text" class="form-input" placeholder="e.g. doubleclick.net">'
        '</div>'
        '<div style="padding-top:1.4rem;">'
        '<button id="bl-search-btn" class="btn btn-primary">Search</button>'
        '</div>'
        '</div>'
        '<div id="bl-search-results" style="margin-top:1rem;"></div>'
    )


def _vlan_checkboxes_html(vlans):
    """Build the "Used By VLAN(s)" checkbox group; '' when no VLAN has a name."""
    boxes = ''.join([
        '<label class="form-checkbox-row">'
        f'<input type="checkbox" name="used_by_vlans" value="{factory.e(vlan["name"])}" class="form-checkbox">'
        f' <span class="form-checkbox-label">{factory.e(vlan["name"])}</span>'
        '</label>'
        for vlan in vlans
        if vlan.get('name')
    ])
    if not boxes:
        return ''
    return (
        '<div class="form-group"><label class="form-label">Used By VLAN(s)</label>'
        f'<div>{boxes}</div></div>'
    )


def collect_tokens(cfg):
    """Assemble the template tokens for the DNS blocking page.

    Starts from the shared layout tokens, then adds the general settings,
    blocklist statistics table, log tail, search form, VLAN checkboxes and
    one rendered table per table item declared in the page's content.json.

    Args:
        cfg: The loaded configuration mapping.

    Returns:
        The token mapping produced by ``config_utils.collect_layout_tokens``
        with the page-specific entries added.
    """
    tokens = config_utils.collect_layout_tokens(cfg)

    # General settings.
    general = cfg.get('dns_blocking', {}).get('general', {})
    tokens['GENERAL_LOG_MAX_KB'] = str(general.get('log_max_kb', '-'))
    if general.get('log_errors_only'):
        tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true'
    else:
        tokens['GENERAL_LOG_ERRORS_ONLY'] = 'false'
    tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(general.get('daily_execute_time_24hr_local', '-'))

    # Statistics table and log tail.
    tokens['BLOCKLIST_STATS_HTML'] = blocklist_stats_html(cfg)
    log_text, log_summary = _dnsblocking_log_tail(cfg)
    tokens['DNS_LOG_TAIL'] = log_text
    tokens['DNS_LOG_SUMMARY'] = log_summary

    # Blocklist names (as a JSON array) and the search form.
    blocklists = cfg.get('dns_blocking', {}).get('blocklists', [])
    names = [item.get('name', '') for item in blocklists]
    tokens['BLOCKLIST_EXISTING_NAMES_JS'] = json.dumps(names)
    tokens['BLOCKLIST_SEARCH_HTML'] = _blocklist_search_html(blocklists)

    tokens['VLAN_USED_BY_CHECKBOXES'] = _vlan_checkboxes_html(cfg.get('vlans', []))

    # Render every table declared in the page content definition.
    page = factory.load_json(f'{factory.PAGES_DIR}/dnsblocking/content.json')
    for item in factory.iter_table_items(page.get('items', [])):
        source = item.get('datasource', '')
        table_html = factory.build_table(item, tokens, config_utils.load_datasource(source))
        tokens[factory.table_token_key(source)] = table_html
    return tokens