"""Build the template tokens for the DNS-blocking page.

NOTE(review): several string literals in this module are empty or contain
only whitespace/newlines in the reviewed source.  They are reproduced with
the same runtime values; confirm that no markup was lost from them.
"""
import json
import os
from datetime import datetime, timezone

import config_utils
import factory

DNS_LOG_FILE = f'{config_utils.BLOCKLISTS_DIR}/.log'
DNS_LOG_MAX = 50  # maximum number of trailing log lines returned
BL_TYPE_LABELS = {'community': 'Community', 'local': 'Local'}

# NOTE(review): both icons are a single space in the reviewed source.
WARN_ICON = ' '
ERROR_ICON = ' '


def _dnsblocking_log_tail(cfg):
    """Return ``(tail_text, summary)`` for the DNS-blocking log file.

    ``tail_text`` is the last ``DNS_LOG_MAX`` lines of ``DNS_LOG_FILE``
    joined and stripped.  ``summary`` reports how many lines are shown and
    the file size relative to the configured ``log_max_kb`` (default 1024).

    A missing or empty log yields ``('(no log entries yet)', '')``; any
    other failure yields ``('(error reading log)', '')``.
    """
    try:
        log_max_kb = (
            cfg.get('dns_blocking', {}).get('general', {}).get('log_max_kb', 1024)
        )
        size_kb = os.path.getsize(DNS_LOG_FILE) / 1024
        with open(DNS_LOG_FILE) as f:
            lines = f.readlines()
        if not lines:
            return '(no log entries yet)', ''

        total = len(lines)
        tail = lines[-DNS_LOG_MAX:]
        shown = len(tail)
        hidden = total - shown
        # A falsy limit (0 / None) would divide by zero, so report 0%.
        pct = min(100, round(size_kb / log_max_kb * 100)) if log_max_kb else 0

        if hidden > 0:
            left = f'Showing {shown} of {total} lines ({hidden} not shown)'
        else:
            left = f'Showing {shown} of {total} lines'
        right = f'Log file size: {size_kb:.1f} KB ({pct}% of max)'
        summary = (
            '\n'
            f'{left}{right}\n'
        )
        return ''.join(tail).strip(), summary
    except FileNotFoundError:
        return '(no log entries yet)', ''
    except Exception:
        # Deliberate best-effort: this only feeds a display token, so any
        # read/format problem degrades to a placeholder instead of raising.
        return '(error reading log)', ''


def _last_dl_time():
    """Return the integer stored in the ``.last-dl`` marker file, or ``None``.

    ``None`` is returned when the file cannot be read or does not contain
    an integer.
    """
    path = f'{config_utils.BLOCKLISTS_DIR}/.last-dl'
    try:
        # Context manager so the handle is closed even if parsing fails.
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


def _bl_file_info(save_as):
    """Return ``(size_str, mtime)`` for a blocklist file.

    ``size_str`` is the size formatted by ``config_utils.fmt_bytes`` and
    ``mtime`` the modification time truncated to an int.  When ``save_as``
    is empty or the file cannot be stat'ed, ``('-', None)`` is returned.
    """
    if not save_as:
        return '-', None
    path = f'{config_utils.BLOCKLISTS_DIR}/{save_as}'
    try:
        st = os.stat(path)
    except (OSError, ValueError):
        return '-', None
    return config_utils.fmt_bytes(st.st_size), int(st.st_mtime)


def blocklist_stats_html(cfg):
    """Return the blocklist statistics fragment, or ``''`` without blocklists.

    For every configured blocklist the fragment carries its name, entry
    count, file size, last-refreshed text (plus a warning/error icon for
    non-local lists) and the VLANs that reference it.
    """
    db_rows = config_utils._bl_db_rows()
    last_dl = _last_dl_time()
    # Loop-invariant: use one "now" for every row's relative age.
    now_ts = datetime.now(tz=timezone.utc).timestamp()

    # Map blocklist name -> names of the VLANs that use it.
    bl_vlans = {}
    for vlan in cfg.get('vlans', []):
        vlan_name = vlan.get('name')
        if not vlan_name:
            # Nameless VLANs are skipped (collect_tokens ignores them too)
            # instead of raising KeyError.
            continue
        for bl_name in vlan.get('use_blocklists', []):
            bl_vlans.setdefault(bl_name, []).append(vlan_name)

    rows = []
    for bl in cfg.get('dns_blocking', {}).get('blocklists', []):
        name = bl.get('name', '')
        db = db_rows.get(name, {})
        count = db.get('domain_count')
        entries = f'{count:,}' if count is not None else '-'
        size_str, file_mtime = _bl_file_info(bl.get('save_as', ''))

        if bl.get('bl_type') == 'local':
            last_refreshed = 'Local'
            warn = ''
        else:
            fetched_at = db.get('fetched_at')
            if fetched_at:
                stamp = datetime.fromtimestamp(fetched_at).strftime("%Y-%m-%d %H:%M")
                age = config_utils.relative_time(fetched_at, now_ts)
                last_refreshed = f'{stamp} ({age} ago)'
            else:
                last_refreshed = 'Never'

            if file_mtime is None:
                # The blocklist file is missing or unreadable.
                warn = ERROR_ICON
            elif last_dl and file_mtime < last_dl:
                # The file is older than the time recorded in .last-dl.
                warn = WARN_ICON
            else:
                warn = ''

        vlan_names = bl_vlans.get(name, [])
        if vlan_names:
            used_by = ', '.join(factory.e(v) for v in vlan_names)
        else:
            used_by = 'Not used by any VLANs'

        # NOTE(review): the fragments around these values are empty strings
        # in the reviewed source -- confirm the intended cell markup.
        rows.append(
            ''
            f'{factory.e(name)}'
            f'{entries}'
            f'{size_str}'
            f'{factory.e(last_refreshed)}{warn}'
            f'{used_by}'
            ''
        )

    if not rows:
        return ''

    body = ''.join(rows)
    # NOTE(review): the wrapper literals are empty in the reviewed source and
    # the header text follows the rows -- confirm the intended table markup.
    return (
        ''
        '' '' '' '' '' ''
        f'{body}\n'
        'BlocklistEntriesSizeLast RefreshedUsed by VLAN(s)\n'
    )


def collect_tokens(cfg):
    """Return the token dict for the DNS-blocking page.

    Starts from ``config_utils.collect_layout_tokens(cfg)`` and adds the
    general DNS-blocking settings, the blocklist statistics fragment, the
    log tail and summary, the existing blocklist names as JSON, the VLAN
    checkbox fragment and one token per table item declared in the page's
    ``content.json``.
    """
    tokens = config_utils.collect_layout_tokens(cfg)

    dns_blk_gen = cfg.get('dns_blocking', {}).get('general', {})
    tokens['GENERAL_LOG_MAX_KB'] = str(dns_blk_gen.get('log_max_kb', '-'))
    tokens['GENERAL_LOG_ERRORS_ONLY'] = (
        'true' if dns_blk_gen.get('log_errors_only') else 'false'
    )
    tokens['GENERAL_DAILY_EXECUTE_TIME'] = str(
        dns_blk_gen.get('daily_execute_time_24hr_local', '-')
    )

    tokens['BLOCKLIST_STATS_HTML'] = blocklist_stats_html(cfg)
    tokens['DNS_LOG_TAIL'], tokens['DNS_LOG_SUMMARY'] = _dnsblocking_log_tail(cfg)

    blocklists = cfg.get('dns_blocking', {}).get('blocklists', [])
    tokens['BLOCKLIST_EXISTING_NAMES_JS'] = json.dumps(
        [bl.get('name', '') for bl in blocklists]
    )

    vlans = cfg.get('vlans', [])
    # NOTE(review): the per-VLAN fragment is an empty f-string in the
    # reviewed source, so this join is always '' -- confirm the intended
    # checkbox markup.
    vlan_checkboxes = ''.join(
        f'' for v in vlans if v.get('name')
    )
    tokens['VLAN_USED_BY_CHECKBOXES'] = (
        f'\n'
        f'\n{vlan_checkboxes}\n'
    ) if vlan_checkboxes else ''

    content = factory.load_json(f'{factory.PAGES_DIR}/dnsblocking/content.json')
    for table_item in factory.iter_table_items(content.get('items', [])):
        ds = table_item.get('datasource', '')
        tokens[factory.table_token_key(ds)] = factory.build_table(
            table_item, tokens, config_utils.load_datasource(ds)
        )
    return tokens