"""Transport interfaces for mail delivery.
Provides both sync and async transport abstractions for sending emails.
Sync transports can be wrapped for async usage via :class:`AsyncTransportWrapper`.
Examples:
Using a sync transport directly::
from kstlib.mail.transports import SMTPTransport
transport = SMTPTransport(host="smtp.example.com", port=587)
transport.send(message)
Wrapping a sync transport for async usage::
from kstlib.mail.transport import AsyncTransportWrapper
from kstlib.mail.transports import SMTPTransport
smtp = SMTPTransport(host="smtp.example.com", port=587)
async_transport = AsyncTransportWrapper(smtp)
await async_transport.send(message)
"""
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
from kstlib.mail.exceptions import MailTransportError
if TYPE_CHECKING:
import logging
from concurrent.futures import ThreadPoolExecutor
from email.message import EmailMessage
import httpx
[docs]
class MailTransport(ABC):
"""Abstract sync transport for delivering emails.
Subclass this for synchronous transport implementations like SMTP.
Examples:
Implementing a custom sync transport::
class MyTransport(MailTransport):
def send(self, message: EmailMessage) -> None:
# Send the message
pass
"""
[docs]
@abstractmethod
def send(self, message: EmailMessage) -> None:
"""Deliver the message to the underlying service.
Args:
message: The email message to send.
Raises:
MailTransportError: If delivery fails.
"""
[docs]
class AsyncMailTransport(ABC):
"""Abstract async transport for delivering emails.
Subclass this for asynchronous transport implementations like HTTP APIs.
Examples:
Implementing a custom async transport::
class MyAsyncTransport(AsyncMailTransport):
async def send(self, message: EmailMessage) -> None:
async with httpx.AsyncClient() as client:
await client.post(...)
"""
[docs]
@abstractmethod
async def send(self, message: EmailMessage) -> None:
"""Deliver the message asynchronously.
Args:
message: The email message to send.
Raises:
MailTransportError: If delivery fails.
"""
[docs]
class AsyncTransportWrapper(AsyncMailTransport):
"""Wrap a sync transport for async usage.
Executes the sync transport's send method in a thread pool executor
to avoid blocking the event loop.
Args:
transport: The sync transport to wrap.
executor: Optional thread pool executor. If None, uses the default.
Examples:
Wrapping an SMTP transport::
from kstlib.mail.transport import AsyncTransportWrapper
from kstlib.mail.transports import SMTPTransport
smtp = SMTPTransport(host="smtp.example.com", port=587)
async_smtp = AsyncTransportWrapper(smtp)
# Now usable in async context
await async_smtp.send(message)
With custom executor::
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=2)
async_smtp = AsyncTransportWrapper(smtp, executor=executor)
"""
[docs]
def __init__(
self,
transport: MailTransport,
*,
executor: ThreadPoolExecutor | None = None,
) -> None:
"""Initialize the async wrapper.
Args:
transport: The sync transport to wrap.
executor: Optional custom thread pool executor.
"""
self._transport = transport
self._executor = executor
@property
def transport(self) -> MailTransport:
"""Return the wrapped sync transport."""
return self._transport
[docs]
async def send(self, message: EmailMessage) -> None:
"""Send message asynchronously via the wrapped transport.
Runs the sync transport's send method in a thread pool to avoid
blocking the async event loop.
Args:
message: The email message to send.
Raises:
MailTransportError: If the underlying transport fails.
"""
loop = asyncio.get_running_loop()
await loop.run_in_executor(
self._executor,
self._transport.send,
message,
)
def handle_http_error_response(
response: httpx.Response,
service_name: str,
logger: logging.Logger,
*,
extract_code: bool = False,
) -> None:
"""Parse HTTP error response and raise MailTransportError.
Shared utility for HTTP-based mail transports (Gmail API, Resend, etc.).
Extracts error details from JSON response body when available.
Args:
response: The HTTP error response from the API.
service_name: Name of the service for error messages (e.g., "Gmail", "Resend").
logger: Logger instance for warning messages.
extract_code: If True, extract error code from response body (Gmail style).
If False, use HTTP status code only (Resend style).
Raises:
MailTransportError: Always raises with extracted error details.
Examples:
>>> import httpx # doctest: +SKIP
>>> response = httpx.Response(400, json={"error": "Bad request"}) # doctest: +SKIP
>>> handle_http_error_response(response, "MyAPI", logger) # doctest: +SKIP
Traceback (most recent call last):
...
MailTransportError: MyAPI error: Bad request
"""
error_msg: str
error_code: int | str = response.status_code
try:
data: dict[str, Any] = response.json()
if extract_code:
# Gmail-style: nested error object with code and message
error = data.get("error", {})
if isinstance(error, dict):
error_msg = error.get("message", str(error))
error_code = error.get("code", response.status_code)
else:
error_msg = str(error)
else:
# Resend-style: flat structure with message or error key
error_msg = data.get("message", data.get("error", "Unknown error"))
except Exception:
error_msg = response.text or f"HTTP {response.status_code}"
# Option C : keep the WARNING free of the API response body. Vendors
# like Gmail, Resend, and Mailgun routinely echo the offending API key
# prefix, the recipient address, or other diagnostic that should not
# land in the default log stream. Emit a short WARNING with the
# service name + numeric code only, and stash the redacted body
# detail at TRACE for explicit opt-in.
from kstlib._shared.redaction import redact_sensitive
from kstlib.logging import TRACE_LEVEL
logger.warning("%s API error (code=%s, see TRACE for details)", service_name, error_code)
if logger.isEnabledFor(TRACE_LEVEL):
logger.log(TRACE_LEVEL, "%s API error body: %s", service_name, redact_sensitive(error_msg))
if extract_code:
raise MailTransportError(f"{service_name} API error ({error_code}): {error_msg}")
raise MailTransportError(f"{service_name} API error: {error_msg}")
__all__ = [
"AsyncMailTransport",
"AsyncTransportWrapper",
"MailTransport",
"handle_http_error_response",
]