63 lines
2.8 KiB
Python
63 lines
2.8 KiB
Python
import json
|
|
import config_utils
|
|
import factory
|
|
|
|
|
|
def live_vpn_sessions():
|
|
rows = []
|
|
out = factory.run('wg show all dump 2>/dev/null')
|
|
for line in out.splitlines():
|
|
parts = line.split('\t')
|
|
if len(parts) == 9:
|
|
interface, pubkey, _psk, endpoint, allowed_ips, last_hs, rx, tx, _ka = parts
|
|
rows.append({
|
|
'peer_name': pubkey[:16] + '...',
|
|
'interface': interface,
|
|
'tunnel_ip': allowed_ips.split(',')[0].split('/')[0] if allowed_ips else '-',
|
|
'endpoint': endpoint if endpoint != '(none)' else '-',
|
|
'last_handshake': config_utils.fmt_timestamp(int(last_hs)) if last_hs.isdigit() and last_hs != '0' else 'Never',
|
|
'rx_bytes': config_utils.fmt_bytes(int(rx)) if rx.isdigit() else '-',
|
|
'tx_bytes': config_utils.fmt_bytes(int(tx)) if tx.isdigit() else '-',
|
|
})
|
|
return rows
|
|
|
|
|
|
def collect_tokens(cfg):
|
|
tokens = config_utils.collect_layout_tokens(cfg)
|
|
vlans = cfg.get('vlans', [])
|
|
wg_vlans_list = sorted(
|
|
[v for v in vlans if v.get('is_vpn')],
|
|
key=lambda v: (v.get('vlan_id') is None, v.get('vlan_id') or 0)
|
|
)
|
|
wg_vlan = wg_vlans_list[0] if wg_vlans_list else {}
|
|
vpn = wg_vlan.get('vpn_information', {})
|
|
overrides = vpn.get('explicit_overrides', {})
|
|
|
|
try:
|
|
import ipaddress
|
|
ident_ips = [s['ip'] for s in wg_vlan.get('server_identities', []) if s.get('ip')]
|
|
if ident_ips:
|
|
default_gw = str(min((ipaddress.IPv4Address(ip) for ip in ident_ips), key=lambda x: x.packed[-1]))
|
|
else:
|
|
wg_net = ipaddress.IPv4Network(f"{wg_vlan['subnet']}/{wg_vlan['subnet_mask']}", strict=False)
|
|
default_gw = str(next(wg_net.hosts()))
|
|
vpn_gateway = overrides.get('gateway') or default_gw
|
|
except Exception:
|
|
vpn_gateway = ''
|
|
|
|
tokens['VPN_VLAN_OPTIONS'] = json.dumps([
|
|
{'value': v.get('name', ''), 'label': f'wg{i} (VLAN {v.get("vlan_id") or "?"})'}
|
|
for i, v in enumerate(wg_vlans_list)
|
|
])
|
|
tokens['VPN_LISTEN_PORT'] = str(vpn.get('listen_port', ''))
|
|
tokens['VPN_SERVER_ENDPOINT'] = str(vpn.get('server_endpoint', ''))
|
|
tokens['VPN_DOMAIN'] = str(vpn.get('domain', ''))
|
|
tokens['VPN_DNS_SERVER'] = str(overrides.get('dns_servers', ''))
|
|
tokens['VPN_MTU'] = str(overrides.get('mtu', ''))
|
|
tokens['VPN_GATEWAY'] = vpn_gateway
|
|
content = factory.load_json(f'{factory.PAGES_DIR}/vpn/content.json')
|
|
for table_item in factory.iter_table_items(content.get('items', [])):
|
|
ds = table_item.get('datasource', '')
|
|
rows = live_vpn_sessions() if ds == 'live:vpn_sessions' else config_utils.load_datasource(ds)
|
|
tokens[factory.table_token_key(ds)] = factory.build_table(table_item, tokens, rows)
|
|
return tokens
|