Source code for kstlib.monitoring.service

"""MonitoringService orchestrator for data collection, rendering, and delivery.

The MonitoringService provides a high-level API for building monitoring dashboards:

1. **Collect** - Run data collectors (sync or async callables)
2. **Render** - Generate HTML using Jinja2 templates and render types
3. **Deliver** - Send via email (kstlib.mail) or save to file

Examples:
    Basic usage with collectors:

    >>> from kstlib.monitoring import MonitoringService, StatusCell, StatusLevel
    >>> service = MonitoringService(
    ...     template=\"\"\"<p>Status: {{ status | render }}</p>\"\"\",
    ...     collectors={
    ...         "status": lambda: StatusCell("UP", StatusLevel.OK),
    ...     },
    ... )
    >>> result = service.run_sync()
    >>> "status-ok" in result.html
    True

    Async collectors for real-time data:

    >>> async def get_metrics():
    ...     # Fetch from API, database, etc.
    ...     return {"cpu": 75.2, "memory": 8192}
    >>> service = MonitoringService(
    ...     template=\"\"\"<p>CPU: {{ metrics.cpu }}%</p>\"\"\",
    ...     collectors={"metrics": get_metrics},
    ... )

"""

from __future__ import annotations

import asyncio
import inspect
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from kstlib.monitoring.exceptions import CollectorError, RenderError
from kstlib.monitoring.renderer import create_environment

if TYPE_CHECKING:
    from email.message import EmailMessage

    from kstlib.mail.transport import AsyncMailTransport, MailTransport

# Type alias for collectors: sync or async callables returning any data
Collector = Callable[[], Any] | Callable[[], Awaitable[Any]]


[docs] @dataclass class MonitoringResult: """Result of a monitoring run. Attributes: html: The rendered HTML string. data: The collected data dictionary. collected_at: Timestamp when data was collected. rendered_at: Timestamp when HTML was rendered. errors: List of collector errors (if fail_fast=False). """ html: str data: dict[str, Any] collected_at: datetime rendered_at: datetime errors: list[CollectorError] = field(default_factory=list) @property def success(self) -> bool: """Return True if no collector errors occurred.""" return len(self.errors) == 0
[docs] class MonitoringService: """Orchestrator for collecting, rendering, and delivering monitoring dashboards. The service manages the full lifecycle of a monitoring dashboard: 1. **Collectors** - Register data sources as sync or async callables 2. **Template** - Jinja2 template with ``| render`` filter support 3. **Render** - Generate HTML with automatic CSS handling 4. **Deliver** - Optional email delivery via kstlib.mail transports Args: template: Jinja2 template string for rendering the dashboard. collectors: Optional dict mapping names to collector callables. inline_css: If True (default), use inline CSS for email compatibility. fail_fast: If True, raise on first collector error. If False, continue and report errors in the result. Examples: Simple dashboard with status cell: >>> from kstlib.monitoring import MonitoringService, StatusCell, StatusLevel >>> service = MonitoringService( ... template=\"\"\"<div>{{ status | render }}</div>\"\"\", ... collectors={"status": lambda: StatusCell("OK", StatusLevel.OK)}, ... ) >>> result = service.run_sync() >>> "OK" in result.html True Adding collectors after construction (chainable): >>> service = MonitoringService(template="<p>{{ msg }}</p>") >>> service.add_collector("msg", lambda: "Hello").run_sync().html '<p>Hello</p>' """
[docs] def __init__( self, template: str, collectors: dict[str, Collector] | None = None, *, inline_css: bool = True, fail_fast: bool = True, ) -> None: """Initialize the monitoring service. Args: template: Jinja2 template string. collectors: Optional initial collectors dict. inline_css: Use inline CSS for email compatibility. fail_fast: Raise immediately on collector errors. """ self._template = template self._collectors: dict[str, Collector] = dict(collectors) if collectors else {} self._inline_css = inline_css self._fail_fast = fail_fast self._env = create_environment()
@property def template(self) -> str: """Return the template string.""" return self._template @property def inline_css(self) -> bool: """Return whether inline CSS is enabled.""" return self._inline_css @property def collector_names(self) -> list[str]: """Return list of registered collector names.""" return list(self._collectors)
[docs] def add_collector(self, name: str, collector: Collector) -> MonitoringService: """Add a data collector. Collectors are callables (sync or async) that return data to be passed to the template. The returned data can be any type including Renderable objects like StatusCell, MonitorTable, etc. Args: name: Name to use in the template (e.g., "status" for {{ status }}). collector: Callable returning the data. Can be sync or async. Returns: Self for method chaining. Examples: >>> service = MonitoringService(template="<p>{{ x }} + {{ y }}</p>") >>> service.add_collector("x", lambda: 1).add_collector("y", lambda: 2) <kstlib.monitoring.service.MonitoringService object at ...> """ self._collectors[name] = collector return self
[docs] def remove_collector(self, name: str) -> MonitoringService: """Remove a collector by name. Args: name: Name of the collector to remove. Returns: Self for method chaining. Raises: KeyError: If collector name not found. """ del self._collectors[name] return self
def _cancel_tasks(self, tasks: dict[str, asyncio.Task[Any]], exclude: str = "") -> None: """Cancel all pending async tasks except the excluded one.""" for name, task in tasks.items(): if name != exclude and not task.done(): task.cancel() def _handle_collector_error( self, name: str, exc: Exception, errors: list[CollectorError], tasks: dict[str, asyncio.Task[Any]], ) -> CollectorError: """Handle a collector error: cancel tasks if fail_fast, else append to errors.""" error = CollectorError(name, exc) if self._fail_fast: self._cancel_tasks(tasks) raise error from exc errors.append(error) return error
[docs] async def collect(self) -> tuple[dict[str, Any], list[CollectorError]]: """Run all collectors and gather data. Collectors are run concurrently when possible. Async collectors are awaited, sync collectors are called directly. Returns: Tuple of (collected data dict, list of errors). Raises: CollectorError: If fail_fast=True and any collector fails. """ data: dict[str, Any] = {} errors: list[CollectorError] = [] async_tasks: dict[str, asyncio.Task[Any]] = {} sync_collectors: dict[str, Collector] = {} # Separate and schedule collectors for name, collector in self._collectors.items(): if inspect.iscoroutinefunction(collector): async_tasks[name] = asyncio.create_task(collector()) else: sync_collectors[name] = collector # Run sync collectors for name, collector in sync_collectors.items(): try: # reason: per-collector isolation; one failure should not abort dashboard data[name] = collector() except Exception as e: self._handle_collector_error(name, e, errors, async_tasks) data[name] = None # Await async collectors for name, task in async_tasks.items(): try: # reason: per-task isolation; one failure should not abort dashboard data[name] = await task except asyncio.CancelledError: raise except Exception as e: self._handle_collector_error(name, e, errors, async_tasks) data[name] = None return data, errors
[docs] def render(self, data: dict[str, Any]) -> str: """Render the template with collected data. Args: data: Dictionary of data to pass to the template. Returns: Rendered HTML string. Raises: RenderError: If template rendering fails. """ from kstlib.monitoring._styles import get_css_classes try: template = self._env.from_string(self._template) html = template.render(**data) except Exception as e: raise RenderError(f"Template rendering failed: {e}") from e # Prepend CSS classes if not using inline CSS if not self._inline_css: html = get_css_classes() + "\n" + html return html
[docs] async def run(self) -> MonitoringResult: """Collect data and render the dashboard. This is the main entry point for async usage. It runs all collectors, renders the template, and returns a MonitoringResult. Returns: MonitoringResult with HTML, data, timestamps, and any errors. Examples: >>> import asyncio >>> service = MonitoringService( ... template="<p>{{ msg }}</p>", ... collectors={"msg": lambda: "Hello"}, ... ) >>> result = asyncio.run(service.run()) >>> "Hello" in result.html True """ collected_at = datetime.now(timezone.utc) data, errors = await self.collect() html = self.render(data) rendered_at = datetime.now(timezone.utc) return MonitoringResult( html=html, data=data, collected_at=collected_at, rendered_at=rendered_at, errors=errors, )
[docs] def run_sync(self) -> MonitoringResult: """Run the monitoring service synchronously. Convenience method for non-async contexts. Creates a new event loop if needed. Returns: MonitoringResult with HTML, data, timestamps, and any errors. Examples: >>> service = MonitoringService( ... template="<p>{{ msg }}</p>", ... collectors={"msg": lambda: "World"}, ... ) >>> "World" in service.run_sync().html True """ try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop is not None: # Already in an async context - can't use asyncio.run # Use a new thread or raise import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(asyncio.run, self.run()) return future.result() return asyncio.run(self.run())
[docs] async def deliver( self, transport: MailTransport | AsyncMailTransport, message_builder: Callable[[str], EmailMessage], ) -> MonitoringResult: """Collect, render, and deliver via email transport. This combines run() with email delivery. The message_builder callable receives the rendered HTML and should return a complete EmailMessage. Args: transport: Mail transport (sync or async) from kstlib.mail. message_builder: Callable that takes HTML and returns EmailMessage. Returns: MonitoringResult from the run. Examples: >>> from email.message import EmailMessage >>> def build_message(html: str) -> EmailMessage: ... msg = EmailMessage() ... msg["From"] = "bot@example.com" ... msg["To"] = "team@example.com" ... msg["Subject"] = "Dashboard" ... msg.set_content(html, subtype="html") ... return msg """ result = await self.run() message = message_builder(result.html) # Check if transport is async or sync if hasattr(transport, "send") and inspect.iscoroutinefunction(transport.send): await transport.send(message) else: # Sync transport - run in thread pool loop = asyncio.get_running_loop() await loop.run_in_executor(None, transport.send, message) return result
__all__ = [ "Collector", "MonitoringResult", "MonitoringService", ]