Alerts Subsystem¶
Multi-channel alerting with throttling, level filtering, and config-driven defaults.
Tip
Pair this reference with Alerts for the feature guide.
Quick Overview¶
AlertManagerorchestrates delivery to multiple channels with per-channel level filteringAlertThrottleprovides rate limiting via token bucket algorithmAlertMessageandAlertLeveldefine alert content and severitySlackChannelandEmailChannelare the built-in delivery backendsAll settings support config-driven defaults with hard limit enforcement
Core Components¶
AlertManager¶
- class kstlib.alerts.AlertManager[source]¶
Bases:
objectOrchestrates alert delivery to multiple channels.
Manages a collection of alert channels with per-channel level filtering and optional throttling. Alerts are sent concurrently to all matching channels.
Examples
Basic usage:
manager = AlertManager() manager.add_channel(slack_channel) manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL) alert = AlertMessage( title="Service Down", body="API server not responding", level=AlertLevel.CRITICAL, ) results = await manager.send(alert) for result in results: if result.success: print(f"{result.channel}: OK")
Fluent API:
manager = ( AlertManager() .add_channel(slack, min_level=AlertLevel.WARNING) .add_channel(email, min_level=AlertLevel.CRITICAL) )
From config:
manager = AlertManager.from_config( config=config["alerts"], credential_resolver=resolver, )
- property stats: AlertManagerStats¶
Return statistics for this manager.
- add_channel(self, channel: 'AlertChannel | AsyncAlertChannel', *, min_level: 'AlertLevel' = <AlertLevel.INFO: 10>, throttle: 'AlertThrottle | None' = None, key: 'str | None' = None, alias: 'str | None' = None) 'Self' -> Self[source]¶
Add a channel to the manager.
- Parameters:
channel (AlertChannel | AsyncAlertChannel) – The channel to add (sync or async).
min_level (AlertLevel) – Minimum alert level for this channel.
throttle (AlertThrottle | None) – Optional throttle for rate limiting.
key (str | None) – Config key for targeting (e.g., “hb”).
alias (str | None) – Human-readable alias for targeting (e.g., “heartbeat”).
- Returns:
Self for fluent chaining.
- Return type:
Self
Examples
>>> manager = AlertManager() >>> manager.add_channel(slack_channel) AlertManager(channels=1) >>> manager.add_channel(email_channel, min_level=AlertLevel.CRITICAL) AlertManager(channels=2)
- async send(self, alert: 'AlertMessage | list[AlertMessage]', *, channel: 'str | None' = None) 'list[AlertResult]' -> list[AlertResult][source]¶
Send one or more alerts to matching channels.
Delivers alerts concurrently to channels where the alert level meets the channel’s minimum level. Optionally target a specific channel by key or alias.
- Parameters:
- Returns:
Flat list of AlertResult for all alerts and channels.
- Return type:
list[AlertResult]
Examples
Send single alert (broadcast):
>>> results = await manager.send(alert)
Send single alert to specific channel:
>>> results = await manager.send(alert, channel="hb")
Send multiple alerts to same channel:
>>> alerts = [alert1, alert2, alert3] >>> results = await manager.send(alerts, channel="watchdog")
- classmethod from_config(config: 'Mapping[str, Any]', credential_resolver: 'CredentialResolver | None' = None) 'AlertManager' -> AlertManager[source]¶
Create AlertManager from configuration dict.
Config format:
alerts: throttle: rate: 10 per: 60 channels: slack_ops: type: slack credentials: slack_webhook username: "kstlib-alerts" min_level: warning email_critical: type: email transport: type: smtp host: smtp.example.com sender: "alerts@example.com" recipients: ["oncall@example.com"] min_level: critical
- Parameters:
config (Mapping[str, Any]) – Alerts configuration dict.
credential_resolver (CredentialResolver | None) – Resolver for credential references.
- Returns:
Configured AlertManager instance.
- Raises:
AlertConfigurationError – If configuration is invalid.
- Return type:
AlertThrottle¶
- class kstlib.alerts.throttle.AlertThrottle(rate=None, per=None, *, burst=None, name=None)[source]¶
Bases:
objectRate limiter for alert delivery.
Wraps
kstlib.resilience.rate_limiter.RateLimiterwith alert-specific behavior and config-driven defaults.All parameters are optional. If not provided, values are read from
kstlib.conf.ymlunderalerts.throttle. Hard limits are enforced for deep defense against misconfiguration.- Parameters:
rate (float | None) – Maximum alerts per period. If None, uses config value. Hard limits: [1, 1000].
per (float | None) – Period duration in seconds. If None, uses config value. Hard limits: [1.0, 86400.0] (1 day).
burst (float | None) – Initial capacity. If None, defaults to rate value. Hard limits: [1, rate].
name (str | None) – Optional name for identification.
Examples
Config-driven (recommended):
throttle = AlertThrottle() # Uses kstlib.conf.yml
Explicit values:
throttle = AlertThrottle(rate=10, per=60.0)
Per-hour limiting with burst:
throttle = AlertThrottle(rate=100, per=3600.0, burst=20)
Non-blocking check:
if throttle.try_acquire(): send_alert() else: log.warning("Alert throttled")
- __init__(self, rate: 'float | None' = None, per: 'float | None' = None, *, burst: 'float | None' = None, name: 'str | None' = None) 'None' -> None[source]¶
Initialize AlertThrottle.
- try_acquire(self) 'bool' -> bool[source]¶
Try to acquire permission to send an alert.
- Returns:
True if alert can be sent, False if throttled.
- Return type:
Examples
>>> throttle = AlertThrottle(rate=2, per=1.0) >>> throttle.try_acquire() True >>> throttle.try_acquire() True >>> throttle.try_acquire() # Throttled False
- acquire(self, *, timeout: 'float | None' = None) 'None' -> None[source]¶
Acquire permission to send an alert, blocking if needed.
- Parameters:
timeout (float | None) – Maximum time to wait in seconds.
- Raises:
AlertThrottledError – If timeout exceeded.
Examples
>>> throttle = AlertThrottle(rate=10, per=60.0) >>> throttle.acquire() # Blocks if needed
- async acquire_async(self, *, timeout: 'float | None' = None) 'None' -> None[source]¶
Acquire permission asynchronously, waiting if needed.
- Parameters:
timeout (float | None) – Maximum time to wait in seconds.
- Raises:
AlertThrottledError – If timeout exceeded.
Examples
>>> import asyncio >>> throttle = AlertThrottle(rate=10, per=60.0) >>> asyncio.run(throttle.acquire_async())
- reset(self) 'None' -> None[source]¶
Reset throttle to full capacity.
Examples
>>> throttle = AlertThrottle(rate=1, per=60.0) >>> throttle.try_acquire() True >>> throttle.try_acquire() False >>> throttle.reset() >>> throttle.try_acquire() True
- __exit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]¶
Exit context manager.
Models¶
- class kstlib.alerts.models.AlertMessage(title, body, level=AlertLevel.INFO, timestamp=False)[source]
Bases:
objectAn alert message to be sent via one or more channels.
- title
Short summary of the alert (max 150 chars for Slack).
- Type:
- body
Detailed message content (max 3000 chars for Slack).
- Type:
- level
Severity level of the alert.
- Type:
kstlib.alerts.models.AlertLevel
- timestamp
If True, prefix title with send datetime.
- Type:
Examples
>>> msg = AlertMessage(title="Disk Full", body="Server disk at 95%") >>> msg.level <AlertLevel.INFO: 10> >>> msg = AlertMessage( ... title="DB Connection Failed", ... body="Cannot connect to primary database", ... level=AlertLevel.CRITICAL, ... ) >>> msg.level.name 'CRITICAL' >>> msg = AlertMessage( ... title="Alert", ... body="With timestamp", ... timestamp=True, ... ) >>> ":::" in msg.formatted_title True
- title: str
- body: str
- level: AlertLevel
- timestamp: bool
- property formatted_title: str
Return title with optional timestamp prefix.
If timestamp is True, prefixes the title with current datetime in format “YYYY-MM-DD HH:MM:SS ::: “.
- Returns:
Title string, optionally prefixed with timestamp.
Examples
>>> msg = AlertMessage(title="Test", body="Body", timestamp=False) >>> msg.formatted_title 'Test' >>> msg = AlertMessage(title="Test", body="Body", timestamp=True) >>> "Test" in msg.formatted_title True
- __init__(self, title: 'str', body: 'str', level: 'AlertLevel' = <AlertLevel.INFO: 10>, timestamp: 'bool' = False) None -> None
- class kstlib.alerts.models.AlertLevel(value)[source]
Bases:
IntEnumSeverity level for alerts.
Values are ordered by severity (higher value = more severe). Use these to filter which alerts go to which channels.
- INFO
Informational messages (10).
- SUCCESS
Positive confirmation messages (11).
- WARNING
Warning conditions that need attention (20).
- CRITICAL
Critical issues requiring immediate action (30).
Examples
>>> AlertLevel.CRITICAL > AlertLevel.WARNING True >>> AlertLevel.INFO.name 'INFO' >>> int(AlertLevel.WARNING) 20 >>> AlertLevel.SUCCESS > AlertLevel.INFO True >>> AlertLevel.SUCCESS < AlertLevel.WARNING True
- INFO = 10
- SUCCESS = 11
- WARNING = 20
- CRITICAL = 30
- class kstlib.alerts.models.AlertResult(channel, success, message_id=None, error=None)[source]
Bases:
objectResult of sending an alert to a channel.
- channel
Name of the channel that processed this alert.
- Type:
- success
Whether the alert was delivered successfully.
- Type:
- message_id
ID assigned by the channel (if available).
- Type:
str | None
- error
Error message if delivery failed.
- Type:
str | None
Examples
>>> result = AlertResult(channel="slack", success=True, message_id="msg123") >>> result.success True >>> result = AlertResult(channel="email", success=False, error="SMTP timeout") >>> result.error 'SMTP timeout'
- channel: str
- success: bool
- __init__(self, channel: 'str', success: 'bool', message_id: 'str | None' = None, error: 'str | None' = None) None -> None
Channels¶
SlackChannel¶
- class kstlib.alerts.channels.SlackChannel(webhook_url, *, username='kstlib-alerts', icon_emoji=':bell:', timeout=None, channel_name=None, ssl_verify=None, ssl_ca_bundle=None)[source]¶
Bases:
AsyncAlertChannelAsync channel for sending alerts to Slack via webhooks.
Uses Slack’s incoming webhook API to post formatted messages. Supports customizable username, emoji, and timeout settings.
- Parameters:
webhook_url (str) – Slack incoming webhook URL.
username (str) – Bot username shown in Slack (default: ‘kstlib-alerts’).
icon_emoji (str) – Emoji for bot avatar (default: ‘:bell:’).
timeout (float | None) – HTTP request timeout in seconds (default: 10.0).
channel_name (str | None) – Optional name override for this channel.
- Raises:
AlertConfigurationError – If webhook_url is invalid.
Examples
Basic usage:
channel = SlackChannel( webhook_url="https://hooks.slack.com/services/T.../B.../xxx" ) result = await channel.send(alert)
Custom settings:
channel = SlackChannel( webhook_url="https://hooks.slack.com/services/T.../B.../xxx", username="prod-alerts", icon_emoji=":fire:", timeout=5.0, )
- __init__(self, webhook_url: 'str', *, username: 'str' = 'kstlib-alerts', icon_emoji: 'str' = ':bell:', timeout: 'float | None' = None, channel_name: 'str | None' = None, ssl_verify: 'bool | None' = None, ssl_ca_bundle: 'str | None' = None) 'None' -> None[source]¶
Initialize SlackChannel.
- Parameters:
webhook_url (str) – Slack incoming webhook URL.
username (str) – Bot username shown in Slack.
icon_emoji (str) – Emoji for bot avatar.
timeout (float | None) – HTTP request timeout in seconds. If None, uses config. Hard limits: [1.0, 120.0].
channel_name (str | None) – Optional name override for this channel.
ssl_verify (bool | None) – Override SSL verification (True/False). If None, uses global config from kstlib.conf.yml.
ssl_ca_bundle (str | None) – Override CA bundle path. If None, uses global config from kstlib.conf.yml.
- Raises:
AlertConfigurationError – If webhook_url is invalid.
- async send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]¶
Send an alert to Slack via webhook.
Formats the alert as a Slack message with appropriate emoji and color based on the alert level.
- Parameters:
alert (AlertMessage) – The alert message to send.
- Returns:
AlertResult with delivery status.
- Raises:
AlertDeliveryError – If the webhook request fails.
- Return type:
AlertResult
- classmethod from_config(config: 'Mapping[str, Any]', credential_resolver: 'CredentialResolver | None' = None) 'SlackChannel' -> SlackChannel[source]¶
Create SlackChannel from configuration dict.
Config format:
slack_ops: type: slack credentials: slack_webhook # reference to credentials section username: "kstlib-alerts" icon_emoji: ":bell:" timeout: 10.0
The webhook URL is resolved via credential_resolver using the ‘credentials’ key, supporting env, file, and SOPS sources.
- Parameters:
config (Mapping[str, Any]) – Channel configuration dict.
credential_resolver (CredentialResolver | None) – Resolver for credential references.
- Returns:
Configured SlackChannel instance.
- Raises:
AlertConfigurationError – If configuration is invalid.
- Return type:
EmailChannel¶
- class kstlib.alerts.channels.EmailChannel(transport, *, sender, recipients, subject_prefix='[ALERT]', channel_name=None)[source]¶
Bases:
AsyncAlertChannelAsync channel for sending alerts via email.
Wraps kstlib.mail transports to send alert messages as emails. Sync transports are automatically wrapped for async usage.
- Parameters:
transport (MailTransport | AsyncMailTransport) – Mail transport (sync or async).
sender (str) – Email sender address.
recipients (Sequence[str]) – List of recipient email addresses.
subject_prefix (str) – Prefix for email subjects (default: ‘[ALERT]’).
channel_name (str | None) – Optional name override for this channel.
- Raises:
AlertConfigurationError – If configuration is invalid.
Examples
With SMTP:
transport = SMTPTransport(host="smtp.example.com", port=587) channel = EmailChannel( transport=transport, sender="alerts@example.com", recipients=["team@example.com"], )
With Gmail:
from kstlib.mail.transports import GmailOAuth2Transport transport = GmailOAuth2Transport.from_config(config) channel = EmailChannel( transport=transport, sender="alerts@company.com", recipients=["oncall@company.com"], subject_prefix="[PROD]", )
- __init__(self, transport: 'MailTransport | AsyncMailTransport', *, sender: 'str', recipients: 'Sequence[str]', subject_prefix: 'str' = '[ALERT]', channel_name: 'str | None' = None) 'None' -> None[source]¶
Initialize EmailChannel.
- Parameters:
transport (MailTransport | AsyncMailTransport) – Mail transport (sync or async).
sender (str) – Email sender address.
recipients (Sequence[str]) – List of recipient email addresses.
subject_prefix (str) – Prefix for email subjects.
channel_name (str | None) – Optional name override for this channel.
- Raises:
AlertConfigurationError – If configuration is invalid.
- async send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]¶
Send an alert via email.
Constructs an email message with appropriate subject and body formatting based on the alert level.
- Parameters:
alert (AlertMessage) – The alert message to send.
- Returns:
AlertResult with delivery status.
- Raises:
AlertDeliveryError – If email delivery fails.
- Return type:
AlertResult
Base Classes¶
- class kstlib.alerts.channels.base.AlertChannel[source]¶
Bases:
ABCAbstract sync channel for delivering alerts.
Subclass this for synchronous alert delivery implementations.
Examples
Implementing a custom sync channel:
class LogChannel(AlertChannel): @property def name(self) -> str: return "log" def send(self, alert: AlertMessage) -> AlertResult: print(f"[{alert.level.name}] {alert.title}") return AlertResult(channel=self.name, success=True)
- abstract property name: str¶
Return the unique name of this channel.
Used for logging, metrics, and result identification.
- abstract send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]¶
Deliver the alert via this channel.
- Parameters:
alert (AlertMessage) – The alert message to deliver.
- Returns:
AlertResult with delivery status.
- Raises:
AlertDeliveryError – If delivery fails.
- Return type:
AlertResult
- class kstlib.alerts.channels.base.AsyncAlertChannel[source]¶
Bases:
ABCAbstract async channel for delivering alerts.
Subclass this for asynchronous alert delivery implementations that use async HTTP clients or other async I/O.
Examples
Implementing a custom async channel:
class SlackChannel(AsyncAlertChannel): @property def name(self) -> str: return "slack" async def send(self, alert: AlertMessage) -> AlertResult: async with httpx.AsyncClient() as client: await client.post(...) return AlertResult(channel=self.name, success=True)
- abstract property name: str¶
Return the unique name of this channel.
Used for logging, metrics, and result identification.
- abstract async send(self, alert: 'AlertMessage') 'AlertResult' -> AlertResult[source]¶
Deliver the alert asynchronously via this channel.
- Parameters:
alert (AlertMessage) – The alert message to deliver.
- Returns:
AlertResult with delivery status.
- Raises:
AlertDeliveryError – If delivery fails.
- Return type:
AlertResult
Configuration Limits¶
- class kstlib.limits.AlertsLimits(throttle_rate, throttle_per, throttle_burst, channel_timeout, channel_retries)[source]
Bases:
objectResolved alerts configuration limits.
- throttle_rate
Maximum alerts per period.
- Type:
- throttle_per
Period duration in seconds.
- Type:
- throttle_burst
Initial burst capacity.
- Type:
- channel_timeout
Timeout for sending alerts (seconds).
- Type:
- channel_retries
Retry attempts on delivery failure.
- Type:
- throttle_rate: int
- throttle_per: float
- throttle_burst: int
- channel_timeout: float
- channel_retries: int
- __init__(self, throttle_rate: 'int', throttle_per: 'float', throttle_burst: 'int', channel_timeout: 'float', channel_retries: 'int') None -> None
- kstlib.limits.get_alerts_limits(config: 'Mapping[str, Any] | None' = None) 'AlertsLimits' -> AlertsLimits[source]
Resolve alerts limits from config with hard limit enforcement.
- Parameters:
config (Mapping[str, Any] | None) – Optional config mapping. If None, loads from get_config().
- Returns:
AlertsLimits with resolved values clamped to hard bounds.
- Return type:
AlertsLimits
Examples
>>> limits = get_alerts_limits() >>> limits.throttle_rate 10 >>> limits.throttle_per 60.0