Source code for kstlib.alerts.manager

"""Alert manager for orchestrating multi-channel delivery.

The AlertManager coordinates sending alerts to multiple channels with
per-channel level filtering and optional throttling.

Examples:
    Basic setup::

        from kstlib.alerts import AlertManager, AlertLevel
        from kstlib.alerts.channels import SlackChannel, EmailChannel

        manager = AlertManager()
        manager.add_channel(slack_channel, min_level=AlertLevel.WARNING)
        manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL)

        results = await manager.send(alert)

    With throttling::

        from kstlib.alerts.throttle import AlertThrottle

        throttle = AlertThrottle(rate=10, per=60.0)
        manager.add_channel(slack_channel, throttle=throttle)

    Fluent API::

        manager = (
            AlertManager()
            .add_channel(slack_channel, min_level=AlertLevel.INFO)
            .add_channel(email_channel, min_level=AlertLevel.CRITICAL)
        )

"""

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from kstlib.alerts.channels.base import AlertChannel, AsyncAlertChannel, AsyncChannelWrapper
from kstlib.alerts.exceptions import AlertConfigurationError, AlertThrottledError
from kstlib.alerts.models import AlertLevel, AlertResult

if TYPE_CHECKING:
    from collections.abc import Mapping

    from typing_extensions import Self

    from kstlib.alerts.models import AlertMessage
    from kstlib.alerts.throttle import AlertThrottle
    from kstlib.mail.transport import AsyncMailTransport, MailTransport
    from kstlib.rapi.credentials import CredentialResolver

__all__ = ["AlertManager"]

log = logging.getLogger(__name__)


@dataclass
class _ChannelEntry:
    """Internal entry for a registered channel.

    Bundles an async-capable channel with the delivery options that were
    supplied when it was registered on the manager.

    Attributes:
        channel: Async channel that performs the actual delivery.
        min_level: Lowest alert level accepted when alerts are broadcast.
        throttle: Optional rate limiter consulted before each send.
        key: Config key usable to target this channel (e.g., "hb").
        alias: Optional alias usable to target this channel
            (e.g., "heartbeat").

    """

    channel: AsyncAlertChannel
    min_level: AlertLevel = AlertLevel.INFO
    throttle: AlertThrottle | None = None
    key: str | None = None  # Config key (e.g., "hb")
    alias: str | None = None  # Optional alias (e.g., "heartbeat")


@dataclass
class AlertManagerStats:
    """Delivery counters kept by the alert manager for monitoring.

    Attributes:
        total_sent: Number of alerts delivered successfully.
        total_failed: Number of alerts whose delivery failed.
        total_throttled: Number of alerts dropped because of throttling.
        by_channel: Per-channel counters keyed by channel name; each value
            holds the ``sent``, ``failed`` and ``throttled`` counts.

    """

    total_sent: int = 0
    total_failed: int = 0
    total_throttled: int = 0
    by_channel: dict[str, dict[str, int]] = field(default_factory=dict)

    def record_sent(self, channel: str) -> None:
        """Count one successful delivery for ``channel``."""
        self._ensure_channel(channel)
        self.by_channel[channel]["sent"] += 1
        self.total_sent += 1

    def record_failed(self, channel: str) -> None:
        """Count one failed delivery for ``channel``."""
        self._ensure_channel(channel)
        self.by_channel[channel]["failed"] += 1
        self.total_failed += 1

    def record_throttled(self, channel: str) -> None:
        """Count one throttled (dropped) alert for ``channel``."""
        self._ensure_channel(channel)
        self.by_channel[channel]["throttled"] += 1
        self.total_throttled += 1

    def _ensure_channel(self, channel: str) -> None:
        """Create zeroed counters for ``channel`` the first time it is seen."""
        self.by_channel.setdefault(channel, {"sent": 0, "failed": 0, "throttled": 0})


class AlertManager:
    """Orchestrates alert delivery to multiple channels.

    Manages a collection of alert channels with per-channel level filtering
    and optional throttling. Alerts are sent concurrently to all matching
    channels.

    Examples:
        Basic usage::

            manager = AlertManager()
            manager.add_channel(slack_channel)
            manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL)

            alert = AlertMessage(
                title="Service Down",
                body="API server not responding",
                level=AlertLevel.CRITICAL,
            )
            results = await manager.send(alert)

            for result in results:
                if result.success:
                    print(f"{result.channel}: OK")

        Fluent API::

            manager = (
                AlertManager()
                .add_channel(slack, min_level=AlertLevel.WARNING)
                .add_channel(email, min_level=AlertLevel.CRITICAL)
            )

        From config::

            manager = AlertManager.from_config(
                config=config["alerts"],
                credential_resolver=resolver,
            )

    """

    def __init__(self) -> None:
        """Initialize AlertManager with no channels."""
        self._channels: list[_ChannelEntry] = []
        self._stats = AlertManagerStats()

    @property
    def stats(self) -> AlertManagerStats:
        """Return statistics for this manager."""
        return self._stats

    @property
    def channel_count(self) -> int:
        """Return number of registered channels."""
        return len(self._channels)

    def add_channel(
        self,
        channel: AlertChannel | AsyncAlertChannel,
        *,
        min_level: AlertLevel = AlertLevel.INFO,
        throttle: AlertThrottle | None = None,
        key: str | None = None,
        alias: str | None = None,
    ) -> Self:
        """Add a channel to the manager.

        Sync channels are wrapped in ``AsyncChannelWrapper`` so that every
        registered entry can be awaited uniformly.

        Args:
            channel: The channel to add (sync or async).
            min_level: Minimum alert level for this channel.
            throttle: Optional throttle for rate limiting.
            key: Config key for targeting (e.g., "hb").
            alias: Human-readable alias for targeting (e.g., "heartbeat").

        Returns:
            Self for fluent chaining.

        Examples:
            >>> manager = AlertManager()
            >>> manager.add_channel(slack_channel)  # doctest: +SKIP
            AlertManager(channels=1)
            >>> manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL)  # doctest: +SKIP
            AlertManager(channels=2)

        """
        # Wrap sync channels for async usage
        if isinstance(channel, AlertChannel):
            async_channel: AsyncAlertChannel = AsyncChannelWrapper(channel)
        else:
            async_channel = channel

        entry = _ChannelEntry(
            channel=async_channel,
            min_level=min_level,
            throttle=throttle,
            key=key,
            alias=alias,
        )
        self._channels.append(entry)

        log.debug(
            "Added channel: name=%s, min_level=%s, throttle=%s",
            async_channel.name,
            min_level.name,
            throttle is not None,
        )
        return self

    async def send(
        self,
        alert: AlertMessage | list[AlertMessage],
        *,
        channel: str | None = None,
    ) -> list[AlertResult]:
        """Send one or more alerts to matching channels.

        Delivers alerts concurrently to channels where the alert level meets
        the channel's minimum level. Optionally target a specific channel
        by key or alias; a targeted send goes to that channel without
        applying the minimum-level filter. When a list is given, alerts are
        processed one after another and each alert fans out concurrently.

        Args:
            alert: Single alert or list of alerts to send.
            channel: Optional channel key or alias to target. If None,
                broadcasts to all matching channels based on level.

        Returns:
            Flat list of AlertResult for all alerts and channels. The list
            is empty when no channel is registered, when ``alert`` is an
            empty list, or when ``channel`` does not match any entry.

        Examples:
            Send single alert (broadcast)::

                >>> results = await manager.send(alert)  # doctest: +SKIP

            Send single alert to specific channel::

                >>> results = await manager.send(alert, channel="hb")  # doctest: +SKIP

            Send multiple alerts to same channel::

                >>> alerts = [alert1, alert2, alert3]  # doctest: +SKIP
                >>> results = await manager.send(alerts, channel="watchdog")  # doctest: +SKIP

        """
        if not self._channels:
            log.warning("No channels configured, alert not sent")
            return []

        # Normalize to list
        alerts = [alert] if not isinstance(alert, list) else alert
        if not alerts:
            return []

        # Get target entries (if channel specified)
        target_entries: list[_ChannelEntry] | None = None
        if channel is not None:
            target_entries = self._find_channel(channel)
            if not target_entries:
                log.warning("Channel '%s' not found", channel)
                return []

        # Send all alerts
        all_results: list[AlertResult] = []
        for single_alert in alerts:
            results = await self._send_alert(single_alert, target_entries)
            all_results.extend(results)
        return all_results

    async def _send_alert(
        self,
        alert: AlertMessage,
        target_entries: list[_ChannelEntry] | None,
    ) -> list[AlertResult]:
        """Send a single alert to matching channels.

        Args:
            alert: The alert to send.
            target_entries: Specific entries to target, or None for broadcast.

        Returns:
            List of results for this alert.

        """
        # Determine matching entries
        if target_entries is not None:
            matching_entries = target_entries
        else:
            # Filter channels by level (broadcast mode)
            matching_entries = [entry for entry in self._channels if alert.level >= entry.min_level]

        if not matching_entries:
            log.debug(
                "No channels match alert level %s",
                alert.level.name,
            )
            return []

        log.debug(
            "Sending alert to %d channels: level=%s, title=%r",
            len(matching_entries),
            alert.level.name,
            alert.title[:50],
        )

        # Send to all matching channels concurrently
        tasks = [self._send_to_entry(entry, alert) for entry in matching_entries]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return list(results)

    def _find_channel(self, identifier: str) -> list[_ChannelEntry]:
        """Find channel entry by key, alias, or channel name.

        Entries are scanned in registration order and the first entry that
        matches on any of the three attributes wins.

        Args:
            identifier: Channel key, alias, or name.

        Returns:
            List with matching entry, or empty list if not found.

        """
        for entry in self._channels:
            # Match by key (e.g., "hb")
            if entry.key and entry.key == identifier:
                return [entry]
            # Match by alias (e.g., "heartbeat")
            if entry.alias and entry.alias == identifier:
                return [entry]
            # Match by channel name (fallback)
            if entry.channel.name == identifier:
                return [entry]
        return []

    async def _send_to_entry(
        self,
        entry: _ChannelEntry,
        alert: AlertMessage,
    ) -> AlertResult:
        """Send alert to a single channel entry.

        Exceptions raised by the channel's ``send`` are converted into a
        failed AlertResult, and the manager statistics are updated for every
        outcome (sent, failed, throttled).

        Args:
            entry: The channel entry with config.
            alert: The alert to send.

        Returns:
            AlertResult with delivery status.

        """
        channel_name = entry.channel.name

        # Check throttle if configured
        if entry.throttle is not None and not entry.throttle.try_acquire():
            self._stats.record_throttled(channel_name)
            log.debug("Alert throttled for channel: %s", channel_name)
            return AlertResult(
                channel=channel_name,
                success=False,
                error="Rate limit exceeded",
            )

        try:
            result = await entry.channel.send(alert)
            if result.success:
                self._stats.record_sent(channel_name)
            else:
                self._stats.record_failed(channel_name)
            return result
        except AlertThrottledError as e:
            self._stats.record_throttled(channel_name)
            return AlertResult(
                channel=channel_name,
                success=False,
                error=f"Throttled: retry after {e.retry_after}s",
            )
        except Exception as e:
            self._stats.record_failed(channel_name)
            # The exception ``e`` may have been built by a downstream channel
            # (Slack/Email) from a response body or transport error message
            # that cannot be vetted here. Keep the WARNING free of that
            # content; the redacted detail is only emitted at TRACE level.
            from kstlib._shared.redaction import redact_sensitive
            from kstlib.logging import TRACE_LEVEL

            log.warning("Channel %s failed (see TRACE for details)", channel_name)
            if log.isEnabledFor(TRACE_LEVEL):
                log.log(TRACE_LEVEL, "Channel %s error detail: %s", channel_name, redact_sensitive(str(e)))
            return AlertResult(
                channel=channel_name,
                success=False,
                error=str(e),
            )

    @classmethod
    def from_config(
        cls,
        config: Mapping[str, Any],
        credential_resolver: CredentialResolver | None = None,
    ) -> Self:
        """Create AlertManager from configuration dict.

        Config format::

            alerts:
              throttle:
                rate: 10
                per: 60
              channels:
                slack_ops:
                  type: slack
                  credentials: slack_webhook
                  username: "kstlib-alerts"
                  min_level: warning
                email_critical:
                  type: email
                  transport:
                    type: smtp
                    host: smtp.example.com
                  sender: "alerts@example.com"
                  recipients: ["oncall@example.com"]
                  min_level: critical

        A channel's optional ``name`` is used as its alias and display name;
        otherwise the config key is the display name. A per-channel
        ``throttle`` section overrides the global one.

        Args:
            config: Alerts configuration dict.
            credential_resolver: Resolver for credential references.

        Returns:
            Configured manager instance (of the class this is called on).

        Raises:
            AlertConfigurationError: If configuration is invalid. Malformed
                throttle values or channel entries are reported with this
                type as well, with the underlying error chained as cause.

        """
        from kstlib.alerts.channels import SlackChannel
        from kstlib.alerts.throttle import AlertThrottle

        manager = cls()

        # Parse global throttle config
        global_throttle = None
        throttle_config = config.get("throttle")
        if throttle_config:
            try:
                global_throttle = AlertThrottle(
                    rate=float(throttle_config.get("rate", 10)),
                    per=float(throttle_config.get("per", 60)),
                )
            except AlertConfigurationError:
                raise
            except Exception as e:
                # Bad numbers or a non-mapping section must surface as the
                # documented configuration error, not a raw ValueError.
                raise AlertConfigurationError(f"Invalid global throttle configuration: {e}") from e

        # Parse channels
        channels_config = config.get("channels", {})
        if not channels_config:
            log.warning("No channels configured in alerts config")
            return manager

        for config_key, channel_config in channels_config.items():
            # Parsing sits inside the try so that malformed values (wrong
            # types, non-numeric throttle settings) are reported per channel
            # as AlertConfigurationError, like channel construction errors.
            try:
                channel_type = channel_config.get("type", "").lower()

                # Extract alias from config (optional "name" field)
                # If not specified, alias defaults to None (use key for targeting)
                channel_alias = channel_config.get("name")

                # Parse min_level
                min_level_str = channel_config.get("min_level", "info").lower()
                min_level = _parse_level(min_level_str)

                # Parse per-channel throttle (or use global)
                channel_throttle = global_throttle
                if "throttle" in channel_config:
                    tc = channel_config["throttle"]
                    channel_throttle = AlertThrottle(
                        rate=float(tc.get("rate", 10)),
                        per=float(tc.get("per", 60)),
                    )

                # Determine display name: use alias if provided, else config key
                display_name = channel_alias if channel_alias else config_key

                # Create channel based on type
                if channel_type == "slack":
                    channel: AlertChannel | AsyncAlertChannel = SlackChannel.from_config(
                        {**channel_config, "name": display_name},
                        credential_resolver,
                    )
                elif channel_type == "email":
                    channel = _create_email_channel(
                        channel_config,
                        display_name,
                        credential_resolver,
                    )
                else:
                    raise AlertConfigurationError(f"Unknown channel type '{channel_type}' for channel '{config_key}'")

                manager.add_channel(
                    channel,
                    min_level=min_level,
                    throttle=channel_throttle,
                    key=config_key,
                    alias=channel_alias,
                )
            except AlertConfigurationError:
                # Already the documented error type: propagate unchanged.
                raise
            except Exception as e:
                raise AlertConfigurationError(f"Failed to configure channel '{config_key}': {e}") from e

        return manager

    def __repr__(self) -> str:
        """Return string representation."""
        return f"AlertManager(channels={len(self._channels)})"
def _parse_level(level_str: str) -> AlertLevel:
    """Translate a textual level name into its ``AlertLevel`` member.

    Args:
        level_str: Level name; matching ignores case.

    Returns:
        The matching AlertLevel member.

    Raises:
        AlertConfigurationError: If the name is not a known level.

    """
    known_levels = {
        "info": AlertLevel.INFO,
        "success": AlertLevel.SUCCESS,
        "warning": AlertLevel.WARNING,
        "critical": AlertLevel.CRITICAL,
    }
    normalized = level_str.lower()
    if normalized in known_levels:
        return known_levels[normalized]
    raise AlertConfigurationError(f"Invalid alert level '{level_str}'. Valid levels: {', '.join(known_levels)}")


def _create_smtp_transport(
    transport_config: Mapping[str, Any],
) -> MailTransport:
    """Build an SMTP mail transport from raw config.

    Credentials are attached only when a ``username`` is configured. Host
    defaults to ``localhost``, port to 587 and ``use_tls`` to True.
    """
    from kstlib.mail.transports import SMTPCredentials, SMTPSecurity, SMTPTransport

    user = transport_config.get("username")
    login = (
        SMTPCredentials(
            username=user,
            password=transport_config.get("password"),
        )
        if user
        else None
    )
    tls_settings = SMTPSecurity(
        use_starttls=transport_config.get("use_tls", True),
    )
    smtp_host = transport_config.get("host", "localhost")
    smtp_port = int(transport_config.get("port", 587))
    return SMTPTransport(
        host=smtp_host,
        port=smtp_port,
        credentials=login,
        security=tls_settings,
    )


def _create_resend_transport(
    transport_config: Mapping[str, Any],
    name: str,
    credential_resolver: CredentialResolver | None,
) -> AsyncMailTransport:
    """Build a Resend mail transport from raw config.

    An inline ``api_key`` wins; otherwise the ``credentials`` reference is
    resolved through ``credential_resolver`` when one is available.

    Raises:
        AlertConfigurationError: If no API key could be determined.
    """
    from kstlib.mail.transports import ResendTransport

    token = transport_config.get("api_key")
    if not token and credential_resolver:
        credential_ref = transport_config.get("credentials")
        if credential_ref:
            token = credential_resolver.resolve(credential_ref).value

    if token:
        return ResendTransport(api_key=token)
    raise AlertConfigurationError(f"Resend transport for '{name}' requires 'api_key' or 'credentials'")


def _create_ses_transport(
    transport_config: Mapping[str, Any],
    credential_resolver: CredentialResolver | None,
) -> AsyncMailTransport:
    """Build an AWS SES mail transport from raw config.

    The region defaults to ``eu-west-3``. A configured ``credentials``
    reference is always resolved when a resolver is available, but its value
    is only used when no inline ``aws_access_key_id`` is set.
    """
    from kstlib.mail.transports import SesTransport

    key_id = transport_config.get("aws_access_key_id")
    secret_key = transport_config.get("aws_secret_access_key")

    if credential_resolver:
        credential_ref = transport_config.get("credentials")
        if credential_ref:
            resolved = credential_resolver.resolve(credential_ref)
            # NOTE(review): only the access key id is taken from the resolved
            # credential; the secret key comes solely from inline config.
            # Confirm this is intended.
            if not key_id:
                key_id = resolved.value

    return SesTransport(
        region=transport_config.get("region", "eu-west-3"),
        aws_access_key_id=key_id,
        aws_secret_access_key=secret_key,
    )


def _create_email_transport(
    transport_config: Mapping[str, Any],
    name: str,
    credential_resolver: CredentialResolver | None,
) -> MailTransport | AsyncMailTransport:
    """Create a mail transport from configuration.

    Looks up the per-type factory for the configured ``type`` (default
    ``smtp``, case-insensitive) and delegates to it.

    Args:
        transport_config: Transport configuration dict.
        name: Channel name for error messages.
        credential_resolver: Credential resolver.

    Returns:
        Configured mail transport.

    Raises:
        AlertConfigurationError: If the type is ``gmail`` (not configurable
            from config) or unknown.

    """
    kind = transport_config.get("type", "smtp").lower()

    if kind == "gmail":
        raise AlertConfigurationError(
            f"Gmail transport for '{name}' requires programmatic configuration. "
            "Use GmailTransport(token=...) directly instead of config."
        )

    # Factories are wrapped in lambdas so only the selected one is invoked.
    factories = {
        "smtp": lambda: _create_smtp_transport(transport_config),
        "resend": lambda: _create_resend_transport(transport_config, name, credential_resolver),
        "ses": lambda: _create_ses_transport(transport_config, credential_resolver),
    }
    factory = factories.get(kind)
    if factory is None:
        raise AlertConfigurationError(f"Unknown transport type '{kind}' for email channel '{name}'")
    return factory()


def _create_email_channel(
    config: Mapping[str, Any],
    name: str,
    credential_resolver: CredentialResolver | None,
) -> AlertChannel | AsyncAlertChannel:
    """Create an EmailChannel from config.

    The transport is built first; ``sender`` and ``recipients`` are checked
    afterwards. ``subject_prefix`` defaults to ``[ALERT]``.

    Args:
        config: Channel configuration.
        name: Channel name.
        credential_resolver: Credential resolver.

    Returns:
        Configured EmailChannel.

    Raises:
        AlertConfigurationError: If ``transport``, ``sender`` or
            ``recipients`` is missing, the transport config is invalid, or
            building the transport hits an ImportError.

    """
    from kstlib.alerts.channels import EmailChannel

    transport_section = config.get("transport")
    if not transport_section:
        raise AlertConfigurationError(f"Email channel '{name}' requires 'transport' configuration")

    try:
        mail_transport = _create_email_transport(transport_section, name, credential_resolver)
    except ImportError as exc:
        transport_type = transport_section.get("type", "smtp")
        raise AlertConfigurationError(f"Missing dependency for transport '{transport_type}': {exc}") from exc

    from_address = config.get("sender")
    if not from_address:
        raise AlertConfigurationError(f"Email channel '{name}' requires 'sender'")

    to_addresses = config.get("recipients", [])
    if not to_addresses:
        raise AlertConfigurationError(f"Email channel '{name}' requires 'recipients'")

    return EmailChannel(
        transport=mail_transport,
        sender=from_address,
        recipients=to_addresses,
        subject_prefix=config.get("subject_prefix", "[ALERT]"),
        channel_name=name,
    )