"""Thread-safe bounded collector for notify results.
Used in conjunction with :meth:`kstlib.mail.MailBuilder.notify` to accumulate
:class:`~kstlib.mail.NotifyResult` instances across multiple decorated calls,
then render a summary email at run end via
:meth:`kstlib.mail.MailBuilder.send_summary`.
"""
from __future__ import annotations
import html
import threading
from collections import deque
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from kstlib.mail.builder import NotifyResult
from kstlib.monitoring import MonitorTable
_DEFAULT_MAXSIZE = 1000
_DETAIL_MAX_CHARS = 200
def _truncate(text: str, limit: int = _DETAIL_MAX_CHARS) -> str:
"""Return ``text`` truncated to ``limit`` characters with ellipsis if needed."""
if len(text) <= limit:
return text
return text[: limit - 3] + "..."
_REDACTED_DETAIL = "[REDACTED] (NotifyCollector(redact_user_data=False) to view)"
def _format_detail(result: NotifyResult, *, redact: bool = True) -> str:
"""Build the Detail column string for a result.
KO: ``"{ExcType}: {msg}"`` truncated.
OK with return_value: ``repr(return_value)`` truncated.
OK without return_value: empty string.
When ``redact`` is True (default), exception messages and return
values are replaced with a redaction placeholder. The exception type
name is still surfaced for KO rows so the failure mode stays visible.
"""
if not result.success and result.exception is not None:
exc_type = type(result.exception).__name__
if redact:
return f"{exc_type}: {_REDACTED_DETAIL}"
return _truncate(f"{exc_type}: {result.exception}")
if result.success and result.return_value is not None:
if redact:
return _REDACTED_DETAIL
return _truncate(repr(result.return_value))
return ""
[docs]
class NotifyCollector:
"""Thread-safe bounded collector of :class:`NotifyResult` instances.
Designed to be passed to :meth:`kstlib.mail.MailBuilder.notify` via the
``collector=`` kwarg, accumulating results across multiple decorated
function calls. Provides several rendering helpers for summary emails.
Capacity is bounded to ``maxsize`` items with FIFO eviction when full.
Args:
maxsize: Maximum number of results to retain. Older entries are
evicted FIFO when the bound is reached. Must be positive.
Raises:
ValueError: If ``maxsize`` is not a positive integer.
Examples:
>>> from kstlib.mail import NotifyCollector
>>> collector = NotifyCollector(maxsize=500)
>>> collector.ok_count
0
>>> collector.ko_count
0
"""
[docs]
def __init__(
self,
*,
maxsize: int = _DEFAULT_MAXSIZE,
redact_user_data: bool = True,
) -> None:
"""Initialize the NotifyCollector.
Args:
maxsize: Maximum number of results retained (FIFO eviction).
redact_user_data: When True (default), exception messages,
return values, and tracebacks from the decorated user
functions are replaced with a redaction placeholder
before they appear in the rendered summary. Set to
False to opt-in to the legacy verbatim behaviour --
only do so if the decorated functions never raise with
sensitive content in their messages and never return
objects whose ``repr()`` would expose secrets.
"""
if not isinstance(maxsize, int) or maxsize <= 0:
msg = f"maxsize must be a positive integer, got: {maxsize!r}"
raise ValueError(msg)
self._maxsize = maxsize
self._lock = threading.Lock()
self._results: deque[NotifyResult] = deque(maxlen=maxsize)
self._redact_user_data = redact_user_data
[docs]
def add(self, result: NotifyResult) -> None:
"""Append a result to the collector (thread-safe, FIFO bounded).
Args:
result: The notify result to record.
"""
with self._lock:
self._results.append(result)
[docs]
def reset(self) -> None:
"""Remove every recorded result (thread-safe)."""
with self._lock:
self._results.clear()
@property
def maxsize(self) -> int:
"""Return the configured maximum capacity."""
return self._maxsize
@property
def results(self) -> list[NotifyResult]:
"""Return a snapshot copy of recorded results in insertion order."""
with self._lock:
return list(self._results)
@property
def ok_count(self) -> int:
"""Return the number of successful results."""
with self._lock:
return sum(1 for r in self._results if r.success)
@property
def ko_count(self) -> int:
"""Return the number of failed results."""
with self._lock:
return sum(1 for r in self._results if not r.success)
@property
def total_count(self) -> int:
"""Return the total number of recorded results."""
with self._lock:
return len(self._results)
[docs]
def render_html(self, *, include_tracebacks: bool = True) -> str:
"""Render results as a standalone HTML table with inline CSS.
Produces a self-contained HTML fragment suitable for embedding in an
email body. Each row is colored green for success or red for failure
on the Status column. When ``include_tracebacks`` is true and any
failure carries a traceback, a collapsible ``<details>`` block lists
them under the table.
Args:
include_tracebacks: Whether to append a collapsible block with
the full traceback of each failed result.
Returns:
HTML string with the summary table (and optional traceback block).
"""
snapshot = self.results
ok_count = sum(1 for r in snapshot if r.success)
ko_count = len(snapshot) - ok_count
rows: list[str] = []
for r in snapshot:
if r.success:
status_label = "OK"
status_bg = "#16a34a"
else:
status_label = "FAILED"
status_bg = "#dc2626"
status_cell = (
f'<td style="padding:6px 10px;border-bottom:1px solid #e5e7eb">'
f'<span style="display:inline-block;padding:2px 8px;border-radius:4px;'
f'background:{status_bg};color:#fff;font-weight:600">{status_label}</span>'
f"</td>"
)
cells_style = "padding:6px 10px;border-bottom:1px solid #e5e7eb"
rows.append(
"<tr>"
f'<td style="{cells_style}"><code>{html.escape(r.function_name)}</code></td>'
f"{status_cell}"
f'<td style="{cells_style}">{r.started_at.isoformat()}</td>'
f'<td style="{cells_style};text-align:right">{r.duration_ms:.2f}</td>'
f'<td style="{cells_style}">{html.escape(_format_detail(r, redact=self._redact_user_data))}</td>'
"</tr>"
)
header_style = (
"padding:8px 10px;background:#1f2937;color:#fff;"
"text-align:left;font-weight:600;border-bottom:1px solid #111827"
)
table_style = 'style="border-collapse:collapse;font-family:Arial,sans-serif;font-size:13px;width:100%"'
summary = (
f'<p style="font-family:Arial,sans-serif;font-size:13px;margin:0 0 8px 0">'
f"<strong>Total:</strong> {len(snapshot)} "
f'(<span style="color:#16a34a">OK: {ok_count}</span>, '
f'<span style="color:#dc2626">FAILED: {ko_count}</span>)'
f"</p>"
)
table_html = (
f"{summary}"
f"<table {table_style}>"
f"<thead><tr>"
f'<th style="{header_style}">Function</th>'
f'<th style="{header_style}">Status</th>'
f'<th style="{header_style}">Started</th>'
f'<th style="{header_style};text-align:right">Duration (ms)</th>'
f'<th style="{header_style}">Detail</th>'
f"</tr></thead>"
f"<tbody>{''.join(rows)}</tbody>"
f"</table>"
)
if include_tracebacks and not self._redact_user_data:
# Tracebacks may carry user-code locals / exception payloads;
# they are suppressed when redact_user_data is True regardless
# of the include_tracebacks flag. The opt-out path requires the
# caller to set redact_user_data=False explicitly.
tb_blocks: list[str] = [
(
f"<h4 style='font-family:Arial,sans-serif;margin:12px 0 4px 0'>"
f"<code>{html.escape(r.function_name)}</code></h4>"
f"<pre style='background:#f3f4f6;padding:10px;border-radius:4px;"
f"font-size:12px;overflow:auto'>{html.escape(r.traceback_str)}</pre>"
)
for r in snapshot
if not r.success and r.traceback_str
]
if tb_blocks:
table_html += (
"<details style='margin-top:16px;font-family:Arial,sans-serif'>"
"<summary style='cursor:pointer;font-weight:600'>Tracebacks</summary>"
f"{''.join(tb_blocks)}"
"</details>"
)
return table_html
[docs]
def render_plain(self) -> str:
"""Render results as a plain text summary.
Returns:
Plain text string with header, per-result rows, and totals.
"""
snapshot = self.results
ok_count = sum(1 for r in snapshot if r.success)
ko_count = len(snapshot) - ok_count
lines = [
f"Summary: {len(snapshot)} total (OK: {ok_count}, FAILED: {ko_count})",
"",
]
for r in snapshot:
status = "OK" if r.success else "FAILED"
detail = _format_detail(r, redact=self._redact_user_data)
base = f"[{status}] {r.function_name} (started={r.started_at.isoformat()}, duration={r.duration_ms:.2f} ms)"
lines.append(f"{base} - {detail}" if detail else base)
return "\n".join(lines)
[docs]
def to_monitor_table(self) -> MonitorTable:
"""Build a :class:`~kstlib.monitoring.MonitorTable` of the recorded results.
Imports are routed through the public ``kstlib.monitoring`` API to
avoid cross-feature private imports.
Returns:
A populated MonitorTable with five columns
(Function, Status, Started, Duration (ms), Detail).
"""
from kstlib.monitoring import MonitorTable, StatusCell, StatusLevel
table = MonitorTable(headers=["Function", "Status", "Started", "Duration (ms)", "Detail"])
for r in self.results:
status_cell = StatusCell("OK", StatusLevel.OK) if r.success else StatusCell("FAILED", StatusLevel.ERROR)
table.add_row(
[
r.function_name,
status_cell,
r.started_at.isoformat(),
f"{r.duration_ms:.2f}",
_format_detail(r, redact=self._redact_user_data),
]
)
return table
[docs]
def to_context(self) -> dict[str, Any]:
"""Build a Jinja-friendly context dict from the recorded results.
Returns:
Dict with keys: ``results``, ``ok_count``, ``ko_count``,
``total_count``, ``ok_ratio``, ``started_at``, ``ended_at``,
``total_duration_ms``.
"""
snapshot = self.results
ok_count = sum(1 for r in snapshot if r.success)
total = len(snapshot)
ko_count = total - ok_count
if snapshot:
started_at = min(r.started_at for r in snapshot)
ended_at = max(r.ended_at for r in snapshot)
total_duration_ms = sum(r.duration_ms for r in snapshot)
else:
started_at = None
ended_at = None
total_duration_ms = 0.0
ok_ratio = (ok_count / total) if total else 0.0
return {
"results": snapshot,
"ok_count": ok_count,
"ko_count": ko_count,
"total_count": total,
"ok_ratio": ok_ratio,
"started_at": started_at,
"ended_at": ended_at,
"total_duration_ms": total_duration_ms,
# Surface the redaction flag so templates can choose whether to
# render ``{{ result.exception }}`` etc. The raw NotifyResult
# objects under ``results`` are not modified : custom templates
# remain free to read user-provided fields directly. The
# built-in render_html / render_plain / to_monitor_table all
# honour this flag automatically.
"redact_user_data": self._redact_user_data,
}
__all__ = ["NotifyCollector"]