"""AWS SES transport for async email delivery.

Sends emails via the AWS Simple Email Service (SES) using raw MIME messages.
The boto3 client is synchronous, so calls are wrapped with ``run_in_executor``
to avoid blocking the async event loop.

Requirements:
    pip install kstlib[ses]

Examples:
    Basic usage with default credential chain (recommended on EC2)::

        from kstlib.mail.transports import SesTransport

        transport = SesTransport(region="eu-west-3")

        # Use with MailBuilder
        mail = MailBuilder(transport=transport)
        await mail.sender("you@example.com").to("user@example.com").send_async()

    With explicit credentials::

        transport = SesTransport(
            region="us-east-1",
            aws_access_key_id="AKIA...",
            aws_secret_access_key="secret...",
        )
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from kstlib.logging import TRACE_LEVEL
from kstlib.mail.exceptions import MailConfigurationError, MailTransportError
from kstlib.mail.transport import AsyncMailTransport
if TYPE_CHECKING:
from email.message import EmailMessage
__all__ = ["SesTransport"]
log = logging.getLogger(__name__)
[docs]
@dataclass(frozen=True, slots=True)
class SesResponse:
"""Response from AWS SES after sending an email.
Attributes:
message_id: The unique message ID assigned by SES.
"""
message_id: str
[docs]
class SesTransport(AsyncMailTransport):
    """Async transport for sending emails via AWS SES.

    Uses ``send_raw_email`` to pass the full MIME message directly to SES,
    which preserves all headers, attachments, and HTML content without
    conversion.

    Args:
        region: AWS region for the SES endpoint (default: ``eu-west-3``).
        aws_access_key_id: Explicit AWS access key. If omitted, boto3 uses
            its default credential chain (env vars, instance profile, etc.).
        aws_secret_access_key: Explicit AWS secret key. Must be provided
            together with *aws_access_key_id*.
        timeout: Boto3 connect/read timeout in seconds (default: 30.0).

    Raises:
        MailConfigurationError: If *region* is empty, *timeout* is not
            positive, or only one of the two credential arguments is given.

    Examples:
        Send with EC2 instance profile (no explicit credentials)::

            transport = SesTransport(region="eu-west-3")
            await transport.send(message)

        Send with explicit credentials::

            transport = SesTransport(
                region="us-east-1",
                aws_access_key_id="AKIA...",
                aws_secret_access_key="secret...",
            )
            await transport.send(message)
    """

    def __init__(
        self,
        *,
        region: str = "eu-west-3",
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        timeout: float = 30.0,
    ) -> None:
        """Initialize the SES transport with AWS region, optional credentials, and a send timeout.

        Args:
            region: AWS region name; must be non-empty.
            aws_access_key_id: Optional explicit access key.
            aws_secret_access_key: Optional explicit secret key.
            timeout: Connect/read timeout in seconds; must be greater than 0.

        Raises:
            MailConfigurationError: If *region* is empty, exactly one of the
                two credential arguments is given, or *timeout* is not
                greater than 0.
        """
        if not region:
            raise MailConfigurationError("AWS region is required")

        # Credentials are all-or-nothing: either both are given or neither.
        has_key = aws_access_key_id is not None
        has_secret = aws_secret_access_key is not None
        if has_key != has_secret:
            raise MailConfigurationError("Both aws_access_key_id and aws_secret_access_key must be provided together")

        if timeout <= 0:
            raise MailConfigurationError("Timeout must be greater than 0")

        self._region = region
        self._aws_access_key_id = aws_access_key_id
        self._aws_secret_access_key = aws_secret_access_key
        self._timeout = timeout
        # Populated only after a successful send.
        self._last_response: SesResponse | None = None

    def __repr__(self) -> str:
        """Redact AWS credentials from repr output."""
        # Use the same "is not None" test as __init__ and _create_client so
        # that any explicitly configured key is reported as masked.
        key_marker = "***" if self._aws_access_key_id is not None else None
        return f"SesTransport(region={self._region!r}, aws_access_key_id={key_marker!r})"

    @property
    def last_response(self) -> SesResponse | None:
        """Return the response from the last successful send."""
        return self._last_response

    async def send(self, message: EmailMessage) -> None:
        """Send an email via AWS SES using ``send_raw_email``.

        The message is sent as raw MIME bytes, preserving all headers,
        body parts, and attachments exactly as built by :class:`EmailMessage`.
        Boto3 is synchronous, so the actual API call runs inside
        ``run_in_executor`` to keep the event loop free.
        When TRACE logging is enabled, request metadata is logged.

        Args:
            message: The email message to send.

        Raises:
            MailTransportError: If the SES API call fails: an SES error
                response, an endpoint connection failure, or any other
                botocore-level failure (for example a connect/read timeout).
            MailConfigurationError: If boto3 is not installed or AWS
                credentials cannot be resolved.
        """
        # boto3 is an optional extra, so it is imported lazily here.
        try:
            import boto3
            from botocore.config import Config as BotoConfig
            from botocore.exceptions import (
                BotoCoreError,
                ClientError,
                EndpointConnectionError,
                NoCredentialsError,
            )
        except ImportError as e:
            raise MailConfigurationError(
                "boto3 is required for SesTransport. Install with: pip install kstlib[ses]"
            ) from e

        trace_enabled = log.isEnabledFor(TRACE_LEVEL)
        if trace_enabled:
            log.log(TRACE_LEVEL, "[SES] Sending email via AWS SES (region=%s)", self._region)
            log.log(TRACE_LEVEL, "[SES] From: %s, To: %s", message.get("From"), message.get("To"))

        raw_message = self._build_raw_message(message)
        client = self._create_client(boto3, BotoConfig)
        loop = asyncio.get_running_loop()

        # Only the blocking API call sits inside the try block; the handlers
        # below translate botocore failures into the package's exceptions.
        try:
            response: dict[str, Any] = await loop.run_in_executor(
                None,
                lambda: client.send_raw_email(RawMessage={"Data": raw_message}),
            )
        except ClientError as e:
            error_msg = e.response.get("Error", {}).get("Message", str(e))
            if trace_enabled:
                log.log(TRACE_LEVEL, "[SES] ClientError: %s", error_msg)
            raise MailTransportError(f"SES API error: {error_msg}") from e
        except NoCredentialsError as e:
            if trace_enabled:
                log.log(TRACE_LEVEL, "[SES] NoCredentialsError: %s", e)
            raise MailConfigurationError(f"AWS credentials not found: {e}") from e
        except EndpointConnectionError as e:
            if trace_enabled:
                log.log(TRACE_LEVEL, "[SES] EndpointConnectionError: %s", e)
            raise MailTransportError(f"SES endpoint connection failed: {e}") from e
        except BotoCoreError as e:
            # Fallback for every other botocore failure (e.g. connect/read
            # timeouts) so callers never see raw botocore exceptions. This
            # must come after the more specific BotoCoreError subclasses.
            if trace_enabled:
                log.log(TRACE_LEVEL, "[SES] BotoCoreError: %s", e)
            raise MailTransportError(f"SES request failed: {e}") from e

        message_id = response.get("MessageId", "")
        self._last_response = SesResponse(message_id=message_id)
        log.debug("Email sent via SES: %s", message_id)
        if trace_enabled:
            log.log(TRACE_LEVEL, "[SES] Message sent successfully, MessageId=%s", message_id)

    def _build_raw_message(self, message: EmailMessage) -> bytes:
        """Convert an EmailMessage to raw MIME bytes for SES.

        Args:
            message: The email message.

        Returns:
            Raw MIME bytes suitable for ``send_raw_email``.
        """
        return message.as_bytes()

    def _create_client(self, boto3_module: Any, boto_config_cls: Any) -> Any:
        """Create a boto3 SES client with stored configuration.

        Args:
            boto3_module: The imported boto3 module.
            boto_config_cls: The botocore Config class.

        Returns:
            A boto3 SES client instance.
        """
        kwargs: dict[str, Any] = {
            "service_name": "ses",
            "region_name": self._region,
            # The single configured timeout applies to both connect and read.
            "config": boto_config_cls(
                connect_timeout=self._timeout,
                read_timeout=self._timeout,
            ),
        }
        # Explicit credentials are passed only when configured; otherwise the
        # keys are left out of the client call entirely.
        if self._aws_access_key_id is not None:
            kwargs["aws_access_key_id"] = self._aws_access_key_id
            kwargs["aws_secret_access_key"] = self._aws_secret_access_key
        return boto3_module.client(**kwargs)