# Source code for kstlib.alerts.channels.base
"""Abstract base classes for alert channels.

Provides both sync and async channel interfaces following the same
pattern as :mod:`kstlib.mail.transport`.

Examples:
    Implementing a sync channel::

        class WebhookChannel(AlertChannel):
            @property
            def name(self) -> str:
                return "webhook"

            def send(self, alert: AlertMessage) -> AlertResult:
                # Send via HTTP POST
                return AlertResult(channel=self.name, success=True)

    Implementing an async channel::

        class AsyncWebhookChannel(AsyncAlertChannel):
            @property
            def name(self) -> str:
                return "async_webhook"

            async def send(self, alert: AlertMessage) -> AlertResult:
                async with httpx.AsyncClient() as client:
                    # Send via HTTP POST
                    pass
                return AlertResult(channel=self.name, success=True)
"""
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from concurrent.futures import ThreadPoolExecutor
from kstlib.alerts.models import AlertMessage, AlertResult
class AlertChannel(ABC):
    """Abstract sync channel for delivering alerts.

    Subclass this for synchronous alert delivery implementations. A
    subclass must override both :attr:`name` and :meth:`send` before it
    can be instantiated, because both are declared abstract.

    Examples:
        Implementing a custom sync channel::

            class LogChannel(AlertChannel):
                @property
                def name(self) -> str:
                    return "log"

                def send(self, alert: AlertMessage) -> AlertResult:
                    print(f"[{alert.level.name}] {alert.title}")
                    return AlertResult(channel=self.name, success=True)
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the unique name of this channel.

        Used for logging, metrics, and result identification.
        """

    @abstractmethod
    def send(self, alert: AlertMessage) -> AlertResult:
        """Deliver the alert via this channel.

        Args:
            alert: The alert message to deliver.

        Returns:
            AlertResult with delivery status.

        Raises:
            AlertDeliveryError: If delivery fails.
        """
class AsyncAlertChannel(ABC):
    """Abstract async channel for delivering alerts.

    Subclass this for asynchronous alert delivery implementations
    that use async HTTP clients or other async I/O. A subclass must
    override both :attr:`name` and :meth:`send` before it can be
    instantiated, because both are declared abstract.

    Examples:
        Implementing a custom async channel::

            class SlackChannel(AsyncAlertChannel):
                @property
                def name(self) -> str:
                    return "slack"

                async def send(self, alert: AlertMessage) -> AlertResult:
                    async with httpx.AsyncClient() as client:
                        await client.post(...)
                    return AlertResult(channel=self.name, success=True)
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the unique name of this channel.

        Used for logging, metrics, and result identification.
        """

    @abstractmethod
    async def send(self, alert: AlertMessage) -> AlertResult:
        """Deliver the alert asynchronously via this channel.

        Args:
            alert: The alert message to deliver.

        Returns:
            AlertResult with delivery status.

        Raises:
            AlertDeliveryError: If delivery fails.
        """
class AsyncChannelWrapper(AsyncAlertChannel):
    """Adapter exposing a sync channel through the async interface.

    The wrapped channel's blocking ``send`` is handed to an executor so
    the event loop stays responsive while delivery runs. Follows the same
    pattern as :class:`kstlib.mail.transport.AsyncTransportWrapper`.

    Args:
        channel: The sync channel to wrap.
        executor: Optional thread pool executor. If None, uses the default.

    Examples:
        Wrapping a sync channel::

            sync_channel = LogChannel()
            async_channel = AsyncChannelWrapper(sync_channel)

            # Now usable in async context
            await async_channel.send(alert)
    """

    def __init__(
        self,
        channel: AlertChannel,
        *,
        executor: ThreadPoolExecutor | None = None,
    ) -> None:
        """Store the wrapped channel and the executor to run it on.

        Args:
            channel: The sync channel to wrap.
            executor: Optional custom thread pool executor.
        """
        self._channel = channel
        self._executor = executor

    @property
    def name(self) -> str:
        """Name reported by the wrapped channel."""
        return self._channel.name

    @property
    def channel(self) -> AlertChannel:
        """The sync channel this wrapper delegates to."""
        return self._channel

    async def send(self, alert: AlertMessage) -> AlertResult:
        """Deliver the alert by running the sync channel off the event loop.

        The wrapped channel's ``send`` is submitted to the configured
        executor and its result is awaited, so the calling coroutine does
        not block the loop.

        Args:
            alert: The alert message to send.

        Returns:
            AlertResult from the wrapped channel.

        Raises:
            AlertDeliveryError: If the underlying channel fails.
        """
        event_loop = asyncio.get_running_loop()
        # A None executor makes the loop fall back to its default executor.
        pending = event_loop.run_in_executor(
            self._executor,
            self._channel.send,
            alert,
        )
        return await pending
# Names exported by ``from kstlib.alerts.channels.base import *``.
__all__ = [
    "AlertChannel",
    "AsyncAlertChannel",
    "AsyncChannelWrapper",
]