linuxrouter/routlin/mod_dns_queries.py
2026-06-09 21:28:38 -04:00

182 lines
6 KiB
Python

"""
mod_dns_queries.py -- DNS query log collector.
Reads dnsmasq query logs from journalctl using a cursor bookmark,
parses query/result line pairs, and appends rows to a SQLite database.
Called by:
- maintenance.py on each timer tick
- routlin-dash overview page on each page load (background thread)
Only VLANs with dnsmasq_log_queries=true are collected.
"""
import json
import re
import sqlite3
import subprocess
from collections import defaultdict, deque
from pathlib import Path
import mod_shared as shared
import mod_validation as validation
# SQLite database file (query rows plus collector state), located in the
# script directory.
DB_FILE = shared.SCRIPT_DIR / ".dns-queries"
# Incoming query line: captures (record type, domain, client address).
# NOTE(review): the client address class [\d.]+ only matches dotted-decimal
# addresses, so a query logged with an IPv6 source would not match --
# confirm that is intended.
QUERY_RE = re.compile(r'query\[(\w+)\] (\S+) from ([\d.]+)')
# Result line ending in "<domain> is 0.0.0.0"; the collector records such
# answers as blocked.
BLOCK_RE = re.compile(r'(\S+) is 0\.0\.0\.0$')
# Result-line shapes used to recover the domain of a non-blocked answer.
# The collector tries them in the order CACHED, FWD, REPLY, LOCAL.
CACHED_RE = re.compile(r'cached (\S+) is ')
FWD_RE = re.compile(r'forwarded (\S+) to ')
REPLY_RE = re.compile(r'\breply (\S+) is ')
# Answer whose source token contains a "/" (presumably a hosts/config file
# path -- verify against real dnsmasq output).
LOCAL_RE = re.compile(r'/\S+ (\S+) is ')
# ===================================================================
# Database
# ===================================================================
def open_db():
    """
    Open the query-log database and make sure its schema exists.
    Connects to DB_FILE with a 10 second busy timeout, switches the
    journal to WAL mode, then creates the dns_queries table, its three
    indexes and the collector_state key/value table if they are missing.
    Returns the open sqlite3 connection; the caller is responsible for
    closing it.
    """
    # Every statement uses IF NOT EXISTS, so running this on an already
    # initialised database is a no-op.
    schema = '''
CREATE TABLE IF NOT EXISTS dns_queries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
domain TEXT NOT NULL,
qtype TEXT NOT NULL,
client_ip TEXT NOT NULL,
vlan TEXT NOT NULL,
blocked INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_dq_ts ON dns_queries(ts);
CREATE INDEX IF NOT EXISTS idx_dq_domain ON dns_queries(domain, blocked);
CREATE INDEX IF NOT EXISTS idx_dq_client ON dns_queries(client_ip);
CREATE TABLE IF NOT EXISTS collector_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
'''
    db = sqlite3.connect(DB_FILE, timeout=10)
    db.execute('PRAGMA journal_mode=WAL')
    db.executescript(schema)
    db.commit()
    return db
def _get_cursor(con):
    """
    Return the journal cursor saved in collector_state, or None when no
    cursor has been stored yet.
    """
    found = con.execute(
        "SELECT value FROM collector_state WHERE key='cursor'"
    ).fetchone()
    if found:
        return found[0]
    return None
def _save_cursor(con, cursor_val):
    """
    Store cursor_val as the journal bookmark in collector_state,
    replacing any previously saved value. Does not commit; the caller
    decides when the transaction ends.
    """
    upsert = "INSERT OR REPLACE INTO collector_state(key, value) VALUES ('cursor', ?)"
    params = (cursor_val,)
    con.execute(upsert, params)
# ===================================================================
# Collection
# ===================================================================
def _monitored_units(data):
    """
    Build the lookup from systemd unit name to VLAN name.
    Only VLANs whose dnsmasq_log_queries setting is truthy are included.
    Each service is registered both bare and with a '.service' suffix so
    either spelling of the journal's _SYSTEMD_UNIT value resolves.
    """
    unit_to_vlan = {}
    for vlan in data.get('vlans', []):
        if not vlan.get('dnsmasq_log_queries'):
            continue
        iface = validation.derive_interface(vlan, data)
        svc = shared.vlan_service_name(vlan, iface)
        unit_to_vlan[svc] = vlan['name']
        unit_to_vlan[svc + '.service'] = vlan['name']
    return unit_to_vlan


def _result_domain(msg):
    """
    Classify a dnsmasq result line.
    Returns (domain, blocked): blocked is 1 when the line matches
    BLOCK_RE, 0 when it matches one of the other result patterns.
    Returns (None, 0) when the line matches no known result pattern.
    """
    hit = BLOCK_RE.search(msg)
    if hit:
        return hit.group(1), 1
    for pat in (CACHED_RE, FWD_RE, REPLY_RE, LOCAL_RE):
        hit = pat.search(msg)
        if hit:
            return hit.group(1), 0
    return None, 0


def _pair_journal_lines(lines, unit_to_vlan, start_cursor):
    """
    Pair query lines with their result lines from journalctl JSON output.
    lines is an iterable of JSON strings (one journal entry each),
    unit_to_vlan maps unit names to VLAN names, and start_cursor is the
    cursor to report when no entry carries a newer one.
    Returns (rows, last_cursor) where rows are
    (ts, domain, qtype, client_ip, vlan, blocked) tuples.
    """
    # pending[(vlan, domain)] = deque of (ts, qtype, client_ip).
    # Keyed per VLAN because every monitored VLAN is a separate dnsmasq
    # unit whose lines are interleaved in the journal; keying by domain
    # alone would let one unit's result line consume another unit's query.
    # FIFO so concurrent same-domain queries from different clients pair
    # in arrival order.
    pending = defaultdict(deque)
    rows = []
    last_cursor = start_cursor
    for line in lines:
        try:
            entry = json.loads(line)
        except ValueError:
            # Not valid JSON (blank or truncated line) -- skip it.
            continue
        if not isinstance(entry, dict):
            continue
        msg = entry.get('MESSAGE', '')
        if not isinstance(msg, str):
            continue
        raw_unit = entry.get('_SYSTEMD_UNIT', '')
        vlan_name = None
        if isinstance(raw_unit, str):
            vlan_name = (unit_to_vlan.get(raw_unit)
                         or unit_to_vlan.get(raw_unit.removesuffix('.service')))
        if vlan_name:
            m = QUERY_RE.search(msg)
            if m:
                # Incoming query line -- queue it until its result line shows up.
                qtype, domain, client_ip = m.groups()
                # __REALTIME_TIMESTAMP is in microseconds; store whole seconds.
                ts = int(entry.get('__REALTIME_TIMESTAMP', 0)) // 1_000_000
                pending[(vlan_name, domain)].append((ts, qtype, client_ip))
            else:
                # Result line -- identify domain and whether it was blocked.
                domain, blocked = _result_domain(msg)
                key = (vlan_name, domain)
                queue = pending.get(key)
                if domain and queue:
                    ts, qtype, client_ip = queue.popleft()
                    if not queue:
                        del pending[key]
                    rows.append((ts, domain, qtype, client_ip, vlan_name, blocked))
        jcursor = entry.get('__CURSOR', '')
        if jcursor:
            last_cursor = jcursor
    # Flush any pending entries that never received a result line.
    # This can happen when the collector runs mid-transaction. They are
    # recorded as not-blocked, on the expectation that a blocked answer is
    # produced synchronously and so would appear in the same journal batch.
    for (vlan_name, domain), queue in pending.items():
        for ts, qtype, client_ip in queue:
            rows.append((ts, domain, qtype, client_ip, vlan_name, 0))
    return rows, last_cursor


def collect(data):
    """
    Fetch new dnsmasq log entries from journalctl since the last stored
    cursor, parse query/result pairs, and insert into dns_queries.
    data is the configuration dict; only entries of data['vlans'] with a
    truthy dnsmasq_log_queries are collected. When there are none, no
    database or journal access happens and 0 is returned.
    Returns the number of rows inserted.
    """
    unit_to_vlan = _monitored_units(data)
    if not unit_to_vlan:
        return 0
    con = open_db()
    try:
        journal_cursor = _get_cursor(con)
        cmd = ['journalctl', '-u', 'dnsmasq-routlin-*', '--no-pager', '-o', 'json']
        if journal_cursor:
            cmd += ['--after-cursor', journal_cursor]
        result = subprocess.run(cmd, capture_output=True, text=True)
        rows, last_cursor = _pair_journal_lines(
            result.stdout.splitlines(), unit_to_vlan, journal_cursor
        )
        if rows:
            con.executemany(
                'INSERT INTO dns_queries(ts, domain, qtype, client_ip, vlan, blocked)'
                ' VALUES(?,?,?,?,?,?)',
                rows
            )
        if last_cursor and last_cursor != journal_cursor:
            _save_cursor(con, last_cursor)
        # Rows and cursor are committed together so a failed run neither
        # duplicates nor skips journal entries on the next call.
        con.commit()
        shared.chown_to_script_dir_owner(DB_FILE)
    finally:
        # Always release the connection, including when journalctl cannot
        # be launched or an insert fails.
        con.close()
    return len(rows)