import json
import os
from datetime import datetime, timezone
import config_utils
import factory
# Path of the DNS blocklist log file, located under config_utils.CONFIGS_DIR.
DNS_LOG_FILE = f'{config_utils.CONFIGS_DIR}/dns-blocklists.log'
# Maximum number of trailing log lines returned by _dnsblocking_log_tail.
DNS_LOG_MAX = 50
# Display labels keyed by blocklist type ('community' / 'local').
# NOTE(review): not referenced by the code visible in this section - confirm it is used elsewhere.
BL_TYPE_LABELS = {'community': 'Community', 'local': 'Local'}
# Return a (tail_text, summary) pair describing the DNS blocklist log file.
#
# cfg: configuration mapping; only cfg['dns_blocking']['general']['log_max_kb']
#      is read here (defaults to 1024 when any level is missing).
#
# Returns a 2-tuple of strings:
#   - tail_text: the last DNS_LOG_MAX lines of DNS_LOG_FILE joined and stripped,
#     or a parenthesised placeholder when the log is empty, missing or unreadable.
#   - summary: a line-count / file-size summary, or '' whenever a placeholder
#     is returned.
# Never raises: every exception is converted into a placeholder result.
def _dnsblocking_log_tail(cfg):
try:
log_max_kb = cfg.get('dns_blocking', {}).get('general', {}).get('log_max_kb', 1024)
# Size is taken from the filesystem, not from the text read below.
size_kb = os.path.getsize(DNS_LOG_FILE) / 1024
# The whole file is read so the total line count can be reported.
with open(DNS_LOG_FILE) as f:
lines = f.readlines()
if not lines:
return '(log is empty)', ''
total = len(lines)
# Keep only the last DNS_LOG_MAX lines for display.
tail = lines[-DNS_LOG_MAX:]
shown = len(tail)
hidden = total - shown
# Percentage of the configured maximum, capped at 100; a falsy log_max_kb
# (0 or None) yields 0 instead of dividing by zero.
pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0
left = f'Showing {shown} of {total} lines ({hidden} not shown)' if hidden > 0 else f'Showing {shown} of {total} lines'
right = f'Log file size: {size_kb:.1f} KB ({pct}% of max)'
# NOTE(review): the string literals below look truncated in this view
# (markup presumably lost in extraction) - verify against the repository copy.
summary = (
'
'
f'{left}{right}
'
)
return ''.join(tail).strip(), summary
# os.path.getsize and open both raise FileNotFoundError for a missing log.
except FileNotFoundError:
return '(log file not found)', ''
# Any other failure (permissions, decoding, bad config value, ...) is reported
# generically; the underlying exception is discarded.
except Exception:
return '(error reading log)', ''
# Build the markup fragment listing statistics for every configured blocklist.
#
# cfg: configuration mapping; entries are read from cfg['dns_blocking']['blocklists'],
#      each using the keys 'name', 'bl_type' and 'save_as'.
#
# Returns a string containing one row per blocklist (name, entry count, file
# size, last refresh), or '' when there are no blocklists configured.
def blocklist_stats_html(cfg):
# Per-blocklist database records, looked up by blocklist name below.
# NOTE(review): presumably a dict keyed by name with 'domain_count' and
# 'fetched_at' fields - confirm against config_utils._bl_db_rows.
db_rows = config_utils._bl_db_rows()
rows = ''
for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
name = bl.get('name', '')
is_local = bl.get('bl_type') == 'local'
db = db_rows.get(name, {})
count = db.get('domain_count')
# Thousands-separated count, or '-' when no count is recorded.
entries = f'{count:,}' if count is not None else '-'
if is_local:
save_as = bl.get('save_as', '')
bl_path = f'{config_utils.BLOCKLISTS_DIR}/{save_as}' if save_as else ''
# A missing 'save_as' leaves bl_path empty; getsize then raises and the
# size falls back to '-'.
try:
size_str = config_utils.fmt_bytes(os.path.getsize(bl_path))
except Exception:
size_str = '-'
# Local lists are never fetched, so a fixed label replaces the timestamp.
last_refreshed = 'Local'
else:
fetched_at = db.get('fetched_at')
if fetched_at:
# No tz is passed to fromtimestamp, so the absolute time is rendered in the
# process's local timezone; the relative part compares against the
# current UTC epoch timestamp.
last_refreshed = (
f'{datetime.fromtimestamp(fetched_at).strftime("%Y-%m-%d %H:%M")}'
f' ({config_utils.relative_time(fetched_at, datetime.now(tz=timezone.utc).timestamp())} ago)'
)
else:
last_refreshed = 'Never'
# Same size lookup as in the local branch above.
save_as = bl.get('save_as', '')
bl_path = f'{config_utils.BLOCKLISTS_DIR}/{save_as}' if save_as else ''
try:
size_str = config_utils.fmt_bytes(os.path.getsize(bl_path))
except Exception:
size_str = '-'
# name and last_refreshed are passed through factory.e (presumably an
# HTML-escaping helper - confirm); entries and size_str are inserted as-is.
# NOTE(review): the literals below look truncated in this view (markup
# presumably lost in extraction) - verify against the repository copy.
rows += (
''
f'| {factory.e(name)} | '
f'{entries} | '
f'{size_str} | '
f'{factory.e(last_refreshed)} | '
'
'
)
# No configured blocklists: return an empty string.
if not rows:
return ''
# NOTE(review): the wrapper literals below also appear stripped of their markup.
return (
''
''
''
''
''
'
'
f'{rows}
'
)
# Assemble the token dictionary for the DNS-blocking page.
#
# cfg: configuration mapping; reads the 'dns_blocking' section ('general' and
#      'blocklists') and the top-level 'vlans' list.
#
# Returns the dict produced by config_utils.collect_layout_tokens(cfg) with the
# page-specific tokens added: general settings, blocklist statistics, log tail
# and summary, existing blocklist names as JSON, VLAN checkboxes, and one token
# per table item declared in the page's content.json.
def collect_tokens(cfg):
tokens = config_utils.collect_layout_tokens(cfg)
dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
# General settings are stringified; '-' stands in for a missing value.
tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
# Emitted as the lowercase strings 'true' / 'false' based on truthiness.
tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if dns_blk_gen.get('log_errors_only') else 'false'
tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(dns_blk_gen.get('daily_execute_time_24hr_local', '-'))
tokens['BLOCKLIST_STATS_HTML'] = blocklist_stats_html(cfg)
tokens['DNS_LOG_TAIL'], tokens['DNS_LOG_SUMMARY'] = _dnsblocking_log_tail(cfg)
blocklists = cfg.get('dns_blocking', {}).get('blocklists', [])
# JSON array of the configured blocklist names ('' for entries without one).
tokens['BLOCKLIST_EXISTING_NAMES_JS'] = json.dumps([bl.get('name', '') for bl in blocklists])
vlans = cfg.get('vlans', [])
# One fragment per VLAN that has a non-empty name.
# NOTE(review): the f-strings here and below are empty in this view (markup
# presumably lost in extraction) - verify against the repository copy.
vlan_checkboxes = ''.join(
f''
for v in vlans if v.get('name')
)
# The wrapper is emitted only when at least one checkbox fragment exists.
tokens['VLAN_USED_BY_CHECKBOXES'] = (
f''
) if vlan_checkboxes else ''
content = factory.load_json(f'{factory.PAGES_DIR}/dnsblocking/content.json')
# Build every table declared in the page content; each result is stored under
# the key derived from the table's datasource name.
for table_item in factory.iter_table_items(content.get('items', [])):
ds = table_item.get('datasource', '')
tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, config_utils.load_datasource(ds))
return tokens