"""Token collection for the DNS-blocking page.

Builds the string tokens (log tail, blocklist statistics, VLAN checkboxes,
table contents) that are substituted into the page template.

NOTE(review): in the reviewed copy of this file several string literals that
sit where markup would be expected are empty or contain only newlines. They
are reproduced here exactly as seen - confirm them against the deployed
template before relying on the rendered output.
"""
import json
import os
from collections import deque
from datetime import datetime, timezone

import config_utils
import factory

DNS_LOG_FILE = f'{config_utils.BLOCKLISTS_DIR}/.log'
DNS_LOG_MAX = 50  # maximum number of trailing log lines returned
BL_TYPE_LABELS = {'community': 'Community', 'local': 'Local'}


def _dnsblocking_log_tail(cfg: dict) -> tuple:
    """Return ``(tail_text, summary)`` for the DNS-blocking log file.

    ``tail_text`` is the last ``DNS_LOG_MAX`` lines of ``DNS_LOG_FILE`` joined
    and stripped; ``summary`` describes how many lines are shown and how large
    the file is relative to ``dns_blocking.general.log_max_kb`` (default 1024).

    Never raises: a missing or empty log yields ``('(no log entries yet)', '')``
    and any other failure yields ``('(error reading log)', '')``.
    """
    try:
        log_max_kb = cfg.get('dns_blocking', {}).get('general', {}).get('log_max_kb', 1024)
        size_kb = os.path.getsize(DNS_LOG_FILE) / 1024

        # Stream the file: only the last DNS_LOG_MAX lines are kept in memory
        # while the total line count is still tracked for the summary.
        tail = deque(maxlen=DNS_LOG_MAX)
        total = 0
        with open(DNS_LOG_FILE) as f:
            for line in f:
                total += 1
                tail.append(line)

        if not total:
            return '(no log entries yet)', ''

        shown = len(tail)
        hidden = total - shown
        # A falsy limit (0 / None) would divide by zero, so report 0% instead.
        pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0

        if hidden > 0:
            left = f'Showing {shown} of {total} lines ({hidden} not shown)'
        else:
            left = f'Showing {shown} of {total} lines'
        right = f'Log file size: {size_kb:.1f} KB ({pct}% of max)'
        summary = (
            '\n'
            f'{left}{right}\n'
        )
        return ''.join(tail).strip(), summary
    except FileNotFoundError:
        return '(no log entries yet)', ''
    except Exception:
        # Deliberately broad: this feeds a status page, so any problem with
        # the config shape or the log file degrades to a placeholder message.
        return '(error reading log)', ''


WARN_ICON = ' '
ERROR_ICON = ' '


def _last_dl_time():
    """Return the integer stored in the ``.last-dl`` marker file, or ``None``.

    ``None`` is returned when the file cannot be read or does not contain an
    integer. The value is later compared against file modification times, so
    it is presumably an epoch timestamp in seconds - TODO confirm with the
    writer of that file.
    """
    path = f'{config_utils.BLOCKLISTS_DIR}/.last-dl'
    try:
        # Context manager so the handle is closed promptly instead of being
        # left for the garbage collector.
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


def _blocklist_file_info(save_as):
    """Return ``(size_str, mtime)`` for a blocklist file under BLOCKLISTS_DIR.

    ``size_str`` is the size formatted by ``config_utils.fmt_bytes`` and
    ``mtime`` is the modification time truncated to an int. When ``save_as``
    is empty or the file cannot be inspected the result is ``('-', None)``.
    """
    bl_path = f'{config_utils.BLOCKLISTS_DIR}/{save_as}' if save_as else ''
    try:
        # One stat call supplies both values, so they always describe the
        # same state of the file.
        st = os.stat(bl_path)
        return config_utils.fmt_bytes(st.st_size), int(st.st_mtime)
    except Exception:
        # Best effort for display purposes; an unreadable file shows as '-'.
        return '-', None


def blocklist_stats_html(cfg: dict) -> str:
    """Build the blocklist statistics fragment for the page.

    For every entry in ``dns_blocking.blocklists`` one row is produced holding
    the escaped name, the domain count from ``config_utils._bl_db_rows()``
    (``'-'`` when unknown), the on-disk size and a "last refreshed" text.
    Entries whose ``bl_type`` is ``'local'`` show ``'Local'`` and never carry
    an icon. Other entries get ``ERROR_ICON`` when their file cannot be
    inspected and ``WARN_ICON`` when the file is older than the value from
    ``_last_dl_time()``.

    Returns an empty string when there are no rows.
    """
    db_rows = config_utils._bl_db_rows()
    last_dl = _last_dl_time()
    # Loop-invariant reference time for the "... ago" text.
    now_ts = datetime.now(tz=timezone.utc).timestamp()

    row_parts = []
    for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
        name = bl.get('name', '')
        db = db_rows.get(name, {})
        count = db.get('domain_count')
        entries = f'{count:,}' if count is not None else '-'
        size_str, file_mtime = _blocklist_file_info(bl.get('save_as', ''))

        if bl.get('bl_type') == 'local':
            last_refreshed = 'Local'
            warn = ''
        else:
            fetched_at = db.get('fetched_at')
            if fetched_at:
                stamp = datetime.fromtimestamp(fetched_at).strftime("%Y-%m-%d %H:%M")
                age = config_utils.relative_time(fetched_at, now_ts)
                last_refreshed = f'{stamp} ({age} ago)'
            else:
                last_refreshed = 'Never'

            if file_mtime is None:
                warn = ERROR_ICON
            elif last_dl and file_mtime < last_dl:
                warn = WARN_ICON
            else:
                warn = ''

        row_parts.append(
            ''
            f'{factory.e(name)}'
            f'{entries}'
            f'{size_str}'
            f'{factory.e(last_refreshed)}{warn}'
            ''
        )

    rows = ''.join(row_parts)
    if not rows:
        return ''
    return (
        ''
        ''
        ''
        ''
        ''
        ''
        f'{rows}\nBlocklistEntriesSizeLast Refreshed\n'
    )


def collect_tokens(cfg: dict) -> dict:
    """Collect every template token needed by the DNS-blocking page.

    Starts from ``config_utils.collect_layout_tokens(cfg)`` and adds the
    general settings, the blocklist statistics fragment, the log tail and
    summary, the JSON list of existing blocklist names, the VLAN checkbox
    fragment, and one table token per table item found in the page's
    ``content.json``.
    """
    tokens = config_utils.collect_layout_tokens(cfg)

    dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
    tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
    tokens['GENERAL_LOG_ERRORS_ONLY'] = 'true' if dns_blk_gen.get('log_errors_only') else 'false'
    tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(dns_blk_gen.get('daily_execute_time_24hr_local', '-'))

    tokens['BLOCKLIST_STATS_HTML'] = blocklist_stats_html(cfg)
    tokens['DNS_LOG_TAIL'], tokens['DNS_LOG_SUMMARY'] = _dnsblocking_log_tail(cfg)

    blocklists = cfg.get('dns_blocking', {}).get('blocklists', [])
    tokens['BLOCKLIST_EXISTING_NAMES_JS'] = json.dumps([bl.get('name', '') for bl in blocklists])

    # One fragment per named VLAN; unnamed VLANs are skipped.
    vlans = cfg.get('vlans', [])
    vlan_checkboxes = ''.join(
        f'' for v in vlans if v.get('name')
    )
    tokens['VLAN_USED_BY_CHECKBOXES'] = (
        '\n'
        f'\n{vlan_checkboxes}\n'
    ) if vlan_checkboxes else ''

    # Render every table declared in the page content from its datasource.
    content = factory.load_json(f'{factory.PAGES_DIR}/dnsblocking/content.json')
    for table_item in factory.iter_table_items(content.get('items', [])):
        ds = table_item.get('datasource', '')
        tokens[factory.table_token_key(ds)] = factory.build_table(
            table_item, tokens, config_utils.load_datasource(ds)
        )
    return tokens