"""Build the template tokens for the status/DNS summary view.

``collect_tokens`` is the entry point; the other functions gather the
individual figures (DNS metrics file, journal counts, blocklist files).
"""
import json
import os
import shlex
from datetime import datetime

import config_utils
import factory
from pages.ddns.view import public_ip_info
from pages.dhcpleases.view import live_dhcp_leases

METRICS_FILE = f'{config_utils.CONFIGS_DIR}/.dns-metrics'

# Layout of the ``first_recorded`` / ``last_recorded`` values in METRICS_FILE.
_STAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _parse_with_age(stamp):
    """Parse *stamp* and pair it with the current time.

    Returns ``(dt, now, rel)`` where ``rel`` is whatever
    ``config_utils.relative_time`` yields for the two epoch seconds
    (presumably a human-readable duration -- helper not visible here).

    Raises whatever ``strptime`` / ``relative_time`` raise; callers decide
    on the fallback.
    """
    dt = datetime.strptime(stamp, _STAMP_FORMAT)
    now = datetime.now()
    rel = config_utils.relative_time(int(dt.timestamp()), int(now.timestamp()))
    return dt, now, rel


def _fmt_since(since_str):
    """Format the "recording since" stamp for display.

    Same-day stamps render as ``Today at HH:MM (<rel> ago)``, older ones as
    ``YYYY-MM-DD (<rel> ago)``. Anything that cannot be parsed (including
    the ``'-'`` placeholder) is returned unchanged.
    """
    try:
        dt, now, rel = _parse_with_age(since_str)
    except Exception:
        # Deliberate best effort: show the raw value rather than fail.
        return since_str
    if dt.date() == now.date():
        return f'Today at {dt.strftime("%H:%M")} ({rel} ago)'
    return f'{dt.strftime("%Y-%m-%d")} ({rel} ago)'


def _fmt_updated(updated_str):
    """Format the "last updated" stamp as ``<rel> ago``.

    Unparseable input is returned unchanged.
    """
    try:
        _dt, _now, rel = _parse_with_age(updated_str)
    except Exception:
        # Deliberate best effort: show the raw value rather than fail.
        return updated_str
    return f'{rel} ago'


def _fmt_count(value):
    """Render a counter with thousands separators.

    Values that do not support the ``,`` format spec (``None``, strings from
    a hand-edited metrics file, ...) are shown escaped and verbatim instead
    of raising, so one bad entry cannot break the whole table.
    """
    try:
        return f'{value:,}'
    except (TypeError, ValueError):
        return factory.e(str(value))


def _provider_row(server):
    """Return one ``<tr>`` for a per-server stats mapping."""
    raw_latency = server.get('avg_latency_ms')
    # A missing or zero latency is shown as '-', as before.
    latency = factory.e(f'{raw_latency} ms') if raw_latency else '-'
    return (
        '<tr>'
        f'<td>{factory.e(server.get("address", "-"))}</td>'
        f'<td>{_fmt_count(server.get("queries_sent", 0))}</td>'
        f'<td>{_fmt_count(server.get("retried", 0))}</td>'
        f'<td>{_fmt_count(server.get("failed", 0))}</td>'
        f'<td>{_fmt_count(server.get("nxdomain", 0))}</td>'
        f'<td>{latency}</td>'
        '</tr>'
    )


def _dns_providers_table(servers):
    """Render the upstream DNS server statistics as an HTML table.

    Args:
        servers: iterable of mappings with the keys ``address``,
            ``queries_sent``, ``retried``, ``failed``, ``nxdomain`` and
            ``avg_latency_ms`` (all optional).

    Returns:
        An HTML fragment: a placeholder paragraph when *servers* is empty,
        otherwise a six-column table with one row per server.

    NOTE(review): the element tags of these literals were not recoverable
    from the reviewed source (only the visible text and the fragment
    structure survived). Plain, unstyled markup is used here -- confirm
    tag/class names against the page template and stylesheet.
    """
    if not servers:
        return '<p>No upstream server data recorded yet.</p>'
    rows = [_provider_row(server) for server in servers]
    return (
        '<table>'
        '<thead><tr>'
        '<th>Server</th>'
        '<th>Queries Sent</th>'
        '<th>Retried</th>'
        '<th>Failed</th>'
        '<th>NXDOMAIN</th>'
        '<th>Avg Latency</th>'
        '</tr></thead>'
        '<tbody>' + ''.join(rows) + '</tbody></table>'
    )


def load_dns_metrics():
    """Load METRICS_FILE and return display-ready DNS statistics.

    Returns:
        A dict with the keys ``queries``, ``hits``, ``hit_rate``,
        ``forwarded``, ``tcp_peak``, ``cache_evictions``, ``updated``,
        ``since`` (all strings) and ``servers`` (list taken from the file).
        If the file is missing, unreadable or malformed, every string is
        ``'-'`` and ``servers`` is empty.
    """
    empty = {
        'queries': '-',
        'hits': '-',
        'hit_rate': '-',
        'forwarded': '-',
        'tcp_peak': '-',
        'cache_evictions': '-',
        'updated': '-',
        'since': '-',
        'servers': [],
    }
    try:
        with open(METRICS_FILE, encoding='utf-8') as f:
            data = json.load(f)
        t = data.get('totals', {})
        meta = data.get('metadata', {})
        fwd = t.get('queries_forwarded', 0)
        hits = t.get('queries_answered_locally', 0)
        # Total queries = forwarded upstream + answered locally.
        total = fwd + hits
        since_raw = meta.get('first_recorded', '-')
        updated_raw = meta.get('last_recorded', '-')
        return {
            'queries': f'{total:,}' if total else '-',
            'hits': f'{hits:,}' if hits else '-',
            'hit_rate': f'{hits / total * 100:.0f}%' if total > 0 else '-',
            'forwarded': f'{fwd:,}' if fwd else '-',
            'tcp_peak': str(t.get('tcp_hwm', 0)),
            # The file's ``cache_reused`` counter is what is displayed as
            # "cache evictions".
            'cache_evictions': f'{t.get("cache_reused", 0):,}',
            'updated': _fmt_updated(updated_raw),
            'since': _fmt_since(since_raw),
            'servers': t.get('servers', []),
        }
    except Exception:
        # Best effort: any read/parse/shape problem yields the placeholders.
        return empty


def count_blocked_today():
    """Count journal lines of the last 24 hours that contain `` is 0.0.0.0``.

    Despite the name the window is a rolling 24 hours, not the calendar day.
    Returns the count as a string, ``'0'`` when the command prints nothing.
    """
    out = factory.run(
        "journalctl -u 'dnsmasq-routlin-*' --since '24 hours ago' --no-pager"
        " 2>/dev/null | grep -c ' is 0\\.0\\.0\\.0'"
    )
    # Tolerate a runner that yields None instead of an empty string.
    return (out or '').strip() or '0'


def _blocklist_paths():
    """Return the paths of the ``.con`` files in the blocklists directory."""
    directory = config_utils.BLOCKLISTS_DIR
    return [
        f'{directory}/{name}'
        for name in os.listdir(directory)
        if name.endswith('.con')
    ]


def count_blocked_domains():
    """Sum the ``wc -l`` line counts of all blocklist files.

    Returns the total as a string, or ``'-'`` if the directory cannot be
    listed or a count cannot be parsed.
    """
    try:
        total = sum(
            # shlex.quote keeps unusual file names from being interpreted
            # by the shell that executes the command.
            int(factory.run(f'wc -l < {shlex.quote(path)}') or 0)
            for path in _blocklist_paths()
        )
        return str(total)
    except Exception:
        return '-'


def bl_last_update():
    """Return the formatted mtime of the most recently modified blocklist.

    Returns ``'-'`` when there are no blocklist files or they cannot be
    inspected.
    """
    try:
        mtime = max(os.path.getmtime(path) for path in _blocklist_paths())
        return config_utils.fmt_timestamp(int(mtime))
    except Exception:
        # Also covers max() on an empty sequence (no blocklist files).
        return '-'


def collect_tokens(cfg):
    """Assemble the token dictionary for this view.

    Args:
        cfg: the configuration mapping; the keys ``vlans``,
            ``network_interfaces``, ``upstream_dns``, ``banned_ips`` and
            ``dns_blocking`` are read, all optional.

    Returns:
        The dict produced by ``config_utils.collect_layout_tokens(cfg)``
        extended with the ``GENERAL_*``, ``OVERVIEW_*``, ``STAT_*`` and
        ``DNS_*`` entries set below. All added values are strings.
    """
    tokens = config_utils.collect_layout_tokens(cfg)

    vlans = cfg.get('vlans', [])
    non_vpn_vlans = [v for v in vlans if not v.get('is_vpn')]
    vlan_names = [v.get('name', '') for v in vlans]
    net = cfg.get('network_interfaces', {})
    dns = cfg.get('upstream_dns', {})
    dns_stats = load_dns_metrics()
    ddns = factory.load_ddns()
    # The third element (last-obtained info) is not used by this view.
    ip_str, domains_sub, _last_obtained = public_ip_info(ddns)

    tokens['GENERAL_WAN_INTERFACE'] = str(net.get('wan_interface', '-'))
    tokens['OVERVIEW_VLAN_NAMES'] = ', '.join(vlan_names) or '-'
    tokens['STAT_VLAN_COUNT'] = str(len(non_vpn_vlans))
    tokens['STAT_LEASE_COUNT'] = str(len(live_dhcp_leases()))
    # Entries without an explicit ``enabled`` flag count as enabled.
    tokens['STAT_BANNED_IP_COUNT'] = str(
        sum(1 for b in cfg.get('banned_ips', []) if b.get('enabled', True))
    )
    tokens['STAT_BLOCKLIST_COUNT'] = str(
        len(cfg.get('dns_blocking', {}).get('blocklists', []))
    )
    tokens['STAT_BLOCKED_TODAY'] = count_blocked_today()
    tokens['STAT_BLOCKED_DOMAINS'] = count_blocked_domains()
    tokens['STAT_BL_LAST_UPDATE'] = bl_last_update()
    tokens['STAT_UPTIME'] = factory.run('uptime -p') or '-'
    # Any output from ``nft list tables`` is taken to mean nftables is active.
    tokens['STAT_NFTABLES_STATUS'] = (
        'Active' if factory.run('nft list tables 2>/dev/null').strip() else 'Inactive'
    )
    tokens['STAT_PUBLIC_IP'] = ip_str
    tokens['STAT_DDNS_HOSTNAME'] = domains_sub
    tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
    tokens['DNS_STAT_QUERIES'] = dns_stats['queries']
    tokens['DNS_STAT_HITS'] = dns_stats['hits']
    tokens['DNS_STAT_HIT_RATE'] = dns_stats['hit_rate']
    tokens['DNS_STAT_FORWARDED'] = dns_stats['forwarded']
    tokens['DNS_STAT_CACHE_EVICTIONS'] = dns_stats['cache_evictions']
    tokens['DNS_METRICS_UPDATED'] = dns_stats['updated']
    tokens['DNS_METRICS_SINCE'] = dns_stats['since']
    tokens['DNS_PROVIDERS_TABLE'] = _dns_providers_table(dns_stats['servers'])
    return tokens