Development

This commit is contained in:
Matthew Grotke 2026-05-25 16:38:08 -04:00
parent 6221ee3691
commit 4150c6ef6e
4 changed files with 141 additions and 13 deletions

View file

@ -1,8 +1,8 @@
from flask import Blueprint, request, redirect, flash, session
from auth import require_level
from config_utils import (flush_selected_to_queue, delete_pending_by_uuids,
get_dashboard_pending, _is_locked, _format_timing,
_seconds_until_next_run)
from config_utils import (flush_pending_to_queue, flush_selected_to_queue,
delete_pending_by_uuids, get_dashboard_pending,
_is_locked, _format_timing, _seconds_until_next_run)
bp = Blueprint('action_actions', __name__)
@ -20,11 +20,10 @@ def actions_cardoptions_save():
@bp.route('/action/actions_cardpendingchanges_applynow', methods=['POST'])
@require_level('administrator')
def actions_cardpendingchanges_applynow():
selected_uuids = request.form.getlist('selected_uuids')
if not selected_uuids:
flash('No items selected.', 'info')
if not get_dashboard_pending():
flash('No pending changes to apply.', 'info')
return redirect(_VIEW)
flush_selected_to_queue(selected_uuids)
flush_pending_to_queue()
if _is_locked():
msg = 'Changes queued. They are being applied now.'
else:

View file

@ -144,6 +144,28 @@ def get_dashboard_pending():
return _read_dashboard_pending()
def get_dashboard_done():
"""Return list of (uuid, applied_ts) from .dashboard-done, newest first."""
items = []
try:
lines = open(DASHBOARD_DONE).read().splitlines()
except Exception:
return items
for line in lines:
if not line.strip():
continue
try:
parts = line.split(None, 1)
if len(parts) >= 2:
items.append((parts[0], int(parts[1])))
elif len(parts) == 1:
items.append((parts[0], None))
except Exception:
pass
items.reverse()
return items
def flush_pending_to_queue():
"""Move all entries from .dashboard-pending to .dashboard-queue and clear pending."""
items = _read_dashboard_pending()
@ -325,6 +347,64 @@ def _find_snapshot_dependencies(path, key):
return []
def _items_match(item, ref):
"""Return True if item and ref refer to the same entity by a common identifier field."""
if not isinstance(item, dict) or not isinstance(ref, dict):
return item == ref
for field in ('ip', 'name', 'mac_address', 'host', 'id', 'address'):
if field in ref and field in item:
return item[field] == ref[field]
return item == ref
def revert_snapshot_to_core(entry_uuid):
"""Apply the inverse of a snapshot to core.json and queue a new pending change.
Returns (flash_message, success_bool).
"""
snap = load_snapshot_for_uuid(entry_uuid)
if not snap:
return f'Snapshot not found for {entry_uuid[:8]}.', False
path = snap['path']
key = snap['key']
before = snap['before'] # original state to restore
after = snap['after'] # applied state to undo
operation = snap['operation']
if operation == 'revert':
return 'This change is already a revert; cannot revert again.', False
core = load_core()
if key == 'global':
if before is None:
core.pop(path, None)
else:
core[path] = before
else:
items = core.setdefault(path, [])
if operation == 'add':
core[path] = [x for x in items if not _items_match(x, after)]
elif operation == 'delete':
if before:
core[path].append(before)
else:
if before and after:
for i, item in enumerate(items):
if _items_match(item, after):
items[i] = before
break
msg = save_core_with_snapshot(
core, path=path, key=key, operation='revert',
before=after, after=before,
description=f"Reverted: {snap.get('description', '')}",
cmd=snap.get('cmd', 'core apply'),
)
return msg or 'Reverted.', True
def load_snapshot_for_uuid(entry_uuid):
"""Return the snapshot dict for the given UUID, or None if not found."""
try:

View file

@ -4,7 +4,7 @@ import json, re, subprocess, os, sys, html as html_mod
import sanitize
import validation as validate
from datetime import datetime, timezone
from config_utils import core_hash, get_pending_entries, get_dashboard_pending, load_snapshot_for_uuid, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, WEB_APP_DISPLAY_NAME, CONFIGS_DIR, DATA_DIR
from config_utils import core_hash, get_pending_entries, get_dashboard_pending, get_dashboard_done, load_snapshot_for_uuid, _seconds_until_next_run, _format_timing, _is_locked, _lock_mtime, WEB_APP_DISPLAY_NAME, CONFIGS_DIR, DATA_DIR
bp = Blueprint('view_page', __name__)
@ -594,7 +594,7 @@ def collect_tokens():
pending_items = get_dashboard_pending()
if pending_items:
rows = ''
for _uuid, ts, cmd, user in pending_items:
for _uuid, ts, _cmd, user in pending_items:
snap = load_snapshot_for_uuid(_uuid)
dt_str = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M')
snap_desc = e(snap['description']) if snap else ''
@ -604,7 +604,6 @@ def collect_tokens():
rows += (f'<tr>'
f'<td class="table-cell"><input type="checkbox" name="selected_uuids" value="{e(_uuid)}"/></td>'
f'<td class="table-cell">{e(dt_str)}</td>'
f'<td class="table-cell">{e(cmd)}</td>'
f'<td class="table-cell">{snap_desc}</td>'
f'<td class="table-cell">{before_html}</td>'
f'<td class="table-cell">{after_html}</td>'
@ -620,7 +619,6 @@ def collect_tokens():
'<thead><tr>'
f'<th class="table-header">{select_all}</th>'
'<th class="table-header">Time</th>'
'<th class="table-header">Action</th>'
'<th class="table-header">Description</th>'
'<th class="table-header">Before</th>'
'<th class="table-header">After</th>'
@ -634,6 +632,45 @@ def collect_tokens():
pending_html = ''
tokens['PENDING_CHANGES_HTML'] = pending_html
done_items = get_dashboard_done()
if done_items:
hist_rows = ''
for _uuid, applied_ts in done_items:
snap = load_snapshot_for_uuid(_uuid)
if applied_ts:
dt_str = datetime.fromtimestamp(applied_ts).strftime('%Y-%m-%d %H:%M')
else:
dt_str = '-'
snap_desc = e(snap['description']) if snap else ''
before_html = _render_snap_val(snap.get('before') if snap else None)
after_html = _render_snap_val(snap.get('after') if snap else None)
snap_id = e(_uuid[:8]) if snap else ''
snap_user = e(snap['user']) if snap else ''
hist_rows += (f'<tr>'
f'<td class="table-cell">{e(dt_str)}</td>'
f'<td class="table-cell">{snap_desc}</td>'
f'<td class="table-cell">{before_html}</td>'
f'<td class="table-cell">{after_html}</td>'
f'<td class="table-cell">{snap_id}</td>'
f'<td class="table-cell">{snap_user}</td>'
f'</tr>')
history_html = (
'<table class="data-table" style="margin-bottom:1rem">'
'<thead><tr>'
'<th class="table-header">Applied</th>'
'<th class="table-header">Description</th>'
'<th class="table-header">Before</th>'
'<th class="table-header">After</th>'
'<th class="table-header">Snapshot</th>'
'<th class="table-header">User</th>'
'</tr></thead>'
f'<tbody>{hist_rows}</tbody>'
'</table>'
)
else:
history_html = ''
tokens['CHANGE_HISTORY_HTML'] = history_html
servers = dns.get('upstream_servers', [])
tokens['DNS_STRICT_ORDER'] = 'true' if dns.get('strict_order') else 'false'
tokens['DNS_CACHE_SIZE'] = str(dns.get('cache_size', '-'))
@ -1030,8 +1067,9 @@ def _render_item(item, tokens, inherited_req=None):
method = e(item.get('method', 'post'))
inner = render_items(item.get('items', []), tokens, req)
hash_field = f'<input type="hidden" name="config_hash" value="{e(core_hash())}"/>'
originals = json.dumps(_collect_form_originals(item.get('items', []), tokens))
orig_field = f'<input type="hidden" name="original_values" value="{e(originals)}"/>'
originals = _collect_form_originals(item.get('items', []), tokens)
orig_field = (f'<input type="hidden" name="original_values" value="{e(json.dumps(originals))}"/>'
if originals else '')
return f'<form action="{action}" method="{method}">{hash_field}{orig_field}{inner}</form>'
if t == 'hidden':

View file

@ -634,6 +634,17 @@
}
]
},
{
"type": "card",
"label": "Change History",
"client_requirement": "client_is_administrator+",
"items": [
{
"type": "raw_html",
"html": "%CHANGE_HISTORY_HTML%"
}
]
},
{
"type": "card",
"label": "Options",