Source code for kstlib.monitoring.delivery

"""Delivery backends for monitoring results.

This module provides delivery mechanisms for MonitoringResult outputs:

- **FileDelivery**: Save HTML to local files with rotation
- **MailDelivery**: Send via kstlib.mail transports (wrapper)

Examples:
    Save to file:

    >>> from kstlib.monitoring.delivery import FileDelivery
    >>> delivery = FileDelivery(output_dir="./reports")  # doctest: +SKIP
    >>> result = await delivery.deliver(monitoring_result, "daily")  # doctest: +SKIP
    >>> print(result.path)  # doctest: +SKIP

    Send via email:

    >>> from kstlib.monitoring.delivery import MailDelivery
    >>> delivery = MailDelivery(  # doctest: +SKIP
    ...     transport=gmail_transport,
    ...     sender="bot@example.com",
    ...     recipients=["team@example.com"],
    ... )
    >>> result = await delivery.deliver(monitoring_result, "Daily Report")  # doctest: +SKIP

"""

from __future__ import annotations

import asyncio
import pathlib
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from email.message import EmailMessage
from typing import TYPE_CHECKING, Any

from kstlib.monitoring.exceptions import MonitoringError

if TYPE_CHECKING:
    from kstlib.mail.transport import AsyncMailTransport, MailTransport
    from kstlib.monitoring.service import MonitoringResult

# Deep defense: Security limits
MAX_OUTPUT_DIR_DEPTH = 10  # Maximum directory depth from cwd
MAX_FILENAME_LENGTH = 200  # Maximum filename length
MAX_FILES_PER_DIR = 1000  # Maximum files to keep in output directory
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50 MB max output file size
MAX_RECIPIENTS = 50  # Maximum email recipients
MAX_SUBJECT_LENGTH = 200  # Maximum email subject length

# Filename validation pattern (alphanumeric, dash, underscore, dot)
SAFE_FILENAME_PATTERN = re.compile(r"^[a-zA-Z0-9_\-\.]+$")

# Pre-compiled patterns for delivery processing
_UNSAFE_CHAR_PATTERN = re.compile(r"[^a-zA-Z0-9_\-]")
_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
_WHITESPACE_PATTERN = re.compile(r"\s+")


[docs] class DeliveryError(MonitoringError): """Base exception for delivery errors."""
[docs] class DeliveryConfigError(DeliveryError, ValueError): """Invalid delivery configuration."""
[docs] class DeliveryIOError(DeliveryError, OSError): """I/O error during delivery."""
[docs] @dataclass(frozen=True, slots=True) class DeliveryResult: """Result of a delivery operation. Attributes: success: Whether delivery succeeded. timestamp: When delivery was attempted. path: Output file path (for file delivery). message_id: Email message ID (for mail delivery). error: Error message if delivery failed. metadata: Additional delivery metadata. """ success: bool timestamp: datetime path: pathlib.Path | None = None message_id: str | None = None error: str | None = None metadata: dict[str, Any] = field(default_factory=dict)
[docs] class DeliveryBackend(ABC): """Abstract base class for delivery backends."""
[docs] @abstractmethod async def deliver( self, result: MonitoringResult, name: str, ) -> DeliveryResult: """Deliver a monitoring result. Args: result: The MonitoringResult to deliver. name: Name/subject for this delivery. Returns: DeliveryResult with success status and metadata. """
def _validate_path_safety(path: pathlib.Path, base_dir: pathlib.Path) -> None: """Validate path is within allowed directory (deep defense).""" try: resolved = path.resolve() base_resolved = base_dir.resolve() resolved.relative_to(base_resolved) except ValueError as e: raise DeliveryConfigError(f"Path traversal detected: {path}") from e def _sanitize_filename(name: str, timestamp: datetime) -> str: """Create a safe filename from name and timestamp.""" # Remove unsafe characters safe_name = _UNSAFE_CHAR_PATTERN.sub("_", name) # Limit length safe_name = safe_name[:50] # Add timestamp ts = timestamp.strftime("%Y%m%d_%H%M%S") return f"{safe_name}_{ts}.html"
[docs] @dataclass class FileDeliveryConfig: """Configuration for file delivery. Attributes: output_dir: Directory to save files. filename_template: Template for filenames (supports {name}, {timestamp}). create_dirs: Create output directory if missing. max_files: Maximum files to keep (oldest deleted, 0=unlimited). encoding: File encoding. """ output_dir: str | pathlib.Path filename_template: str = "{name}_{timestamp}.html" create_dirs: bool = True max_files: int = 100 encoding: str = "utf-8"
[docs] def __post_init__(self) -> None: """Validate configuration after initialization.""" if isinstance(self.output_dir, str): object.__setattr__(self, "output_dir", pathlib.Path(self.output_dir)) # Deep defense: Validate max_files if self.max_files < 0: raise DeliveryConfigError("max_files cannot be negative") if self.max_files > MAX_FILES_PER_DIR: raise DeliveryConfigError(f"max_files exceeds limit ({MAX_FILES_PER_DIR})")
[docs] class FileDelivery(DeliveryBackend): """Deliver monitoring results to local files. Saves HTML output to files with automatic rotation and cleanup. Args: output_dir: Directory to save files (str or Path). filename_template: Template for filenames. create_dirs: Create output directory if missing. max_files: Maximum files to keep (oldest deleted when exceeded). encoding: File encoding. Examples: >>> delivery = FileDelivery(output_dir="./reports") # doctest: +SKIP >>> result = await delivery.deliver(monitoring_result, "daily") # doctest: +SKIP >>> print(f"Saved to: {result.path}") # doctest: +SKIP With rotation (keep last 7 files): >>> delivery = FileDelivery( # doctest: +SKIP ... output_dir="./reports", ... max_files=7, ... ) """
[docs] def __init__( self, output_dir: str | pathlib.Path, *, filename_template: str = "{name}_{timestamp}.html", create_dirs: bool = True, max_files: int = 100, encoding: str = "utf-8", ) -> None: """Initialize file delivery backend.""" self._config = FileDeliveryConfig( output_dir=pathlib.Path(output_dir), filename_template=filename_template, create_dirs=create_dirs, max_files=max_files, encoding=encoding, ) self._last_result: DeliveryResult | None = None
@property def config(self) -> FileDeliveryConfig: """Return the delivery configuration.""" return self._config @property def last_result(self) -> DeliveryResult | None: """Return the last delivery result.""" return self._last_result def _generate_filename(self, name: str, timestamp: datetime) -> str: """Generate filename from template.""" ts_str = timestamp.strftime("%Y%m%d_%H%M%S") # Sanitize name safe_name = _UNSAFE_CHAR_PATTERN.sub("_", name)[:50] filename = self._config.filename_template.format( name=safe_name, timestamp=ts_str, ) # Deep defense: Validate final filename if len(filename) > MAX_FILENAME_LENGTH: raise DeliveryConfigError(f"Generated filename too long ({len(filename)} > {MAX_FILENAME_LENGTH})") return filename def _cleanup_old_files(self, output_dir: pathlib.Path) -> int: """Remove oldest files if max_files exceeded. Returns count deleted.""" if self._config.max_files == 0: return 0 html_files = sorted( output_dir.glob("*.html"), key=lambda p: p.stat().st_mtime, ) to_delete = len(html_files) - self._config.max_files deleted = 0 if to_delete > 0: for old_file in html_files[:to_delete]: try: # reason: per-file best-effort cleanup on rotation old_file.unlink() deleted += 1 except OSError: pass # Best effort cleanup return deleted def _validate_output_dir(self, output_dir: pathlib.Path) -> None: """Validate output directory depth and existence (deep defense).""" try: cwd = pathlib.Path.cwd().resolve() rel_path = output_dir.relative_to(cwd) if len(rel_path.parts) > MAX_OUTPUT_DIR_DEPTH: raise DeliveryConfigError(f"Output directory too deep ({len(rel_path.parts)} > {MAX_OUTPUT_DIR_DEPTH})") except ValueError: # Path not relative to cwd, check absolute depth if len(output_dir.parts) > MAX_OUTPUT_DIR_DEPTH + 5: raise DeliveryConfigError("Output directory path too deep") from None # Create directory if needed if self._config.create_dirs: output_dir.mkdir(parents=True, exist_ok=True) if not output_dir.is_dir(): raise DeliveryConfigError(f"Output directory does not exist: {output_dir}")
[docs] async def deliver( self, result: MonitoringResult, name: str, ) -> DeliveryResult: """Save monitoring result HTML to a file. Args: result: The MonitoringResult to save. name: Name for this report (used in filename). Returns: DeliveryResult with file path on success. Raises: DeliveryIOError: If file cannot be written. DeliveryConfigError: If configuration is invalid. """ timestamp = datetime.now(timezone.utc) delivery_result: DeliveryResult | None = None try: output_dir = pathlib.Path(self._config.output_dir).resolve() # Deep defense: Validate directory self._validate_output_dir(output_dir) # Generate filename filename = self._generate_filename(name, timestamp) output_path = output_dir / filename # Deep defense: Validate path safety _validate_path_safety(output_path, output_dir) # Deep defense: Check content size html_bytes = result.html.encode(self._config.encoding) if len(html_bytes) > MAX_FILE_SIZE: raise DeliveryConfigError(f"Output too large ({len(html_bytes)} > {MAX_FILE_SIZE} bytes)") # Write file (run in executor for async compatibility) loop = asyncio.get_running_loop() await loop.run_in_executor( None, lambda: output_path.write_bytes(html_bytes), ) # Cleanup old files deleted = await loop.run_in_executor( None, lambda: self._cleanup_old_files(output_dir), ) delivery_result = DeliveryResult( success=True, timestamp=timestamp, path=output_path, metadata={ "size_bytes": len(html_bytes), "files_deleted": deleted, "encoding": self._config.encoding, }, ) except DeliveryError as e: delivery_result = DeliveryResult( success=False, timestamp=timestamp, error=str(e), ) raise except OSError as e: delivery_result = DeliveryResult( success=False, timestamp=timestamp, error=f"I/O error: {e}", ) raise DeliveryIOError(f"Failed to write file: {e}") from e except Exception as e: delivery_result = DeliveryResult( success=False, timestamp=timestamp, error=str(e), ) raise DeliveryError(f"Unexpected error during delivery: {e}") from e finally: if delivery_result is not None: self._last_result = delivery_result return delivery_result
[docs] @dataclass class MailDeliveryConfig: """Configuration for mail delivery. Attributes: sender: Sender email address. recipients: List of recipient addresses. cc: List of CC addresses. bcc: List of BCC addresses. subject_template: Subject template (supports {name}). include_plain_text: Include plain text version. """ sender: str recipients: list[str] cc: list[str] = field(default_factory=list) bcc: list[str] = field(default_factory=list) subject_template: str = "Monitoring Report: {name}" include_plain_text: bool = True
[docs] def __post_init__(self) -> None: """Validate configuration after initialization.""" if not self.sender: raise DeliveryConfigError("Sender address is required") if not self.recipients: raise DeliveryConfigError("At least one recipient is required") # Deep defense: Limit recipients total_recipients = len(self.recipients) + len(self.cc) + len(self.bcc) if total_recipients > MAX_RECIPIENTS: raise DeliveryConfigError(f"Too many recipients ({total_recipients} > {MAX_RECIPIENTS})")
[docs] class MailDelivery(DeliveryBackend): """Deliver monitoring results via email. Wraps kstlib.mail transports for monitoring delivery. Args: transport: Mail transport (sync or async). sender: Sender email address. recipients: List of recipient addresses. cc: Optional CC addresses. bcc: Optional BCC addresses. subject_template: Subject template with {name} placeholder. include_plain_text: Include plain text version of HTML. Examples: >>> from kstlib.mail.transports.gmail import GmailTransport >>> transport = GmailTransport(...) # doctest: +SKIP >>> delivery = MailDelivery( # doctest: +SKIP ... transport=transport, ... sender="bot@example.com", ... recipients=["team@example.com"], ... ) >>> result = await delivery.deliver(monitoring_result, "Daily Report") # doctest: +SKIP """
[docs] def __init__( self, transport: MailTransport | AsyncMailTransport, config: MailDeliveryConfig, ) -> None: """Initialize mail delivery backend. Args: transport: Mail transport (sync or async). config: Mail delivery configuration. """ self._transport = transport self._config = config self._last_result: DeliveryResult | None = None
[docs] @classmethod def create( cls, transport: MailTransport | AsyncMailTransport, sender: str, recipients: list[str], **kwargs: Any, ) -> MailDelivery: """Create a MailDelivery with configuration. Convenience factory method that creates the config internally. Args: transport: Mail transport (sync or async). sender: Sender email address. recipients: List of recipient addresses. **kwargs: Additional config options (cc, bcc, subject_template, etc.). Returns: Configured MailDelivery instance. """ config = MailDeliveryConfig( sender=sender, recipients=list(recipients), cc=list(kwargs.get("cc", [])) if kwargs.get("cc") else [], bcc=list(kwargs.get("bcc", [])) if kwargs.get("bcc") else [], subject_template=kwargs.get("subject_template", "Monitoring Report: {name}"), include_plain_text=kwargs.get("include_plain_text", True), ) return cls(transport, config)
@property def config(self) -> MailDeliveryConfig: """Return the delivery configuration.""" return self._config @property def last_result(self) -> DeliveryResult | None: """Return the last delivery result.""" return self._last_result def _build_message(self, html: str, subject: str) -> EmailMessage: """Build EmailMessage from HTML content.""" msg = EmailMessage() msg["From"] = self._config.sender msg["To"] = ", ".join(self._config.recipients) if self._config.cc: msg["Cc"] = ", ".join(self._config.cc) if self._config.bcc: msg["Bcc"] = ", ".join(self._config.bcc) # Deep defense: Validate subject length if len(subject) > MAX_SUBJECT_LENGTH: subject = subject[: MAX_SUBJECT_LENGTH - 3] + "..." msg["Subject"] = subject if self._config.include_plain_text: # Create multipart message with plain and HTML # Simple HTML to text conversion (strip tags) plain_text = _HTML_TAG_PATTERN.sub("", html) plain_text = _WHITESPACE_PATTERN.sub(" ", plain_text).strip() msg.set_content(plain_text) msg.add_alternative(html, subtype="html") else: msg.set_content(html, subtype="html") return msg
[docs] async def deliver( self, result: MonitoringResult, name: str, ) -> DeliveryResult: """Send monitoring result via email. Args: result: The MonitoringResult to send. name: Name for this report (used in subject). Returns: DeliveryResult with message ID on success. Raises: DeliveryError: If email cannot be sent. """ import inspect timestamp = datetime.now(timezone.utc) try: # Generate subject subject = self._config.subject_template.format(name=name) # Build message message = self._build_message(result.html, subject) # Send via transport if hasattr(self._transport, "send") and inspect.iscoroutinefunction(self._transport.send): await self._transport.send(message) else: # Sync transport - run in executor loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._transport.send, message) # Try to get message ID from transport response message_id = None if hasattr(self._transport, "last_response"): resp = self._transport.last_response if resp and hasattr(resp, "id"): message_id = resp.id delivery_result = DeliveryResult( success=True, timestamp=timestamp, message_id=message_id, metadata={ "recipients": len(self._config.recipients), "subject": subject, }, ) except Exception as e: delivery_result = DeliveryResult( success=False, timestamp=timestamp, error=str(e), ) raise DeliveryError(f"Failed to send email: {e}") from e finally: self._last_result = delivery_result return delivery_result
__all__ = [ "DeliveryBackend", "DeliveryConfigError", "DeliveryError", "DeliveryIOError", "DeliveryResult", "FileDelivery", "FileDeliveryConfig", "MailDelivery", "MailDeliveryConfig", ]