linuxrouter/docker/routlin-dash/app/pages/dhcpleases/view.py
2026-06-03 02:57:59 -04:00

193 lines
6.5 KiB
Python

import ipaddress
import json
import os
import glob
from datetime import datetime, timezone
from config_utils import collect_layout_tokens, load_config, relative_time
from factory import (
load_json, build_table, table_token_key, iter_table_items, PAGES_DIR, e,
)
# Optional third-party MAC vendor lookup backends. Either package may be
# missing or may fail while constructing its lookup object; in that case the
# matching handle stays None and _get_vendor() skips that backend.
_mac_parser = None
try:
    import manuf as _manuf_mod
    _mac_parser = _manuf_mod.MacParser()
except Exception:
    pass

_mac_lookup = None
try:
    from mac_vendor_lookup import MacLookup as _MacLookup
    _mac_lookup = _MacLookup()
except Exception:
    pass
def _get_vendor(mac):
    """Return a ``(short, long)`` pair of vendor names for *mac*.

    ``short`` comes from ``_mac_parser.get_manuf`` and ``long`` from
    ``_mac_lookup.lookup``.  Each element is ``''`` when its backend is not
    available, when the lookup raises, or when it returns a falsy value.
    """
    def _ask(backend, query):
        # Lookups are best-effort: any failure degrades to an empty name.
        if not backend:
            return ''
        try:
            return query(backend) or ''
        except Exception:
            return ''

    short_name = _ask(_mac_parser, lambda db: db.get_manuf(mac))
    long_name = _ask(_mac_lookup, lambda db: db.lookup(mac))
    return (short_name, long_name)
def _vendor_cell(vendor):
    """Render the vendor table cell for a ``(short, long)`` name pair.

    The visible text is the short name, or the first 8 characters of the long
    name when there is no short name.  When a long name exists the text is
    wrapped in a ``<span>`` carrying it in ``data-vendor-long``.  Returns
    ``'-'`` when neither name is available.
    """
    short_name, long_name = vendor
    if short_name:
        label = short_name
    elif long_name:
        label = long_name[:8]
    else:
        label = ''

    if not label:
        return '-'
    if not long_name:
        return e(label)
    return f'<span data-vendor-long="{e(long_name)}">{e(label)}</span>'
def _get_arp_table(path='/var/lib/misc/arp-cache.json'):
    """Return {mac_lower: entry} from the ARP cache written by maintenance.py.

    Args:
        path: Location of the JSON cache file.  Defaults to the path the
            dashboard reads in production.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing,
        unreadable, not valid JSON, or does not hold a JSON object at the top
        level (callers use ``.get`` / ``.items`` on the result).
    """
    try:
        with open(path, encoding='utf-8') as f:
            table = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file.  ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return table if isinstance(table, dict) else {}
def _status_badge(state):
    """Return the status badge HTML for an ARP *state* string.

    Only the exact value ``'REACHABLE'`` is shown as online; every other
    value (including an empty string) is shown as offline.
    """
    if state != 'REACHABLE':
        return '<span class="badge badge-disabled">Offline</span>'
    return '<span class="badge badge-enabled">Online</span>'
def _vlan_for_ip(ip_str, vlans):
    """Return the name of the first VLAN whose subnet contains *ip_str*.

    Each VLAN dict contributes a network built from its ``subnet`` and
    ``subnet_mask`` values; VLANs missing either value, or whose values do
    not form a valid IPv4 network, are ignored.  Returns ``'-'`` when
    *ip_str* is not a valid IPv4 address or no VLAN matches.
    """
    try:
        address = ipaddress.IPv4Address(ip_str)
    except ValueError:
        return '-'

    for vlan in vlans:
        base = vlan.get('subnet', '')
        netmask = vlan.get('subnet_mask', '')
        if not (base and netmask):
            continue
        try:
            # strict=False: the configured subnet may have host bits set.
            network = ipaddress.IPv4Network(f'{base}/{netmask}', strict=False)
        except ValueError:
            continue
        if address in network:
            return vlan.get('name', '-')
    return '-'
# Seconds per unit suffix accepted in a lease-time string.
_LEASE_UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400, 'w': 604800}


def _parse_lease_secs(s):
    """Convert a lease-time setting such as ``'12h'`` into seconds.

    Accepts an integer followed by one of the suffixes ``s``, ``m``, ``h``,
    ``d`` or ``w`` (case-insensitive), or a bare integer, which is taken as
    seconds.  NOTE(review): the bare-seconds and ``s``/``w`` forms follow
    dnsmasq's lease-time syntax -- confirm the config uses the same format.

    Returns:
        The duration in seconds, or ``None`` when the value is empty or
        cannot be parsed (for example ``'infinite'``).
    """
    text = str(s).strip().lower()
    if not text:
        return None
    multiplier = _LEASE_UNIT_SECONDS.get(text[-1])
    # No recognised suffix: the whole string must be a number of seconds.
    digits = text if multiplier is None else text[:-1]
    try:
        return int(digits) * (multiplier or 1)
    except ValueError:
        return None
def _lease_timing_cells(expiry, lease_secs, now):
    """Return the ``(last_active, renews)`` cell texts for one active lease.

    The lease line only carries the expiry time, so the time the lease was
    obtained (``expiry - lease_secs``) and its midpoint
    (``expiry - lease_secs // 2``) are derived from the VLAN's configured
    lease length.  Both are unknown when *lease_secs* is falsy.
    """
    if lease_secs:
        obtained_ts = expiry - lease_secs
        renews_ts = expiry - lease_secs // 2
    else:
        obtained_ts = renews_ts = None

    if obtained_ts is None:
        last_active = '-'
    elif obtained_ts <= now:
        last_active = relative_time(obtained_ts, now, short=True) + ' ago'
    elif renews_ts and renews_ts > now:
        # The derived start time lies in the future -- presumably the lease
        # was issued with a different length than the one now configured.
        last_active = 'ETA ' + relative_time(renews_ts, now, short=True)
    else:
        last_active = 'ETA soon'

    renews = 'in ' + relative_time(renews_ts or expiry, now, short=True)
    return last_active, renews


def _hostname_cell(name, desc):
    """Render the hostname cell; *desc* is attached as ``data-hostname-desc``."""
    if not name:
        return '-'
    if desc:
        return f'<span data-hostname-desc="{e(desc)}">{e(name)}</span>'
    return e(name)


def live_dhcp_leases():
    """Build the row dicts for the live DHCP leases table.

    Reads every ``/var/lib/misc/dnsmasq-routlin-<vlan>.leases`` file and emits
    one row per lease that has not expired, then appends one row for every
    ARP cache entry whose MAC did not appear among those leases.

    Each row has the keys ``hostname``, ``ip_address``, ``mac_address``,
    ``vendor``, ``vlan_name``, ``last_active``, ``renews`` and ``status``.
    The ``hostname``, ``vendor`` and ``status`` values are HTML fragments.

    Unreadable lease files and malformed lease lines are skipped
    individually; they do not affect the remaining lines or files.

    Returns:
        list[dict]: lease rows (files in sorted path order) followed by the
        ARP-only rows.
    """
    now = int(datetime.now(tz=timezone.utc).timestamp())
    cfg = load_config()
    vlans = cfg.get('vlans', [])
    arp_table = _get_arp_table()

    vlan_lease_secs = {
        v['name']: _parse_lease_secs(
            # ``or {}`` tolerates an explicit null dhcp_information.
            (v.get('dhcp_information') or {}).get('lease_time', '')
        )
        for v in vlans if v.get('name')
    }

    # Reservation overrides, keyed by lowercased MAC.
    mac_to_res = {}
    mac_to_desc = {}
    for res in cfg.get('dhcp_reservations', []):
        res_mac = res.get('mac')
        if not res_mac:
            continue
        res_mac = res_mac.lower()
        if res.get('hostname'):
            mac_to_res[res_mac] = res['hostname']
        if res.get('description'):
            mac_to_desc[res_mac] = res['description']

    rows = []
    lease_macs = set()
    for leases_file in sorted(glob.glob('/var/lib/misc/dnsmasq-routlin-*.leases')):
        stem = os.path.basename(leases_file)
        # The VLAN name is the part of the file name between prefix and suffix.
        vlan_name = stem[len('dnsmasq-routlin-'):-len('.leases')]
        lease_secs = vlan_lease_secs.get(vlan_name)

        try:
            with open(leases_file, encoding='utf-8', errors='replace') as f:
                lines = f.readlines()
        except OSError:
            # File vanished or is unreadable; carry on with the other VLANs.
            continue

        for line in lines:
            # Fields used: expiry epoch, MAC, IP, hostname ('*' = none).
            parts = line.split()
            if len(parts) < 4:
                continue
            try:
                expiry = int(parts[0])
            except ValueError:
                # Malformed line: skip it, keep the rest of the file.
                continue
            # NOTE(review): dnsmasq appears to record infinite leases with an
            # expiry of 0, which this check treats as expired -- confirm
            # whether such leases should be listed.
            if expiry < now:
                continue

            mac_raw, ip_addr = parts[1], parts[2]
            mac_norm = mac_raw.lower()
            device_h = parts[3] if parts[3] != '*' else None
            # A reserved hostname takes precedence over the client-supplied one.
            name = mac_to_res.get(mac_norm) or device_h
            last_active, renews = _lease_timing_cells(expiry, lease_secs, now)

            arp_entry = arp_table.get(mac_norm)
            state = arp_entry.get('state', '') if isinstance(arp_entry, dict) else ''

            lease_macs.add(mac_norm)
            rows.append({
                'hostname': _hostname_cell(name, mac_to_desc.get(mac_norm)),
                'ip_address': ip_addr,
                'mac_address': mac_raw,
                'vendor': _vendor_cell(_get_vendor(mac_raw)),
                'vlan_name': vlan_name,
                'last_active': last_active,
                'renews': renews,
                'status': _status_badge(state),
            })

    # Devices seen in the ARP cache that hold no active lease.
    for mac, arp in arp_table.items():
        if mac.lower() in lease_macs or not isinstance(arp, dict):
            continue
        ip_addr = arp.get('ip') or '-'
        rows.append({
            'hostname': '-',
            'ip_address': ip_addr,
            'mac_address': mac,
            'vendor': _vendor_cell(_get_vendor(mac)),
            'vlan_name': _vlan_for_ip(ip_addr, vlans),
            'last_active': '',
            'renews': '',
            'status': _status_badge(arp.get('state', '')),
        })
    return rows
def collect_tokens(cfg):
    """Build the template tokens for the DHCP leases page.

    Starts from the shared layout tokens, adds ``VLAN_FILTER_OPTIONS`` (an
    "All VLANs" ``<option>`` followed by one ``<option>`` per entry in
    ``cfg['vlans']``), and renders every table item found in the page's
    ``content.json`` under the key returned by ``table_token_key``.  Tables
    whose datasource is ``live:dhcp_leases`` are filled from
    ``live_dhcp_leases()``; any other datasource is rendered with no rows.

    Args:
        cfg: The loaded configuration dict.

    Returns:
        dict: the token mapping.
    """
    tokens = collect_layout_tokens(cfg)

    option_tags = ['<option value="all">All VLANs</option>']
    for vlan in cfg.get('vlans', []):
        # The name lands in both an attribute value and element text, so it
        # is escaped like the other config-derived strings in this module.
        name = e(str(vlan.get('name', '')))
        option_tags.append(f'<option value="{name}">{name}</option>')
    tokens['VLAN_FILTER_OPTIONS'] = ''.join(option_tags)

    content = load_json(f'{PAGES_DIR}/dhcpleases/content.json')
    for table_item in iter_table_items(content.get('items', [])):
        ds = table_item.get('datasource', '')
        rows = live_dhcp_leases() if ds == 'live:dhcp_leases' else []
        tokens[table_token_key(ds)] = build_table(table_item, tokens, rows)
    return tokens