Alerts Subsystem

Multi-channel alerting with throttling, level filtering, and config-driven defaults.

Tip

Pair this reference with Alerts for the feature guide.

Quick Overview

  • AlertManager orchestrates delivery to multiple channels with per-channel level filtering

  • AlertThrottle provides rate limiting via token bucket algorithm

  • AlertMessage and AlertLevel define alert content and severity

  • SlackChannel and EmailChannel are the built-in delivery backends

  • All settings support config-driven defaults with hard limit enforcement

Core Components

AlertManager

class kstlib.alerts.AlertManager[source]

Bases: object

Orchestrates alert delivery to multiple channels.

Manages a collection of alert channels with per-channel level filtering and optional throttling. Alerts are sent concurrently to all matching channels.

Examples

Basic usage:

manager = AlertManager()
manager.add_channel(slack_channel)
manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL)

alert = AlertMessage(
    title="Service Down",
    body="API server not responding",
    level=AlertLevel.CRITICAL,
)

results = await manager.send(alert)
for result in results:
    if result.success:
        print(f"{result.channel}: OK")

Fluent API:

manager = (
    AlertManager()
    .add_channel(slack, min_level=AlertLevel.WARNING)
    .add_channel(email, min_level=AlertLevel.CRITICAL)
)

From config:

manager = AlertManager.from_config(
    config=config["alerts"],
    credential_resolver=resolver,
)
__init__(self) 'None' -> None[source]

Initialize AlertManager with no channels.

property stats: AlertManagerStats

Return statistics for this manager.

property channel_count: int

Return number of registered channels.

add_channel(self, channel: 'AlertChannel | AsyncAlertChannel', *, min_level: 'AlertLevel' = <AlertLevel.INFO: 10>, throttle: 'AlertThrottle | None' = None, key: 'str | None' = None, alias: 'str | None' = None) 'Self' -> Self[source]

Add a channel to the manager.

Parameters:
  • channel (AlertChannel | AsyncAlertChannel) – The channel to add (sync or async).

  • min_level (AlertLevel) – Minimum alert level for this channel.

  • throttle (AlertThrottle | None) – Optional throttle for rate limiting.

  • key (str | None) – Config key for targeting (e.g., “hb”).

  • alias (str | None) – Human-readable alias for targeting (e.g., “heartbeat”).

Returns:

Self for fluent chaining.

Return type:

Self

Examples

>>> manager = AlertManager()
>>> manager.add_channel(slack_channel)  
AlertManager(channels=1)
>>> manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL)  
AlertManager(channels=2)
async send(self, alert: 'AlertMessage | list[AlertMessage]', *, channel: 'str | None' = None) 'list[AlertResult]' -> list[AlertResult][source]

Send one or more alerts to matching channels.

Delivers alerts concurrently to channels where the alert level meets the channel’s minimum level. Optionally target a specific channel by key or alias.

Parameters:
  • alert (AlertMessage | list[AlertMessage]) – Single alert or list of alerts to send.

  • channel (str | None) – Optional channel key or alias to target. If None, broadcasts to all matching channels based on level.

Returns:

Flat list of AlertResult for all alerts and channels.

Return type:

list[AlertResult]

Examples

Send single alert (broadcast):

>>> results = await manager.send(alert)  

Send single alert to specific channel:

>>> results = await manager.send(alert, channel="hb")  

Send multiple alerts to same channel:

>>> alerts = [alert1, alert2, alert3]  
>>> results = await manager.send(alerts, channel="watchdog")  
classmethod from_config(config: 'Mapping[str, Any]', credential_resolver: 'CredentialResolver | None' = None) 'AlertManager' -> AlertManager[source]

Create AlertManager from configuration dict.

Config format:

alerts:
  throttle:
    rate: 10
    per: 60

  channels:
    slack_ops:
      type: slack
      credentials: slack_webhook
      username: "kstlib-alerts"
      min_level: warning

    email_critical:
      type: email
      transport:
        type: smtp
        host: smtp.example.com
      sender: "alerts@example.com"
      recipients: ["oncall@example.com"]
      min_level: critical
Parameters:
  • config (Mapping[str, Any]) – Alerts configuration dict.

  • credential_resolver (CredentialResolver | None) – Resolver for credential references.

Returns:

Configured AlertManager instance.

Raises:

AlertConfigurationError – If configuration is invalid.

Return type:

AlertManager

__repr__(self) 'str' -> str[source]

Return string representation.

AlertThrottle

class kstlib.alerts.throttle.AlertThrottle(rate=None, per=None, *, burst=None, name=None)[source]

Bases: object

Rate limiter for alert delivery.

Wraps kstlib.resilience.rate_limiter.RateLimiter with alert-specific behavior and config-driven defaults.

All parameters are optional. If not provided, values are read from kstlib.conf.yml under alerts.throttle. Hard limits are enforced for deep defense against misconfiguration.

Parameters:
  • rate (float | None) – Maximum alerts per period. If None, uses config value. Hard limits: [1, 1000].

  • per (float | None) – Period duration in seconds. If None, uses config value. Hard limits: [1.0, 86400.0] (1 day).

  • burst (float | None) – Initial capacity. If None, defaults to rate value. Hard limits: [1, rate].

  • name (str | None) – Optional name for identification.

Examples

Config-driven (recommended):

throttle = AlertThrottle()  # Uses kstlib.conf.yml

Explicit values:

throttle = AlertThrottle(rate=10, per=60.0)

Per-hour limiting with burst:

throttle = AlertThrottle(rate=100, per=3600.0, burst=20)

Non-blocking check:

if throttle.try_acquire():
    send_alert()
else:
    log.warning("Alert throttled")
__init__(self, rate: 'float | None' = None, per: 'float | None' = None, *, burst: 'float | None' = None, name: 'str | None' = None) 'None' -> None[source]

Initialize AlertThrottle.

Parameters:
  • rate (float | None) – Maximum alerts per period. If None, uses config.

  • per (float | None) – Period duration in seconds. If None, uses config.

  • burst (float | None) – Initial token count. If None, defaults to rate.

  • name (str | None) – Optional name for identification.

property rate: float

Maximum alerts per period.

property per: float

Period duration in seconds.

property available: float

Current available tokens (alerts allowed).

property time_until_available: float

Seconds until next alert can be sent.

try_acquire(self) 'bool' -> bool[source]

Try to acquire permission to send an alert.

Returns:

True if alert can be sent, False if throttled.

Return type:

bool

Examples

>>> throttle = AlertThrottle(rate=2, per=1.0)
>>> throttle.try_acquire()
True
>>> throttle.try_acquire()
True
>>> throttle.try_acquire()  # Throttled
False
acquire(self, *, timeout: 'float | None' = None) 'None' -> None[source]

Acquire permission to send an alert, blocking if needed.

Parameters:

timeout (float | None) – Maximum time to wait in seconds.

Raises:

AlertThrottledError – If timeout exceeded.

Examples

>>> throttle = AlertThrottle(rate=10, per=60.0)
>>> throttle.acquire()  # Blocks if needed
async acquire_async(self, *, timeout: 'float | None' = None) 'None' -> None[source]

Acquire permission asynchronously, waiting if needed.

Parameters:

timeout (float | None) – Maximum time to wait in seconds.

Raises:

AlertThrottledError – If timeout exceeded.

Examples

>>> import asyncio
>>> throttle = AlertThrottle(rate=10, per=60.0)
>>> asyncio.run(throttle.acquire_async())
reset(self) 'None' -> None[source]

Reset throttle to full capacity.

Examples

>>> throttle = AlertThrottle(rate=1, per=60.0)
>>> throttle.try_acquire()
True
>>> throttle.try_acquire()
False
>>> throttle.reset()
>>> throttle.try_acquire()
True
__enter__(self) 'Self' -> Self[source]

Enter context manager, acquiring permission.

__exit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]

Exit context manager.

async __aenter__(self) 'Self' -> Self[source]

Enter async context manager, acquiring permission.

async __aexit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]

Exit async context manager.

__repr__(self) 'str' -> str[source]

Return string representation.

Models

class kstlib.alerts.models.AlertMessage(title, body, level=AlertLevel.INFO, timestamp=False)[source]

Bases: object

An alert message to be sent via one or more channels.

title

Short summary of the alert (max 150 chars for Slack).

Type:

str

body

Detailed message content (max 3000 chars for Slack).

Type:

str

level

Severity level of the alert.

Type:

kstlib.alerts.models.AlertLevel

timestamp

If True, prefix title with send datetime.

Type:

bool

Examples

>>> msg = AlertMessage(title="Disk Full", body="Server disk at 95%")
>>> msg.level
<AlertLevel.INFO: 10>
>>> msg = AlertMessage(
...     title="DB Connection Failed",
...     body="Cannot connect to primary database",
...     level=AlertLevel.CRITICAL,
... )
>>> msg.level.name
'CRITICAL'
>>> msg = AlertMessage(
...     title="Alert",
...     body="With timestamp",
...     timestamp=True,
... )
>>> ":::" in msg.formatted_title
True
title: str
body: str
level: AlertLevel
timestamp: bool
property formatted_title: str

Return title with optional timestamp prefix.

If timestamp is True, prefixes the title with current datetime in format “YYYY-MM-DD HH:MM:SS ::: “.

Returns:

Title string, optionally prefixed with timestamp.

Examples

>>> msg = AlertMessage(title="Test", body="Body", timestamp=False)
>>> msg.formatted_title
'Test'
>>> msg = AlertMessage(title="Test", body="Body", timestamp=True)
>>> "Test" in msg.formatted_title
True
__init__(self, title: 'str', body: 'str', level: 'AlertLevel' = <AlertLevel.INFO: 10>, timestamp: 'bool' = False) None -> None
class kstlib.alerts.models.AlertLevel(value)[source]

Bases: IntEnum

Severity level for alerts.

Values are ordered by severity (higher value = more severe). Use these to filter which alerts go to which channels.

INFO

Informational messages (10).

SUCCESS

Positive confirmation messages (11).

WARNING

Warning conditions that need attention (20).

CRITICAL

Critical issues requiring immediate action (30).

Examples

>>> AlertLevel.CRITICAL > AlertLevel.WARNING
True
>>> AlertLevel.INFO.name
'INFO'
>>> int(AlertLevel.WARNING)
20
>>> AlertLevel.SUCCESS > AlertLevel.INFO
True
>>> AlertLevel.SUCCESS < AlertLevel.WARNING
True
INFO = 10
SUCCESS = 11
WARNING = 20
CRITICAL = 30
class kstlib.alerts.models.AlertResult(channel, success, message_id=None, error=None)[source]

Bases: object

Result of sending an alert to a channel.

channel

Name of the channel that processed this alert.

Type:

str

success

Whether the alert was delivered successfully.

Type:

bool

message_id

ID assigned by the channel (if available).

Type:

str | None

error

Error message if delivery failed.

Type:

str | None

Examples

>>> result = AlertResult(channel="slack", success=True, message_id="msg123")
>>> result.success
True
>>> result = AlertResult(channel="email", success=False, error="SMTP timeout")
>>> result.error
'SMTP timeout'
channel: str
success: bool
message_id: str | None
error: str | None
__init__(self, channel: 'str', success: 'bool', message_id: 'str | None' = None, error: 'str | None' = None) None -> None

Channels

SlackChannel

class kstlib.alerts.channels.SlackChannel(webhook_url, *, username='kstlib-alerts', icon_emoji=':bell:', timeout=None, channel_name=None, ssl_verify=None, ssl_ca_bundle=None)[source]

Bases: AsyncAlertChannel

Async channel for sending alerts to Slack via webhooks.

Uses Slack’s incoming webhook API to post formatted messages. Supports customizable username, emoji, and timeout settings.

Parameters:
  • webhook_url (str) – Slack incoming webhook URL.

  • username (str) – Bot username shown in Slack (default: ‘kstlib-alerts’).

  • icon_emoji (str) – Emoji for bot avatar (default: ‘:bell:’).

  • timeout (float | None) – HTTP request timeout in seconds (default: 10.0).

  • channel_name (str | None) – Optional name override for this channel.

Raises:

AlertConfigurationError – If webhook_url is invalid.

Examples

Basic usage:

channel = SlackChannel(
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx"
)
result = await channel.send(alert)

Custom settings:

channel = SlackChannel(
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
    username="prod-alerts",
    icon_emoji=":fire:",
    timeout=5.0,
)
__init__(self, webhook_url: 'str', *, username: 'str' = 'kstlib-alerts', icon_emoji: 'str' = ':bell:', timeout: 'float | None' = None, channel_name: 'str | None' = None, ssl_verify: 'bool | None' = None, ssl_ca_bundle: 'str | None' = None) 'None' -> None[source]

Initialize SlackChannel.

Parameters:
  • webhook_url (str) – Slack incoming webhook URL.

  • username (str) – Bot username shown in Slack.

  • icon_emoji (str) – Emoji for bot avatar.

  • timeout (float | None) – HTTP request timeout in seconds. If None, uses config. Hard limits: [1.0, 120.0].

  • channel_name (str | None) – Optional name override for this channel.

  • ssl_verify (bool | None) – Override SSL verification (True/False). If None, uses global config from kstlib.conf.yml.

  • ssl_ca_bundle (str | None) – Override CA bundle path. If None, uses global config from kstlib.conf.yml.

Raises:

AlertConfigurationError – If webhook_url is invalid.

property name: str

Return the channel name.

async send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]

Send an alert to Slack via webhook.

Formats the alert as a Slack message with appropriate emoji and color based on the alert level.

Parameters:

alert (AlertMessage) – The alert message to send.

Returns:

AlertResult with delivery status.

Raises:

AlertDeliveryError – If the webhook request fails.

Return type:

AlertResult

classmethod from_config(config: 'Mapping[str, Any]', credential_resolver: 'CredentialResolver | None' = None) 'SlackChannel' -> SlackChannel[source]

Create SlackChannel from configuration dict.

Config format:

slack_ops:
  type: slack
  credentials: slack_webhook  # reference to credentials section
  username: "kstlib-alerts"
  icon_emoji: ":bell:"
  timeout: 10.0

The webhook URL is resolved via credential_resolver using the ‘credentials’ key, supporting env, file, and SOPS sources.

Parameters:
  • config (Mapping[str, Any]) – Channel configuration dict.

  • credential_resolver (CredentialResolver | None) – Resolver for credential references.

Returns:

Configured SlackChannel instance.

Raises:

AlertConfigurationError – If configuration is invalid.

Return type:

SlackChannel

__repr__(self) 'str' -> str[source]

Return string representation without secrets.

EmailChannel

class kstlib.alerts.channels.EmailChannel(transport, *, sender, recipients, subject_prefix='[ALERT]', channel_name=None)[source]

Bases: AsyncAlertChannel

Async channel for sending alerts via email.

Wraps kstlib.mail transports to send alert messages as emails. Sync transports are automatically wrapped for async usage.

Parameters:
  • transport (MailTransport | AsyncMailTransport) – Mail transport (sync or async).

  • sender (str) – Email sender address.

  • recipients (Sequence[str]) – List of recipient email addresses.

  • subject_prefix (str) – Prefix for email subjects (default: ‘[ALERT]’).

  • channel_name (str | None) – Optional name override for this channel.

Raises:

AlertConfigurationError – If configuration is invalid.

Examples

With SMTP:

transport = SMTPTransport(host="smtp.example.com", port=587)
channel = EmailChannel(
    transport=transport,
    sender="alerts@example.com",
    recipients=["team@example.com"],
)

With Gmail:

from kstlib.mail.transports import GmailOAuth2Transport

transport = GmailOAuth2Transport.from_config(config)
channel = EmailChannel(
    transport=transport,
    sender="alerts@company.com",
    recipients=["oncall@company.com"],
    subject_prefix="[PROD]",
)
__init__(self, transport: 'MailTransport | AsyncMailTransport', *, sender: 'str', recipients: 'Sequence[str]', subject_prefix: 'str' = '[ALERT]', channel_name: 'str | None' = None) 'None' -> None[source]

Initialize EmailChannel.

Parameters:
  • transport (MailTransport | AsyncMailTransport) – Mail transport (sync or async).

  • sender (str) – Email sender address.

  • recipients (Sequence[str]) – List of recipient email addresses.

  • subject_prefix (str) – Prefix for email subjects.

  • channel_name (str | None) – Optional name override for this channel.

Raises:

AlertConfigurationError – If configuration is invalid.

property name: str

Return the channel name.

async send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]

Send an alert via email.

Constructs an email message with appropriate subject and body formatting based on the alert level.

Parameters:

alert (AlertMessage) – The alert message to send.

Returns:

AlertResult with delivery status.

Raises:

AlertDeliveryError – If email delivery fails.

Return type:

AlertResult

__repr__(self) 'str' -> str[source]

Return string representation.

Base Classes

class kstlib.alerts.channels.base.AlertChannel[source]

Bases: ABC

Abstract sync channel for delivering alerts.

Subclass this for synchronous alert delivery implementations.

Examples

Implementing a custom sync channel:

class LogChannel(AlertChannel):
    @property
    def name(self) -> str:
        return "log"

    def send(self, alert: AlertMessage) -> AlertResult:
        print(f"[{alert.level.name}] {alert.title}")
        return AlertResult(channel=self.name, success=True)
abstract property name: str

Return the unique name of this channel.

Used for logging, metrics, and result identification.

abstract send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]

Deliver the alert via this channel.

Parameters:

alert (AlertMessage) – The alert message to deliver.

Returns:

AlertResult with delivery status.

Raises:

AlertDeliveryError – If delivery fails.

Return type:

AlertResult

class kstlib.alerts.channels.base.AsyncAlertChannel[source]

Bases: ABC

Abstract async channel for delivering alerts.

Subclass this for asynchronous alert delivery implementations that use async HTTP clients or other async I/O.

Examples

Implementing a custom async channel:

class SlackChannel(AsyncAlertChannel):
    @property
    def name(self) -> str:
        return "slack"

    async def send(self, alert: AlertMessage) -> AlertResult:
        async with httpx.AsyncClient() as client:
            await client.post(...)
        return AlertResult(channel=self.name, success=True)
abstract property name: str

Return the unique name of this channel.

Used for logging, metrics, and result identification.

abstract async send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]

Deliver the alert asynchronously via this channel.

Parameters:

alert (AlertMessage) – The alert message to deliver.

Returns:

AlertResult with delivery status.

Raises:

AlertDeliveryError – If delivery fails.

Return type:

AlertResult

Configuration Limits

class kstlib.limits.AlertsLimits(throttle_rate, throttle_per, throttle_burst, channel_timeout, channel_retries)[source]

Bases: object

Resolved alerts configuration limits.

throttle_rate

Maximum alerts per period.

Type:

int

throttle_per

Period duration in seconds.

Type:

float

throttle_burst

Initial burst capacity.

Type:

int

channel_timeout

Timeout for sending alerts (seconds).

Type:

float

channel_retries

Retry attempts on delivery failure.

Type:

int

throttle_rate: int
throttle_per: float
throttle_burst: int
channel_timeout: float
channel_retries: int
__init__(self, throttle_rate: 'int', throttle_per: 'float', throttle_burst: 'int', channel_timeout: 'float', channel_retries: 'int') None -> None
kstlib.limits.get_alerts_limits(config: 'Mapping[str, Any] | None' = None) 'AlertsLimits' -> AlertsLimits[source]

Resolve alerts limits from config with hard limit enforcement.

Parameters:

config (Mapping[str, Any] | None) – Optional config mapping. If None, loads from get_config().

Returns:

AlertsLimits with resolved values clamped to hard bounds.

Return type:

AlertsLimits

Examples

>>> limits = get_alerts_limits()
>>> limits.throttle_rate
10
>>> limits.throttle_per
60.0