"""Fluent mail builder with transport-agnostic delivery."""
from __future__ import annotations
# pylint: disable=too-many-instance-attributes
import contextlib
import copy
import functools
import html
import inspect
import mimetypes
import ssl
import time
import traceback
from dataclasses import dataclass
from datetime import datetime, timezone
from email.message import EmailMessage
from typing import TYPE_CHECKING, Any, Literal, ParamSpec, TypeVar, overload
from kstlib._shared.jinja import render_jinja
from kstlib.limits import MailLimits, get_mail_limits
from kstlib.logging import get_logger
from kstlib.mail._helpers import _load_mail_section
from kstlib.mail.exceptions import MailConfigurationError, MailTransportError, MailValidationError
from kstlib.mail.filesystem import MailFilesystemGuards
from kstlib.mail.throttle import MailThrottle, get_or_create_throttle
from kstlib.ssl import get_ssl_config, validate_ca_bundle_path
from kstlib.utils import (
EmailAddress,
ValidationError,
format_bytes,
normalize_address_list,
parse_email_address,
)
log = get_logger(__name__)
if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Mapping
from pathlib import Path
from kstlib.mail.collector import NotifyCollector
from kstlib.mail.transport import AsyncMailTransport, MailTransport
from kstlib.mail.transports.resend import ResendTransport
from kstlib.mail.transports.smtp import SMTPTransport
TransportLike = MailTransport | AsyncMailTransport
# Generic placeholders used to type the ``notify`` decorator and its wrappers:
# ``P`` captures the decorated callable's parameters, ``R`` its return type.
P = ParamSpec("P")
R = TypeVar("R")
# Charset used for message bodies when ``MailBuilder`` is not given ``encoding=``.
_DEFAULT_ENCODING = "utf-8"
# Envelope fields that may appear under ``mail.presets.<name>.defaults``.
# Any other key is reported in a WARNING log and ignored (forward-compat).
_KNOWN_PRESET_ENVELOPE_DEFAULTS: frozenset[str] = frozenset({"sender", "reply_to"})
@dataclass(frozen=True, slots=True)
class _InlineResource:
cid: str
path: Path
@dataclass(frozen=True, slots=True)
class _NotifyConfig:
"""Internal closure config for the notify decorator wrappers."""
builder: MailBuilder
subject: str
send_on_success: bool
send_on_failure: bool
collector: NotifyCollector | None
include_return: bool
include_traceback: bool
[docs]
@dataclass(slots=True)
class NotifyResult:
"""Result of a notified function execution.
Attributes:
function_name: Name of the decorated function.
success: Whether the function completed without exception.
started_at: UTC timestamp when execution started.
ended_at: UTC timestamp when execution ended.
duration_ms: Execution duration in milliseconds.
return_value: Function return value (if success and include_return=True).
exception: Exception raised (if failure).
traceback_str: Formatted traceback string (if failure and include_traceback=True).
"""
function_name: str
success: bool
started_at: datetime
ended_at: datetime
duration_ms: float
return_value: Any = None
exception: BaseException | None = None
traceback_str: str | None = None
[docs]
class MailBuilder:
    """Compose and send emails using a fluent interface.

    Supports plain text and HTML bodies, file attachments, inline images,
    and template-based content with placeholder substitution.

    Transport resolution cascade, highest priority first:

    1. ``transport=`` kwarg (explicit instance, backward compatible)
    2. ``preset=`` kwarg (named preset under ``mail.presets`` in config)
    3. ``mail.default`` in ``kstlib.conf.yml`` (auto-resolved preset name)
    4. ``None``: no transport. ``.build()`` still works. ``.send()`` raises
       ``MailConfigurationError``.

    Preset envelope defaults:
        A preset may declare a ``defaults`` subsection with ``sender`` and
        ``reply_to`` keys. These are applied automatically when the builder
        is initialised with ``preset=`` or when the config's ``mail.default``
        resolves to such a preset. User-provided values via ``.sender()`` or
        ``.reply_to()`` always override the preset defaults (user wins).

        Deliberately scoped to ``sender`` and ``reply_to`` only:
        ``to``/``cc``/``bcc`` are excluded on purpose to prevent silent
        accidental sends to the preset's audience. Unsupported keys inside
        ``defaults`` are reported in a WARNING log and ignored (forward
        compatibility).

    Example YAML::

        mail:
          presets:
            corporate:
              transport: smtp
              host: smtp-secure.corp.local
              defaults:
                sender: "Service Notifications <notify@corp.local>"
                reply_to: "Service Notifications <notify@corp.local>"

    Example:
        Build an email without sending (useful for inspection)::

            >>> from kstlib.mail import MailBuilder
            >>> mail = (
            ...     MailBuilder()
            ...     .sender("noreply@example.com")
            ...     .to("user@example.com")
            ...     .subject("Welcome!")
            ...     .message("<h1>Hello</h1>", content_type="html")
            ... )
            >>> msg = mail.build()
            >>> msg["Subject"]
            'Welcome!'

        With a configured transport for actual delivery (explicit)::

            >>> from kstlib.mail import MailBuilder
            >>> from kstlib.mail.transports import SMTPTransport
            >>> transport = SMTPTransport(host="smtp.example.com", port=587)
            >>> mail = MailBuilder(transport=transport)
            >>> # mail.sender(...).to(...).subject(...).message(...).send()

        Config-driven via a named preset (defaults.sender is pre-filled)::

            >>> mail = MailBuilder(preset="corporate")  # doctest: +SKIP
            >>> # mail.to(...).subject(...).message(...).send()

        Config-driven via ``mail.default`` preset::

            >>> mail = MailBuilder()  # doctest: +SKIP
            >>> # Uses preset referenced by mail.default in kstlib.conf.yml
    """
[docs]
def __init__(
    self,
    *,
    transport: MailTransport | None = None,
    preset: str | None = None,
    encoding: str = _DEFAULT_ENCODING,
    filesystem: MailFilesystemGuards | None = None,
    limits: MailLimits | None = None,
    throttle: bool | dict[str, Any] | None = None,
) -> None:
    """Set up transport, guardrails, throttle, and an empty message state.

    Args:
        transport: Explicit transport instance. Wins over ``preset`` and
            over the configured default. When given, no preset envelope
            defaults are applied.
        preset: Name of a preset declared under ``mail.presets`` in
            ``kstlib.conf.yml``. The transport is built from it
            immediately, and any ``defaults.sender`` /
            ``defaults.reply_to`` declared on the preset are applied
            once the builder state has been initialised.
        encoding: Character encoding for message bodies (default: utf-8).
        filesystem: Filesystem guardrails for attachments, inline
            resources, and templates. Falls back to
            ``MailFilesystemGuards.default()``.
        limits: Message and attachment limits. Falls back to
            ``get_mail_limits()``.
        throttle: Anti-spam kill switch. ``False`` disables the
            throttle on this builder. A ``dict`` (e.g.
            ``{"rate": 100, "per": 3600.0, "on_exceed": "warn"}``)
            builds a per-instance custom throttle. ``None`` (default)
            resolves the throttle from configuration via the cascade
            ``mail.presets.<name>.throttle`` > ``mail.throttle`` >
            code defaults. See :class:`~kstlib.mail.MailThrottle` for
            the available knobs.

    Raises:
        MailConfigurationError: If ``preset`` is passed but does not
            resolve to a valid preset in configuration, if a preset
            default has a non-string value, or if the resolved
            throttle configuration is invalid.
        MailValidationError: If a preset default holds an unparseable
            email address.
    """
    # Transport cascade: explicit instance > named preset > mail.default.
    # Only the two config-driven branches remember a preset name.
    preset_name: str | None = None
    if transport is not None:
        self._transport: TransportLike | None = transport
    elif preset is not None:
        self._transport = _build_transport_from_preset(preset)
        preset_name = preset
    else:
        self._transport = _resolve_default_transport()
        if self._transport is not None:
            preset_name = _resolve_default_preset_name()

    self._encoding = encoding
    self._filesystem = filesystem or MailFilesystemGuards.default()
    self._limits = limits or get_mail_limits()
    self._throttle: MailThrottle | None = get_or_create_throttle(preset_name, throttle)

    # Envelope state.
    self._sender: EmailAddress | None = None
    self._reply_to: EmailAddress | None = None
    self._to: list[EmailAddress] = []
    self._cc: list[EmailAddress] = []
    self._bcc: list[EmailAddress] = []

    # Content state.
    self._subject: str = ""
    self._plain_body: str | None = None
    self._html_body: str | None = None
    self._attachments: list[Path] = []
    self._inline: list[_InlineResource] = []

    # Seed sender / reply-to last so later fluent calls simply overwrite them.
    if preset_name is None:
        return
    envelope_defaults = _load_preset_envelope_defaults(preset_name)
    if envelope_defaults:
        self._apply_preset_envelope_defaults(envelope_defaults)
# ------------------------------------------------------------------
# Addressing
# ------------------------------------------------------------------
[docs]
def transport(self, transport: MailTransport) -> MailBuilder:
"""Attach a transport backend to this builder."""
self._transport = transport
return self
[docs]
def sender(self, value: str) -> MailBuilder:
    """Set the sender address.

    Args:
        value: Address string, parsed through :meth:`_parse_address`.

    Returns:
        Self for method chaining.

    Raises:
        MailValidationError: If the address cannot be parsed.
    """
    parsed = self._parse_address(value)
    self._sender = parsed
    return self
[docs]
def reply_to(self, value: str | None) -> MailBuilder:
"""Set an optional reply-to address."""
self._reply_to = self._parse_address(value) if value else None
return self
[docs]
def to(self, *values: str) -> MailBuilder:
    """Append recipients to the ``To`` header.

    Args:
        *values: Address strings, parsed through :meth:`_parse_addresses`.

    Returns:
        Self for method chaining.

    Raises:
        MailValidationError: If an address cannot be parsed.
    """
    recipients = self._parse_addresses(values)
    self._to.extend(recipients)
    return self
[docs]
def cc(self, *values: str) -> MailBuilder:
    """Append recipients to the ``Cc`` header.

    Args:
        *values: Address strings, parsed through :meth:`_parse_addresses`.

    Returns:
        Self for method chaining.

    Raises:
        MailValidationError: If an address cannot be parsed.
    """
    recipients = self._parse_addresses(values)
    self._cc.extend(recipients)
    return self
[docs]
def bcc(self, *values: str) -> MailBuilder:
    """Append recipients to the ``Bcc`` header.

    Args:
        *values: Address strings, parsed through :meth:`_parse_addresses`.

    Returns:
        Self for method chaining.

    Raises:
        MailValidationError: If an address cannot be parsed.
    """
    recipients = self._parse_addresses(values)
    self._bcc.extend(recipients)
    return self
# ------------------------------------------------------------------
# Content
# ------------------------------------------------------------------
[docs]
def subject(self, value: str) -> MailBuilder:
"""Set the message subject."""
self._subject = value
return self
[docs]
def message(
self,
content: str | None = None,
*,
content_type: Literal["plain", "html"] = "html",
template: str | Path | None = None,
placeholders: Mapping[str, Any] | None = None,
**extra_placeholders: Any,
) -> MailBuilder:
"""Populate the message body either via raw content or a template.
.. warning::
HTML content is **not** sanitized. The caller is responsible for
ensuring the HTML is safe. This is by design: mail templates are
authored by the application developer, not by end users.
Raises:
MailValidationError: If content_type is unsupported.
"""
body = self._resolve_body(content, template, placeholders, extra_placeholders)
if content_type == "html":
self._html_body = body
elif content_type == "plain":
self._plain_body = body
else: # pragma: no cover - defensive guard
raise MailValidationError(f"Unsupported content type: {content_type}")
return self
[docs]
def attach(self, *paths: str | Path) -> MailBuilder:
"""Attach binary files to the message.
Args:
*paths: One or more file paths to attach.
Returns:
Self for method chaining.
Raises:
MailValidationError: If no paths provided, attachment limit exceeded,
or file size exceeds configured limits.
"""
if not paths:
raise MailValidationError("attach() expects at least one file path")
for raw in paths:
path = self._filesystem.resolve_attachment(raw)
# Validate attachment count
if len(self._attachments) >= self._limits.max_attachments:
raise MailValidationError(f"Maximum of {self._limits.max_attachments} attachments exceeded")
# Validate file size
file_size = path.stat().st_size
if file_size > self._limits.max_attachment_size:
raise MailValidationError(
f"Attachment '{path.name}' exceeds size limit "
f"({format_bytes(file_size)} > {self._limits.max_attachment_size_display})"
)
self._attachments.append(path)
return self
[docs]
def attach_inline(self, cid: str, path: str | Path) -> MailBuilder:
    """Queue an inline resource (e.g. an image referenced with ``cid:``).

    Args:
        cid: Content ID used to reference the resource from the HTML body.
        path: File to embed, resolved through the filesystem guards.

    Returns:
        Self for method chaining.

    Raises:
        MailValidationError: If cid is empty or file size exceeds limits.
    """
    if not cid:
        raise MailValidationError("Inline resources require a non-empty content ID")

    resolved = self._filesystem.resolve_inline(path)

    # Inline resources share the per-file size ceiling used for attachments.
    size = resolved.stat().st_size
    limits = self._limits
    if size > limits.max_attachment_size:
        raise MailValidationError(
            f"Inline resource '{resolved.name}' exceeds size limit "
            f"({format_bytes(size)} > {limits.max_attachment_size_display})"
        )

    self._inline.append(_InlineResource(cid=cid, path=resolved))
    return self
# ------------------------------------------------------------------
# Build & send
# ------------------------------------------------------------------
[docs]
def build(self) -> EmailMessage:
"""Assemble and return an :class:`EmailMessage` without sending it.
Raises:
MailValidationError: If sender, recipient, or body is missing.
"""
sender = self._validate_ready()
message = self._initialise_message(sender)
self._apply_inline_resources(message)
self._apply_file_attachments(message)
return message
[docs]
def send(self) -> EmailMessage:
    """Build the email and hand it to the configured transport.

    If a :class:`~kstlib.mail.MailThrottle` is attached (default,
    unless disabled via ``throttle=False``), it is consulted after the
    message is built and before the transport is called. When the
    bucket is empty, the throttle either raises
    :class:`~kstlib.mail.MailThrottledError` (mode ``raise``) or logs a
    security warning and returns the built message without sending
    (mode ``warn``).

    Returns:
        The constructed :class:`~email.message.EmailMessage`. In
        ``warn`` mode, the returned message may have been dropped
        silently (a ``WARNING [SECURITY]`` log is emitted).

    Raises:
        MailConfigurationError: If no transport has been configured, or
            if the configured transport is async-only.
        MailTransportError: If the transport fails to deliver the message.
        MailThrottledError: If the throttle is in ``raise`` mode and
            the bucket is empty.
    """
    from kstlib.mail.transport import AsyncMailTransport, MailTransport

    transport = self._transport
    if transport is None:
        raise MailConfigurationError("No mail transport configured")
    if isinstance(transport, AsyncMailTransport):
        raise MailConfigurationError(
            f"Transport '{type(transport).__name__}' is async-only; "
            "use it directly via `await transport.send(message)` instead of "
            "MailBuilder.send()."
        )
    assert isinstance(transport, MailTransport)

    message = self.build()

    # A refused consume() means the message is returned without being sent.
    throttle = self._throttle
    allowed = throttle is None or throttle.consume(self._subject)
    if not allowed:
        return message

    try:
        transport.send(message)
    except MailTransportError:
        # Already the documented error type: let it through untouched.
        raise
    except Exception as exc:  # pragma: no cover - defensive guard
        raise MailTransportError("Unexpected error during delivery") from exc
    return message
# ------------------------------------------------------------------
# Notification decorator
# ------------------------------------------------------------------
def _snapshot(self) -> MailBuilder:
"""Create an independent copy of this builder for decoration.
Returns a copy that shares the transport and throttle but has
independent message state, so decorated functions don't interfere
with each other. The throttle MUST be shared (not deep-copied)
so that ``@mail.notify`` on a hot function cannot bypass the
rate limit by getting its own bucket.
"""
# Save transport and throttle before deepcopy: neither can be
# safely deep-copied (transport may hold sockets, throttle holds
# a threading.Lock) and both must be shared by reference so the
# snapshot enforces the same kill switch as the original builder.
transport = self._transport
throttle = self._throttle
self._transport = None
self._throttle = None
try:
snapshot = copy.deepcopy(self)
finally:
self._transport = transport
self._throttle = throttle
snapshot._transport = transport
snapshot._throttle = throttle
return snapshot
@overload
def notify(
    self,
    func: Callable[P, R],
    /,
) -> Callable[P, R]: ...

@overload
def notify(
    self,
    func: None = None,
    /,
    *,
    subject: str | None = None,
    on_error_only: bool = False,
    on_success_only: bool = False,
    mode: str | None = None,
    collector: NotifyCollector | None = None,
    include_return: bool = False,
    include_traceback: bool = True,
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...

def notify(
    self,
    func: Callable[P, R] | None = None,
    /,
    *,
    subject: str | None = None,
    on_error_only: bool = False,
    on_success_only: bool = False,
    mode: str | None = None,
    collector: NotifyCollector | None = None,
    include_return: bool = False,
    include_traceback: bool = True,
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
    """Send email notifications on function execution.

    Sends a notification email after the decorated function completes,
    reporting success or failure with execution metrics. Optionally
    accumulates results into a :class:`NotifyCollector` for run summaries.

    Can be used with or without parentheses::

        @mail.notify
        def task(): ...

        @mail.notify(subject="Step 1", on_error_only=True)
        def task(): ...

        @mail.notify(mode="ok")
        def check(): ...

    Filtering modes (mutually exclusive with ``on_*_only``):

    - ``mode="both"`` (default): notify on both success and failure.
    - ``mode="ok"`` (alias of ``on_success_only=True``): notify only on
      successful execution.
    - ``mode="ko"`` (alias of ``on_error_only=True``): notify only on
      exception.

    ``mode`` is case-insensitive.

    When ``collector`` is provided, every notification that passes the
    active filter is also recorded into the collector (so the same gate
    applies to mail send and to capture, guaranteeing one entry per
    execution under the double-decorator pattern).

    Each decorated function gets its own builder snapshot, taken when the
    decorator is applied; coroutine functions get an async wrapper and
    everything else a sync wrapper.

    Args:
        func: The function to decorate (when used without parentheses).
        subject: Override the builder's subject for this notification.
        on_error_only: Only send notification if the function raises.
        on_success_only: Only send notification on successful return.
        mode: Filtering mode (``"ok"``, ``"ko"``, or ``"both"``,
            case-insensitive). Mutually exclusive with ``on_*_only``.
        collector: Optional collector that records every result that
            passes the active filter.
        include_return: Include return value in success notifications.
        include_traceback: Include traceback in failure notifications.

    Returns:
        Decorated function that sends notifications.

    Raises:
        MailValidationError: If ``mode`` and ``on_*_only`` are combined,
            if ``on_error_only`` and ``on_success_only`` are both true,
            or if ``mode`` is not in ``{"ok", "ko", "both"}``.

    Example:
        >>> from kstlib.mail import MailBuilder
        >>> mail = MailBuilder().sender("bot@x.com").to("admin@x.com")
        >>> _ = mail.subject("Daily ETL")
        >>> @mail.notify(on_error_only=True)
        ... def extract():
        ...     return {"rows": 100}
    """
    # Filter kwargs are validated once, when the decorator is created.
    effective_mode = self._resolve_notify_mode(
        mode=mode,
        on_error_only=on_error_only,
        on_success_only=on_success_only,
    )

    def decorator(fn: Callable[P, R]) -> Callable[P, R]:
        builder = self._snapshot()
        effective_subject = builder._subject if subject is None else subject
        cfg = _NotifyConfig(
            builder=builder,
            subject=effective_subject,
            send_on_success=effective_mode in {"ok", "both"},
            send_on_failure=effective_mode in {"ko", "both"},
            collector=collector,
            include_return=include_return,
            include_traceback=include_traceback,
        )
        if not inspect.iscoroutinefunction(fn):
            return self._build_sync_notify_wrapper(fn, cfg)
        return self._build_async_notify_wrapper(fn, cfg)  # type: ignore[arg-type]

    # Bare ``@mail.notify`` passes the function directly; the parenthesised
    # form returns the decorator for Python to apply.
    return decorator if func is None else decorator(func)
@staticmethod
def _resolve_notify_mode(
*,
mode: str | None,
on_error_only: bool,
on_success_only: bool,
) -> str:
"""Validate notify filter kwargs and return the effective mode."""
if mode is not None and (on_error_only or on_success_only):
raise MailValidationError("Use mode= OR on_error_only/on_success_only, not both")
if on_error_only and on_success_only:
raise MailValidationError("on_error_only and on_success_only are mutually exclusive")
if mode is not None:
if not isinstance(mode, str):
raise MailValidationError(f"mode must be 'ok', 'ko', or 'both', got: {mode!r}")
normalized = mode.lower()
if normalized not in {"ok", "ko", "both"}:
raise MailValidationError(f"mode must be 'ok', 'ko', or 'both', got: {mode!r}")
return normalized
if on_error_only:
return "ko"
if on_success_only:
return "ok"
return "both"
@staticmethod
def _record_notify_outcome(cfg: _NotifyConfig, notify_result: NotifyResult) -> None:
"""Send the notification mail (best-effort) and capture into the collector."""
cfg.builder._send_notification(notify_result, cfg.subject, cfg.include_return)
if cfg.collector is not None:
cfg.collector.add(notify_result)
def _build_sync_notify_wrapper(
self,
fn: Callable[P, R],
cfg: _NotifyConfig,
) -> Callable[P, R]:
"""Build the sync wrapper for the notify decorator."""
@functools.wraps(fn)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start = time.perf_counter()
started_at = datetime.now(timezone.utc)
try:
result = fn(*args, **kwargs)
except BaseException as exc:
ended_at = datetime.now(timezone.utc)
duration_ms = (time.perf_counter() - start) * 1000
if cfg.send_on_failure:
self._record_notify_outcome(
cfg,
NotifyResult(
function_name=fn.__name__,
success=False,
started_at=started_at,
ended_at=ended_at,
duration_ms=duration_ms,
exception=exc,
traceback_str=traceback.format_exc() if cfg.include_traceback else None,
),
)
raise
ended_at = datetime.now(timezone.utc)
duration_ms = (time.perf_counter() - start) * 1000
if cfg.send_on_success:
self._record_notify_outcome(
cfg,
NotifyResult(
function_name=fn.__name__,
success=True,
started_at=started_at,
ended_at=ended_at,
duration_ms=duration_ms,
return_value=result if cfg.include_return else None,
),
)
return result
return sync_wrapper
def _build_async_notify_wrapper(
self,
fn: Callable[P, R],
cfg: _NotifyConfig,
) -> Callable[P, R]:
"""Build the async wrapper for the notify decorator."""
@functools.wraps(fn)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start = time.perf_counter()
started_at = datetime.now(timezone.utc)
try:
result = await fn(*args, **kwargs) # type: ignore[misc]
except BaseException as exc:
ended_at = datetime.now(timezone.utc)
duration_ms = (time.perf_counter() - start) * 1000
if cfg.send_on_failure:
self._record_notify_outcome(
cfg,
NotifyResult(
function_name=fn.__name__,
success=False,
started_at=started_at,
ended_at=ended_at,
duration_ms=duration_ms,
exception=exc,
traceback_str=traceback.format_exc() if cfg.include_traceback else None,
),
)
raise
ended_at = datetime.now(timezone.utc)
duration_ms = (time.perf_counter() - start) * 1000
if cfg.send_on_success:
self._record_notify_outcome(
cfg,
NotifyResult(
function_name=fn.__name__,
success=True,
started_at=started_at,
ended_at=ended_at,
duration_ms=duration_ms,
return_value=result if cfg.include_return else None,
),
)
return result # type: ignore[no-any-return]
return async_wrapper # type: ignore[return-value]
[docs]
def send_summary(
    self,
    collector: NotifyCollector,
    *,
    subject: str | None = None,
    format: Literal["html", "plain", "monitor_table"] = "html",  # noqa: A002 - public API name; mirrors documented format= kwarg
) -> EmailMessage:
    """Send a summary email built from a :class:`NotifyCollector`.

    Operates on an isolated snapshot so the original builder state is
    preserved for subsequent sends. The snapshot's attachments and
    inline resources are cleared before sending.

    Args:
        collector: The collector whose recorded results are rendered.
        subject: Optional subject override for the summary email.
            Falls back to the builder's current subject when omitted.
        format: Output format for the body. ``"html"`` uses
            :meth:`NotifyCollector.render_html`, ``"plain"`` uses
            :meth:`NotifyCollector.render_plain`, ``"monitor_table"``
            renders the collector's
            :meth:`~NotifyCollector.to_monitor_table` with inline CSS
            for email-safe HTML.

    Returns:
        The :class:`~email.message.EmailMessage` that was sent.

    Raises:
        MailValidationError: If ``format`` is not one of the supported
            values.
    """
    if format not in {"html", "plain", "monitor_table"}:
        raise MailValidationError(f"format must be 'html', 'plain', or 'monitor_table', got: {format!r}")

    snapshot = self._snapshot()
    if subject is not None:
        snapshot._subject = subject
    snapshot._attachments = []
    snapshot._inline = []

    # Exactly one of the two bodies is populated, depending on the format.
    plain_text: str | None = None
    html_text: str | None = None
    if format == "plain":
        plain_text = collector.render_plain()
    elif format == "html":
        html_text = collector.render_html()
    else:  # "monitor_table"
        html_text = collector.to_monitor_table().render(inline_css=True)
    snapshot._plain_body = plain_text
    snapshot._html_body = html_text
    return snapshot.send()
def _send_notification(
    self,
    result: NotifyResult,
    subject: str,
    include_return: bool,
) -> None:
    """Overwrite this builder's content with a report of ``result`` and send it.

    Args:
        result: Execution record to report.
        subject: Base subject; it is prefixed with ``[OK]`` or ``[FAILED]``
            and suffixed with the function name.
        include_return: Whether a success report shows the return value.

    Note:
        Only ``MailTransportError`` is suppressed. Any other exception
        raised by :meth:`send` propagates to the caller.
    """
    if result.success:
        tag = "[OK]"
        body = self._format_success_body(result, include_return)
    else:
        tag = "[FAILED]"
        body = self._format_failure_body(result)

    # Replace whatever content the builder held with the notification itself.
    self._subject = f"{tag} {subject} - {result.function_name}"
    self._html_body = body
    self._plain_body = None
    self._attachments = []
    self._inline = []

    # A delivery failure must not crash the decorated function.
    with contextlib.suppress(MailTransportError):
        self.send()
def _format_success_body(self, result: NotifyResult, include_return: bool) -> str:
"""Format HTML body for successful execution notification."""
parts = [
"<h2>Function completed successfully</h2>",
f"<p><strong>Function:</strong> <code>{html.escape(result.function_name)}</code></p>",
f"<p><strong>Started:</strong> {result.started_at.isoformat()}</p>",
f"<p><strong>Ended:</strong> {result.ended_at.isoformat()}</p>",
f"<p><strong>Duration:</strong> {result.duration_ms:.2f} ms</p>",
]
if include_return and result.return_value is not None:
escaped_value = html.escape(repr(result.return_value))
parts.append(f"<p><strong>Return value:</strong></p><pre>{escaped_value}</pre>")
return "\n".join(parts)
def _format_failure_body(self, result: NotifyResult) -> str:
"""Format HTML body for failed execution notification."""
exc_type = type(result.exception).__name__ if result.exception else "Unknown"
exc_msg = str(result.exception) if result.exception else "No message"
parts = [
"<h2>Function execution failed</h2>",
f"<p><strong>Function:</strong> <code>{html.escape(result.function_name)}</code></p>",
f"<p><strong>Started:</strong> {result.started_at.isoformat()}</p>",
f"<p><strong>Ended:</strong> {result.ended_at.isoformat()}</p>",
f"<p><strong>Duration:</strong> {result.duration_ms:.2f} ms</p>",
f"<p><strong>Exception:</strong> {html.escape(exc_type)}: {html.escape(exc_msg)}</p>",
]
if result.traceback_str:
escaped_tb = html.escape(result.traceback_str)
parts.append(f"<h3>Traceback</h3><pre>{escaped_tb}</pre>")
return "\n".join(parts)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _parse_address(self, value: str) -> EmailAddress:
    """Parse one address string into an ``EmailAddress``.

    Args:
        value: Address string handed to ``parse_email_address``.

    Returns:
        The parsed address.

    Raises:
        MailValidationError: If parsing raises ``ValidationError``; the
            original error is chained as the cause.
    """
    try:
        parsed = parse_email_address(value)
    except ValidationError as exc:
        raise MailValidationError(str(exc)) from exc
    return parsed
def _apply_preset_envelope_defaults(self, defaults: Mapping[str, Any]) -> None:
"""Seed ``_sender`` / ``_reply_to`` from a preset's ``defaults`` section.
User-provided values via :meth:`sender` or :meth:`reply_to` overwrite
these seeds by the natural assignment pattern - no extra bookkeeping
required.
Args:
defaults: Envelope defaults dict, already filtered to the
supported keys by :func:`_load_preset_envelope_defaults`.
Raises:
MailConfigurationError: If a supported key is present with a
non-string value.
MailValidationError: If a supported key holds an unparseable
email address (propagated from :meth:`_parse_address`).
"""
sender = defaults.get("sender")
if sender is not None:
if not isinstance(sender, str):
msg = f"preset defaults.sender must be a string, got {type(sender).__name__}: {sender!r}"
raise MailConfigurationError(msg)
self._sender = self._parse_address(sender)
reply_to = defaults.get("reply_to")
if reply_to is not None:
if not isinstance(reply_to, str):
msg = f"preset defaults.reply_to must be a string, got {type(reply_to).__name__}: {reply_to!r}"
raise MailConfigurationError(msg)
self._reply_to = self._parse_address(reply_to)
def _parse_addresses(self, values: Iterable[str]) -> list[EmailAddress]:
    """Parse several address strings into ``EmailAddress`` objects.

    Args:
        values: Address strings handed to ``normalize_address_list``.

    Returns:
        The list produced by ``normalize_address_list``.

    Raises:
        MailValidationError: If normalisation raises ``ValidationError``;
            the original error is chained as the cause.
    """
    try:
        parsed = normalize_address_list(values)
    except ValidationError as exc:
        raise MailValidationError(str(exc)) from exc
    return parsed
def _resolve_body(
self,
content: str | None,
template: str | Path | None,
placeholders: Mapping[str, Any] | None,
extra_placeholders: Mapping[str, Any],
) -> str:
if template is not None:
template_path = self._filesystem.resolve_template(template)
content = template_path.read_text(encoding=self._encoding)
if content is None:
raise MailValidationError("Message content cannot be empty")
merged: dict[str, Any] = {}
if placeholders:
merged.update(dict(placeholders))
if extra_placeholders:
merged.update(extra_placeholders)
if merged:
content = render_jinja(content, merged)
return content
def _validate_ready(self) -> EmailAddress:
if self._sender is None:
raise MailValidationError("Sender must be provided")
if not (self._to or self._cc or self._bcc):
raise MailValidationError("At least one recipient must be specified")
if self._plain_body is None and self._html_body is None:
raise MailValidationError("Message body is empty")
return self._sender
def _initialise_message(self, sender: EmailAddress) -> EmailMessage:
message = EmailMessage()
message["From"] = sender.formatted
if self._reply_to:
message["Reply-To"] = self._reply_to.formatted
if self._to:
message["To"] = ", ".join(addr.formatted for addr in self._to)
if self._cc:
message["Cc"] = ", ".join(addr.formatted for addr in self._cc)
if self._bcc:
message["Bcc"] = ", ".join(addr.formatted for addr in self._bcc)
if self._subject:
message["Subject"] = self._subject
plain = self._plain_body if self._plain_body is not None else ""
message.set_content(plain, subtype="plain", charset=self._encoding)
if self._html_body is not None:
message.add_alternative(self._html_body, subtype="html", charset=self._encoding)
return message
def _apply_inline_resources(self, message: EmailMessage) -> None:
if not self._inline:
return
html_part = message.get_body("html")
if html_part is None:
raise MailValidationError("Inline resources require an HTML body")
for resource in self._inline:
data = resource.path.read_bytes()
maintype, subtype = _detect_mime(resource.path)
html_part.add_related(
data,
maintype=maintype,
subtype=subtype,
cid=f"<{resource.cid}>",
filename=resource.path.name,
)
def _apply_file_attachments(self, message: EmailMessage) -> None:
for attachment in self._attachments:
data = attachment.read_bytes()
maintype, subtype = _detect_mime(attachment)
message.add_attachment(
data,
maintype=maintype,
subtype=subtype,
filename=attachment.name,
)
def _detect_mime(path: Path) -> tuple[str, str]:
guessed, _ = mimetypes.guess_type(path.name)
if not guessed:
return "application", "octet-stream"
maintype, subtype = guessed.split("/", 1)
return maintype, subtype
# ----------------------------------------------------------------------
# Config-driven transport resolution
# ----------------------------------------------------------------------
# Transport backend names known to this module.
# NOTE(review): presumably the values accepted for a preset's ``transport``
# key - confirm against the preset transport builder.
_SUPPORTED_TRANSPORTS = ("smtp", "resend")
def _resolve_default_transport() -> TransportLike | None:
    """Resolve the transport from ``mail.default`` in configuration.

    Returns ``None`` when no default is configured or when the config is
    unavailable. The returned ``None`` preserves the legacy behaviour:
    ``.build()`` keeps working and ``.send()`` raises at call time.

    Returns:
        The transport built from the preset named by ``mail.default``,
        or ``None``.

    Raises:
        MailConfigurationError: If ``mail.default`` points to an unknown
            preset. This is a user-visible misconfiguration, so fail fast.
    """
    try:
        mail_cfg = _load_mail_section()
    except MailConfigurationError:
        # An unreadable mail section is treated as "no default configured".
        return None

    usable = mail_cfg is not None and hasattr(mail_cfg, "get")
    default_name = mail_cfg.get("default") if usable else None
    if default_name:
        return _build_transport_from_preset(str(default_name))
    return None
def _resolve_default_preset_name() -> str | None:
    """Return the preset name referenced by ``mail.default``, or ``None``.

    Swallows ``MailConfigurationError`` raised while loading the mail
    section and returns ``None`` so that ``MailBuilder()`` stays usable
    even when the config file is missing.

    Returns:
        The ``mail.default`` value converted to ``str``, or ``None`` when
        the section is unavailable, is not mapping-like, or holds no
        truthy ``default`` entry.
    """
    try:
        mail_cfg = _load_mail_section()
    except MailConfigurationError:
        return None

    usable = mail_cfg is not None and hasattr(mail_cfg, "get")
    default_name = mail_cfg.get("default") if usable else None
    return str(default_name) if default_name else None
def _load_preset_envelope_defaults(preset_name: str) -> dict[str, Any]:
    """Read the ``defaults`` subsection of a named mail preset.

    Returns an empty dict silently when the config cannot be loaded, the
    preset does not exist, the preset has no ``defaults`` section, or any
    level on the way is of the wrong shape. Only keys listed in
    :data:`_KNOWN_PRESET_ENVELOPE_DEFAULTS` are returned; every other key is
    ignored and reported in a single batched WARNING per call.

    Args:
        preset_name: Name of the preset declared under ``mail.presets``.

    Returns:
        Dict containing only the supported keys present in the preset
        defaults. Empty dict if nothing is configured.

    Examples:
        >>> _load_preset_envelope_defaults("corporate")  # doctest: +SKIP
        {'sender': 'Service Notifications <notify@corp.local>'}
    """
    try:
        node = _load_mail_section()
    except MailConfigurationError:
        return {}

    # Walk mail -> presets -> <preset_name>; each hop must expose ``get``
    # (``hasattr`` is False for ``None``, so missing levels stop the walk).
    for step in ("presets", preset_name):
        if not hasattr(node, "get"):
            return {}
        node = node.get(step)
    if not hasattr(node, "get"):
        return {}

    defaults_section = node.get("defaults")
    if not hasattr(defaults_section, "items"):
        return {}

    accepted: dict[str, Any] = {}
    rejected: list[str] = []
    for raw_key, value in defaults_section.items():
        name = str(raw_key)
        if name not in _KNOWN_PRESET_ENVELOPE_DEFAULTS:
            rejected.append(name)
            continue
        accepted[name] = value

    if rejected:
        log.warning(
            "mail preset %r has unsupported defaults keys: %s. Supported keys are: %s. Unsupported keys are ignored.",
            preset_name,
            sorted(rejected),
            sorted(_KNOWN_PRESET_ENVELOPE_DEFAULTS),
        )
    return accepted
def _build_transport_from_preset(preset_name: str) -> TransportLike:
    """Build a transport from a named preset in the configuration.

    Args:
        preset_name: Name of the preset defined under ``mail.presets``.

    Returns:
        Configured transport instance ready to use.

    Raises:
        MailConfigurationError: If the ``mail`` section is missing, the
            preset is not found (including when ``mail.presets`` is not a
            mapping), the ``transport`` field is missing or unsupported, or
            the selected builder rejects the preset fields.
    """
    mail_cfg = _load_mail_section()
    if mail_cfg is None:
        raise MailConfigurationError(
            f"Preset '{preset_name}' cannot be resolved: 'mail' section missing from configuration"
        )

    presets = mail_cfg.get("presets") if hasattr(mail_cfg, "get") else None
    # A malformed ``presets`` value (list, scalar, ...) is treated like a
    # missing one so the caller gets MailConfigurationError instead of an
    # AttributeError/TypeError from ``.keys()`` or item access.
    if not hasattr(presets, "keys"):
        presets = None
    if not presets or preset_name not in presets:
        # Stringify before sorting: mixed-type YAML keys are not orderable.
        available = sorted(map(str, presets.keys())) if presets else []
        raise MailConfigurationError(f"Preset '{preset_name}' not found in mail.presets. Available: {available}")

    preset_cfg = presets[preset_name]
    transport_type = preset_cfg.get("transport") if hasattr(preset_cfg, "get") else None
    if not transport_type:
        raise MailConfigurationError(
            f"Preset '{preset_name}' is missing required field 'transport'. Supported: {list(_SUPPORTED_TRANSPORTS)}"
        )

    if transport_type == "smtp":
        return _build_smtp_transport(preset_cfg)
    if transport_type == "resend":
        return _build_resend_transport(preset_cfg)
    raise MailConfigurationError(
        f"Unknown transport type '{transport_type}' in preset '{preset_name}'. Supported: {list(_SUPPORTED_TRANSPORTS)}"
    )
def _load_mail_ssl_section() -> Any | None:
    """Return the ``mail.ssl`` section of the configuration, or ``None``.

    Keeps the defensive lookups out of the SSL cascade helpers. ``None`` is
    returned when the mail section cannot be loaded, when it or its ``ssl``
    entry is missing, or when either of them exposes no ``get`` accessor.
    """
    try:
        mail_cfg = _load_mail_section()
    except MailConfigurationError:
        return None

    # ``hasattr`` is False for ``None`` too, so one guard covers both cases.
    if not hasattr(mail_cfg, "get"):
        return None

    candidate = mail_cfg.get("ssl")
    return candidate if hasattr(candidate, "get") else None
def _load_root_ssl_config() -> Any | None:
    """Return the root-level :class:`kstlib.ssl.SSLConfig`, or ``None``.

    Any error raised by :func:`get_ssl_config` is deliberately swallowed so
    the caller keeps cascading to the Python default. The failure is logged
    at DEBUG level so an ignored root ``ssl`` section remains diagnosable.

    Returns:
        The object returned by :func:`get_ssl_config`, or ``None`` when
        loading it raised.
    """
    try:
        return get_ssl_config()
    except Exception as exc:  # pylint: disable=broad-exception-caught
        # Best-effort by design: never let a broken root config block mail
        # transport construction, but leave a trace for troubleshooting.
        log.debug("Root ssl configuration unavailable for mail transport, skipping this cascade level: %s", exc)
        return None
def _cascade_mail_ssl_level(
verify: Any,
ca_bundle: Any,
source: str | None,
) -> tuple[Any, Any, str | None]:
"""Apply the ``mail.ssl.*`` cascade level to a partial resolution."""
if verify is not None and ca_bundle is not None:
return verify, ca_bundle, source
mail_ssl = _load_mail_ssl_section()
if mail_ssl is None:
return verify, ca_bundle, source
if verify is None:
candidate = mail_ssl.get("verify")
if candidate is not None:
verify = candidate
source = "mail.ssl"
if ca_bundle is None:
ca_bundle = mail_ssl.get("ca_bundle")
return verify, ca_bundle, source
def _cascade_root_ssl_level(
verify: Any,
ca_bundle: Any,
source: str | None,
) -> tuple[Any, Any, str | None]:
"""Apply the root ``ssl.*`` cascade level to a partial resolution."""
if verify is not None and ca_bundle is not None:
return verify, ca_bundle, source
root_ssl = _load_root_ssl_config()
if root_ssl is None:
return verify, ca_bundle, source
if verify is None:
verify = root_ssl.verify
source = "ssl (root)"
if ca_bundle is None:
ca_bundle = root_ssl.ca_bundle
return verify, ca_bundle, source
def _validate_mail_ssl_types(verify: Any, ca_bundle: Any) -> tuple[bool, str | None]:
"""Reject non-bool verify and non-str ca_bundle before SSLContext creation."""
if not isinstance(verify, bool):
msg = f"mail SSL verify must be bool, got {type(verify).__name__}: {verify!r}"
raise TypeError(msg)
if ca_bundle is not None and not isinstance(ca_bundle, str):
msg = f"mail SSL ca_bundle must be str or null, got {type(ca_bundle).__name__}"
raise TypeError(msg)
return verify, ca_bundle
def _warn_if_mail_verify_disabled(verify: bool, source: str | None) -> None:
"""Emit a single source-tagged WARNING when verify resolved to ``False``."""
if verify is False:
log.warning(
"[SECURITY] SSL certificate verification disabled for mail transport "
"(source: %s). This exposes the SMTP session to MITM attacks. Use only "
"in trusted environments, or provide ssl_ca_bundle to validate against a private CA.",
source,
)
def _resolve_mail_ssl_config(preset_cfg: Any) -> tuple[bool, str | None]:
    """Resolve SSL ``(verify, ca_bundle)`` via a 4-level cascade.

    Priority (highest to lowest):

    1. Preset level: ``ssl_verify`` / ``ssl_ca_bundle`` keys of ``preset_cfg``
    2. Mail level: ``mail.ssl.verify`` / ``mail.ssl.ca_bundle``
    3. Root level: ``verify`` / ``ca_bundle`` of the root SSL config
    4. Python default: ``verify=True``, no CA bundle

    The two values cascade **independently**: ``verify`` may come from the
    preset while ``ca_bundle`` comes from a lower level. When the resolved
    ``verify`` is ``False`` a single WARNING is logged naming the level that
    supplied it (``"preset"``, ``"mail.ssl"``, ``"ssl (root)"`` or
    ``"default"``).

    Args:
        preset_cfg: Preset configuration section for the transport being
            built. Objects without a ``get`` accessor contribute nothing.

    Returns:
        Tuple ``(ssl_verify, ssl_ca_bundle)``. ``ssl_verify`` is always a
        bool; ``ssl_ca_bundle`` is the raw string or ``None`` (the path
        itself is not validated here).

    Raises:
        TypeError: If the resolved ``ssl_verify`` is not a bool or the
            resolved ``ssl_ca_bundle`` is not a string; values are rejected
            rather than silently coerced.
    """
    if hasattr(preset_cfg, "get"):
        verify: Any = preset_cfg.get("ssl_verify")
        ca_bundle: Any = preset_cfg.get("ssl_ca_bundle")
    else:
        verify = ca_bundle = None
    source: str | None = None if verify is None else "preset"

    # Lower levels only fill values that are still unset.
    for cascade_level in (_cascade_mail_ssl_level, _cascade_root_ssl_level):
        verify, ca_bundle, source = cascade_level(verify, ca_bundle, source)

    if verify is None:
        verify, source = True, "default"

    checked_verify, checked_bundle = _validate_mail_ssl_types(verify, ca_bundle)
    # NOTE(review): the warning fires for verify=False even when a CA bundle
    # is set, although _build_smtp_ssl_context then still verifies against
    # that bundle - confirm this is the intended operator signal.
    _warn_if_mail_verify_disabled(checked_verify, source)
    return checked_verify, checked_bundle
def _build_smtp_ssl_context(ssl_verify: bool, ssl_ca_bundle: str | None) -> ssl.SSLContext:
"""Build a stdlib :class:`ssl.SSLContext` from resolved cascade values.
Precedence rule: ``ssl_ca_bundle`` takes priority over ``ssl_verify``.
Providing a CA bundle expresses intent to verify (against that bundle),
so the returned context keeps ``verify_mode=CERT_REQUIRED`` and
``check_hostname=True``. Only when no CA bundle is set and
``ssl_verify`` is ``False`` does the context fall back to ``CERT_NONE``.
This function is **pure**: it does not read configuration nor emit log
warnings. The cascade and the security warning live in
:func:`_resolve_mail_ssl_config`.
Args:
ssl_verify: Resolved verify flag (bool).
ssl_ca_bundle: Resolved CA bundle path, or ``None``.
Returns:
Configured :class:`ssl.SSLContext` suitable for
:meth:`smtplib.SMTP.starttls` or :class:`smtplib.SMTP_SSL`.
Raises:
MailConfigurationError: If ``ssl_ca_bundle`` is set but invalid
(path traversal, null byte, missing file, non-PEM content,
unreadable, etc.). Validation is delegated to
:func:`kstlib.ssl.validate_ca_bundle_path`.
"""
if ssl_ca_bundle is not None:
try:
validated_path = validate_ca_bundle_path(ssl_ca_bundle)
except (TypeError, ValueError) as exc:
msg = f"Invalid ssl_ca_bundle for mail transport: {exc}"
raise MailConfigurationError(msg) from exc
return ssl.create_default_context(cafile=validated_path)
if not ssl_verify:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
return ssl.create_default_context()
def _build_smtp_transport(cfg: Any) -> SMTPTransport:
    """Build ``SMTPTransport`` from a preset config section.

    SSL configuration follows a 4-level cascade (preset > ``mail.ssl`` >
    root ``ssl`` > Python default). See :func:`_resolve_mail_ssl_config`
    for the priority rules and :func:`_build_smtp_ssl_context` for the
    context construction.

    Args:
        cfg: Box/dict with smtp preset fields (host, port, login, password,
            starttls, ssl, timeout, ssl_verify, ssl_ca_bundle). ``port``
            falls back to ``587`` when absent or explicitly null.

    Returns:
        Configured ``SMTPTransport`` instance.

    Raises:
        MailConfigurationError: If the ``host`` field is missing or if
            ``ssl_ca_bundle`` is invalid.
        TypeError: If ``ssl_verify`` is not a bool (after cascade).
    """
    from kstlib.mail.transports.smtp import SMTPCredentials, SMTPSecurity, SMTPTransport

    host = cfg.get("host") if hasattr(cfg, "get") else None
    if not host:
        raise MailConfigurationError("SMTP preset requires a 'host' field")

    # A ``port:`` key left empty in YAML arrives as None; treat it like an
    # absent key instead of crashing in ``int(None)`` (``timeout`` already
    # tolerates null the same way).
    port = cfg.get("port")
    if port is None:
        port = 587
    timeout = cfg.get("timeout")
    login = cfg.get("login")
    password = cfg.get("password")
    # Credentials are only attached when a login is configured.
    credentials = SMTPCredentials(username=login, password=password) if login else None

    ssl_verify, ssl_ca_bundle = _resolve_mail_ssl_config(cfg)
    ssl_context = _build_smtp_ssl_context(ssl_verify, ssl_ca_bundle)
    # NOTE(review): with ``ssl: true`` and no explicit ``starttls`` key both
    # flags end up True - confirm SMTPSecurity resolves that combination.
    security = SMTPSecurity(
        use_ssl=bool(cfg.get("ssl", False)),
        use_starttls=bool(cfg.get("starttls", True)),
        ssl_context=ssl_context,
    )
    return SMTPTransport(
        host=str(host),
        port=int(port),
        credentials=credentials,
        security=security,
        timeout=float(timeout) if timeout is not None else None,
    )
def _build_resend_transport(cfg: Any) -> ResendTransport:
    """Build ``ResendTransport`` from a preset config section.

    Args:
        cfg: Box/dict with resend preset fields (api_key, timeout).
            ``timeout`` falls back to ``30.0`` when absent or explicitly
            null.

    Returns:
        Configured ``ResendTransport`` instance.

    Raises:
        MailConfigurationError: If the ``api_key`` field is missing.
    """
    from kstlib.mail.transports.resend import ResendTransport

    api_key = cfg.get("api_key") if hasattr(cfg, "get") else None
    if not api_key:
        raise MailConfigurationError("Resend preset requires an 'api_key' field")

    # A ``timeout:`` key left empty in YAML arrives as None; use the default
    # instead of crashing in ``float(None)``.
    timeout = cfg.get("timeout")
    if timeout is None:
        timeout = 30.0
    return ResendTransport(api_key=str(api_key), timeout=float(timeout))
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers in this module are not part of that export list.
__all__ = ["MailBuilder", "NotifyResult"]