Source code for kstlib.mail.throttle

"""Mail throttle (anti-spam kill switch) with config cascade and singleton per preset.

Wraps :class:`kstlib.resilience.RateLimiter` with mail-specific exception
type, on_exceed policy, security logging, and a singleton registry per
preset to prevent bypass via multiple builder instances.

Hard limits are shared with the alerts module
(``HARD_MIN/MAX_THROTTLE_RATE`` and ``HARD_MIN/MAX_THROTTLE_PER`` in
:mod:`kstlib.limits`). Mode ``"drop"`` is intentionally rejected at init:
a security event (rate-limit exceeded) must never be silent.

Resolution cascade (most specific first):

1. ``mail.presets.<preset_name>.throttle.<key>`` (preset-level)
2. ``mail.throttle.<key>`` (mail-level)
3. Defaults from :mod:`kstlib.limits`

Each key (``enabled``, ``rate``, ``per``, ``on_exceed``) cascades
independently, mirroring the SSL cascade pattern in the mail builder.

Examples:
    Default throttle (config-driven)::

        from kstlib.mail import MailBuilder
        mail = MailBuilder(preset="corporate")
        mail.sender("a@x.com").to("b@x.com").subject("Hi").message("...").send()

    Disable per builder (tests, single-shot scripts)::

        mail = MailBuilder(preset="corporate", throttle=False)

    Custom override per builder::

        mail = MailBuilder(
            preset="corporate",
            throttle={"rate": 100, "per": 3600.0, "on_exceed": "warn"},
        )

"""

from __future__ import annotations

import math
import threading
from typing import Any, Literal

from kstlib.limits import (
    DEFAULT_MAIL_THROTTLE_ON_EXCEED,
    DEFAULT_MAIL_THROTTLE_PER,
    DEFAULT_MAIL_THROTTLE_RATE,
    HARD_MAX_THROTTLE_PER,
    HARD_MAX_THROTTLE_RATE,
    HARD_MAX_THROTTLE_REGISTRY_SIZE,
    HARD_MIN_THROTTLE_PER,
    HARD_MIN_THROTTLE_RATE,
)
from kstlib.logging import get_logger
from kstlib.mail._helpers import _load_mail_section
from kstlib.mail.exceptions import MailConfigurationError, MailThrottledError
from kstlib.resilience import RateLimiter

# Module-level logger, named after this module.
log = get_logger(__name__)

# Static type for the ``on_exceed`` policy, used in annotations.
OnExceedMode = Literal["raise", "warn"]

# Runtime sets used by validation: the accepted ``on_exceed`` modes, the modes
# that are rejected with a dedicated error message, and the keys allowed in a
# throttle configuration mapping (kwarg dict or YAML section).
_VALID_ON_EXCEED: frozenset[str] = frozenset({"raise", "warn"})
_REJECTED_ON_EXCEED: frozenset[str] = frozenset({"drop"})
_VALID_THROTTLE_KEYS: frozenset[str] = frozenset({"enabled", "rate", "per", "on_exceed"})

# Subject sanitization for log output: number of characters kept before
# truncation, and the placeholders substituted for a missing/empty subject or
# for a subject that contains a null byte.
_MAX_SUBJECT_LOG_LEN = 80
_SUBJECT_NOT_SET = "<subject not set>"
_SUBJECT_NULL_BYTE = "<subject contains null byte>"

# Registry of throttles keyed by preset name, and the lock guarding access to it.
_throttle_registry: dict[str, MailThrottle] = {}
_registry_lock = threading.Lock()


class MailThrottle:
    """Token-bucket throttle for mail sends, config-driven.

    Singleton per preset (or per builder for explicit transports). Wraps
    :class:`kstlib.resilience.RateLimiter` with mail-specific exception type
    and on_exceed behavior.

    The throttle is a kill switch against accidental or buggy mail spam:
    runaway batch loops, recursion, exception handlers that mail, the
    ``@mail.notify`` decorator on a hot function. It is enforced before the
    transport is invoked.

    Mode ``"drop"`` (silent) is intentionally rejected at init: a security
    event must never be silent. Valid modes are ``"raise"`` and ``"warn"``,
    and both emit a ``WARNING [SECURITY]`` log per blocked send.

    Args:
        rate: Maximum mails per period. Must be ``int`` in
            ``[HARD_MIN_THROTTLE_RATE, HARD_MAX_THROTTLE_RATE]``.
        per: Period in seconds. Must be ``int`` or ``float`` (not NaN) in
            ``[HARD_MIN_THROTTLE_PER, HARD_MAX_THROTTLE_PER]``.
        on_exceed: Policy when the bucket is empty. ``"raise"`` raises
            :class:`~kstlib.mail.MailThrottledError` after the security
            warning; ``"warn"`` logs the warning and returns silently.

    Raises:
        MailConfigurationError: If a parameter is out of bounds, of a wrong
            type, or if ``on_exceed`` is ``"drop"`` (rejected).

    Examples:
        Strict mode (default)::

            throttle = MailThrottle(rate=20, per=60.0)
            if throttle.consume(subject_for_log="Daily report"):
                send_mail()
            # MailThrottledError raised on the 21st call within 60s

        Warn-only mode (operational critical channels)::

            throttle = MailThrottle(rate=5, per=60.0, on_exceed="warn")
            allowed = throttle.consume("Critical alert")
            if not allowed:
                pass  # silently dropped, [SECURITY] logged

        Inspect the resolved configuration on a fresh instance:

        >>> throttle = MailThrottle(rate=20, per=60.0)
        >>> throttle.rate
        20
        >>> throttle.per
        60.0
        >>> throttle.on_exceed
        'raise'

    """

    __slots__ = ("_limiter", "_on_exceed", "_per", "_rate")

    def __init__(
        self,
        *,
        rate: int = DEFAULT_MAIL_THROTTLE_RATE,
        per: float = DEFAULT_MAIL_THROTTLE_PER,
        on_exceed: OnExceedMode = DEFAULT_MAIL_THROTTLE_ON_EXCEED,
    ) -> None:
        """Validate parameters and build the underlying RateLimiter."""
        # Validate everything before touching the limiter so that an invalid
        # configuration never produces a half-initialized instance.
        self._validate_on_exceed(on_exceed)
        self._validate_rate(rate)
        self._validate_per(per)

        self._limiter = RateLimiter(rate=float(rate), per=float(per))
        self._on_exceed: OnExceedMode = on_exceed
        self._rate: int = rate
        self._per: float = float(per)

    @staticmethod
    def _validate_on_exceed(on_exceed: Any) -> None:
        """Reject 'drop' explicitly and any other unknown value.

        Raises:
            MailConfigurationError: If ``on_exceed`` is not a string, is
                ``"drop"``, or is not one of the valid modes.
        """
        # Type-check first: a non-string value coming from YAML (e.g. a list)
        # may be unhashable, and a frozenset membership test would then raise
        # TypeError instead of the documented MailConfigurationError.
        if not isinstance(on_exceed, str):
            msg = f"throttle.on_exceed must be one of {sorted(_VALID_ON_EXCEED)}, got {on_exceed!r}."
            raise MailConfigurationError(msg)
        if on_exceed in _REJECTED_ON_EXCEED:
            msg = (
                f"throttle.on_exceed={on_exceed!r} is rejected: silent drop "
                f"is forbidden by kstlib logging convention (security "
                f"events must never be silent). "
                f"Valid modes: {sorted(_VALID_ON_EXCEED)}."
            )
            raise MailConfigurationError(msg)
        if on_exceed not in _VALID_ON_EXCEED:
            msg = f"throttle.on_exceed must be one of {sorted(_VALID_ON_EXCEED)}, got {on_exceed!r}."
            raise MailConfigurationError(msg)

    @staticmethod
    def _validate_rate(rate: Any) -> None:
        """Reject non-int rate and out-of-bounds values.

        Raises:
            MailConfigurationError: If ``rate`` is not an ``int`` (``bool``
                excluded) or lies outside the hard limits.
        """
        # bool is a subclass of int in Python, exclude it explicitly.
        if isinstance(rate, bool) or not isinstance(rate, int):
            msg = f"throttle.rate must be int, got {type(rate).__name__}: {rate!r}."
            raise MailConfigurationError(msg)
        if rate < HARD_MIN_THROTTLE_RATE:
            msg = f"throttle.rate must be >= {HARD_MIN_THROTTLE_RATE}, got {rate}."
            raise MailConfigurationError(msg)
        if rate > HARD_MAX_THROTTLE_RATE:
            msg = f"throttle.rate {rate} exceeds HARD_MAX {HARD_MAX_THROTTLE_RATE}."
            raise MailConfigurationError(msg)

    @staticmethod
    def _validate_per(per: Any) -> None:
        """Reject non-numeric per, NaN, and out-of-bounds values.

        Raises:
            MailConfigurationError: If ``per`` is not an ``int``/``float``
                (``bool`` excluded), is NaN, or lies outside the hard limits.
        """
        if isinstance(per, bool) or not isinstance(per, (int, float)):
            msg = f"throttle.per must be int or float, got {type(per).__name__}: {per!r}."
            raise MailConfigurationError(msg)
        # NaN compares False against every bound, so it would slip through
        # both range checks below; reject it explicitly. The isinstance guard
        # keeps math.isnan away from arbitrarily large ints.
        if isinstance(per, float) and math.isnan(per):
            msg = f"throttle.per must be a finite number, got {per!r}."
            raise MailConfigurationError(msg)
        if per < HARD_MIN_THROTTLE_PER:
            msg = f"throttle.per must be >= {HARD_MIN_THROTTLE_PER}, got {per}."
            raise MailConfigurationError(msg)
        if per > HARD_MAX_THROTTLE_PER:
            msg = f"throttle.per {per} exceeds HARD_MAX {HARD_MAX_THROTTLE_PER}."
            raise MailConfigurationError(msg)

    @property
    def rate(self) -> int:
        """Configured rate (mails per period)."""
        return self._rate

    @property
    def per(self) -> float:
        """Configured period in seconds."""
        return self._per

    @property
    def on_exceed(self) -> OnExceedMode:
        """Configured policy when the bucket is empty."""
        return self._on_exceed

    def consume(self, subject_for_log: str | None = None) -> bool:
        """Try to consume one token, applying the on_exceed policy on failure.

        Args:
            subject_for_log: Optional subject of the mail being sent.
                Truncated to 80 chars in the log, with non-printable
                characters escaped; if it contains a null byte, replaced by
                a placeholder. Used for debug only, never for routing.

        Returns:
            ``True`` if the caller should proceed with the send, ``False`` if
            the send should be dropped silently (``warn`` mode only).

        Raises:
            MailThrottledError: If throttled and ``on_exceed='raise'``.
        """
        if self._limiter.try_acquire():
            return True

        # Only sanitize on the blocked path: the subject is used solely for
        # the security log and the exception message.
        safe_subject = self._sanitize_subject(subject_for_log)

        if self._on_exceed == "raise":
            log.warning(
                "[SECURITY] Mail throttle exceeded (rate=%d, per=%.1fs). "
                "Raising MailThrottledError. Subject dropped: %s",
                self._rate,
                self._per,
                safe_subject,
            )
            msg = f"Mail throttle exceeded: {self._rate} sends per {self._per}s. Subject dropped: {safe_subject}"
            raise MailThrottledError(msg)

        log.warning(
            "[SECURITY] Mail throttle exceeded (rate=%d, per=%.1fs). Mail dropped silently (mode=warn). Subject: %s",
            self._rate,
            self._per,
            safe_subject,
        )
        return False

    @staticmethod
    def _sanitize_subject(raw: str | None) -> str:
        """Sanitize a subject for log (truncate, escape, null-byte, None handling).

        Hardening for log injection / terminal escape / log size blowup.
        Subject is debug-only; it never affects routing or delivery.

        Args:
            raw: Subject as supplied by the caller, or ``None``.

        Returns:
            A placeholder for a missing/empty subject or one containing a
            null byte; otherwise the subject truncated to
            ``_MAX_SUBJECT_LOG_LEN`` characters (with ``"..."`` appended when
            truncated) and with every non-printable character replaced by
            its backslash escape.
        """
        if raw is None or raw == "":
            return _SUBJECT_NOT_SET
        if "\x00" in raw:
            return _SUBJECT_NULL_BYTE

        truncated = len(raw) > _MAX_SUBJECT_LOG_LEN
        kept = raw[:_MAX_SUBJECT_LOG_LEN] if truncated else raw

        # Escape CR/LF, ESC and any other non-printable character so that a
        # crafted subject cannot forge extra log lines or emit terminal
        # control sequences. Truncation happens first, which bounds the size
        # of the escaped result.
        if not kept.isprintable():
            kept = "".join(
                ch if ch.isprintable() else ch.encode("unicode_escape").decode("ascii")
                for ch in kept
            )

        return kept + "..." if truncated else kept

    def __repr__(self) -> str:
        """Return a non-sensitive repr (no builder content, no recipients)."""
        return f"MailThrottle(rate={self._rate}, per={self._per}, on_exceed={self._on_exceed!r})"
def get_or_create_throttle(
    preset_name: str | None,
    config: dict[str, Any] | bool | None,
) -> MailThrottle | None:
    """Resolve the throttle for a builder, applying the cascade and singleton.

    Resolution order (most specific first):

    1. Explicit kwarg ``throttle=False`` -> disabled (returns ``None``).
    2. Explicit kwarg ``throttle=dict`` -> per-instance throttle (no
       singleton; unknown keys rejected; missing keys filled from defaults).
    3. ``throttle=None`` (default) -> YAML cascade:

       a. ``mail.presets.<preset_name>.throttle.<key>`` (preset-level)
       b. ``mail.throttle.<key>`` (mail-level)
       c. Defaults from :mod:`kstlib.limits`

    If ``enabled=False`` at the most-specific cascade level, returns ``None``.
    Otherwise, builds (and caches as singleton) a :class:`MailThrottle` keyed
    on ``preset_name``.

    For an explicit transport (``preset_name=None``), the preset-level cascade
    is skipped, and the resulting throttle is per-instance (no singleton, no
    shared state across builders).

    Args:
        preset_name: Preset name or ``None`` for an explicit transport.
        config: Builder ``throttle=`` kwarg: ``False`` to disable, a ``dict``
            for a per-instance custom throttle, or ``None`` for config-driven
            cascade.

    Returns:
        :class:`MailThrottle` to enforce, or ``None`` if disabled.

    Raises:
        MailConfigurationError: If a dict has unknown keys, or if any
            cascade-resolved value violates hard limits.
    """
    if config is False:
        log.debug(
            "Mail throttle disabled (kwarg throttle=False), preset=%s",
            preset_name or "<none>",
        )
        return None
    if isinstance(config, dict):
        return _build_from_dict(config, preset_name=preset_name)
    # Any other value (including None and True) falls through to the
    # config-driven cascade.
    return _resolve_cascade(preset_name)


def _build_from_dict(config: dict[str, Any], *, preset_name: str | None) -> MailThrottle | None:
    """Build a per-instance throttle from a dict kwarg, validating keys.

    Args:
        config: Throttle settings supplied by the caller; keys must be a
            subset of ``_VALID_THROTTLE_KEYS``.
        preset_name: Preset name, used only for the debug log.

    Returns:
        A new (non-cached) :class:`MailThrottle`, or ``None`` when the dict
        carries ``enabled=False``.

    Raises:
        MailConfigurationError: On unknown keys or invalid values.
    """
    _reject_unknown_throttle_keys(config, source="kwarg")
    if config.get("enabled") is False:
        log.debug(
            "Mail throttle disabled (dict enabled=false), preset=%s",
            preset_name or "<none>",
        )
        return None
    # "enabled" is a switch, not a MailThrottle parameter; everything else is
    # forwarded so that missing keys fall back to the constructor defaults.
    kwargs = {k: v for k, v in config.items() if k != "enabled"}
    throttle = MailThrottle(**kwargs)
    log.debug(
        "Mail throttle resolved: rate=%d, per=%.1fs, mode=%s, source=kwarg, preset=%s",
        throttle.rate,
        throttle.per,
        throttle.on_exceed,
        preset_name or "<none>",
    )
    return throttle


def _resolve_cascade(preset_name: str | None) -> MailThrottle | None:
    """Resolve via YAML cascade, returning a (cached) singleton or None.

    Args:
        preset_name: Preset name, or ``None`` for an explicit transport (in
            which case the result is never cached).

    Returns:
        The throttle to enforce, or ``None`` when the cascade resolves
        ``enabled`` to ``False``.

    Raises:
        MailConfigurationError: On unknown keys, invalid resolved values, or
            when the registry already holds
            ``HARD_MAX_THROTTLE_REGISTRY_SIZE`` entries and a new preset
            would have to be added.
    """
    mail_cfg = _load_mail_section(silent=True)
    preset_throttle = _read_preset_throttle(mail_cfg, preset_name)
    mail_throttle = _read_mail_throttle(mail_cfg)

    enabled = _resolve_key(preset_throttle, mail_throttle, "enabled", default=True)
    if enabled is False:
        log.debug(
            "Mail throttle disabled (cascade enabled=false), preset=%s",
            preset_name or "<none>",
        )
        return None

    rate = _resolve_key(preset_throttle, mail_throttle, "rate", default=DEFAULT_MAIL_THROTTLE_RATE)
    per = _resolve_key(preset_throttle, mail_throttle, "per", default=DEFAULT_MAIL_THROTTLE_PER)
    on_exceed = _resolve_key(
        preset_throttle,
        mail_throttle,
        "on_exceed",
        default=DEFAULT_MAIL_THROTTLE_ON_EXCEED,
    )
    source = _resolve_source(preset_throttle, mail_throttle)

    if preset_name is not None:
        # Lookup, size check and insertion happen under one lock so that two
        # builders racing on the same preset end up sharing a single throttle.
        with _registry_lock:
            cached = _throttle_registry.get(preset_name)
            if cached is not None:
                # An existing entry always wins over freshly resolved values.
                log.debug(
                    "Mail throttle reused (singleton): rate=%d, per=%.1fs, mode=%s, source=%s, preset=%s",
                    cached.rate,
                    cached.per,
                    cached.on_exceed,
                    source,
                    preset_name,
                )
                return cached
            if len(_throttle_registry) >= HARD_MAX_THROTTLE_REGISTRY_SIZE:
                log.warning(
                    "[SECURITY] Mail throttle registry size cap reached "
                    "(%d entries). Refusing new entry for preset=%s. "
                    "This usually indicates dynamic preset names "
                    "(e.g. UUID-suffixed); preset names should be a "
                    "small static set.",
                    HARD_MAX_THROTTLE_REGISTRY_SIZE,
                    preset_name,
                )
                msg = (
                    f"Mail throttle registry size cap reached "
                    f"({HARD_MAX_THROTTLE_REGISTRY_SIZE} entries). "
                    f"This usually indicates dynamic preset names "
                    f"(e.g. UUID-suffixed). Preset names should be a "
                    f"small static set; check your configuration."
                )
                raise MailConfigurationError(msg)
            throttle = MailThrottle(rate=rate, per=per, on_exceed=on_exceed)
            _throttle_registry[preset_name] = throttle
            log.debug(
                "Mail throttle resolved: rate=%d, per=%.1fs, mode=%s, source=%s, preset=%s",
                throttle.rate,
                throttle.per,
                throttle.on_exceed,
                source,
                preset_name,
            )
            return throttle

    # Explicit transport: no preset to key on, so nothing is cached.
    throttle = MailThrottle(rate=rate, per=per, on_exceed=on_exceed)
    log.debug(
        "Mail throttle resolved (per-instance, no preset): rate=%d, per=%.1fs, mode=%s, source=%s",
        throttle.rate,
        throttle.per,
        throttle.on_exceed,
        source,
    )
    return throttle


def _resolve_key(
    preset_throttle: dict[str, Any] | None,
    mail_throttle: dict[str, Any] | None,
    key: str,
    default: Any,
) -> Any:
    """Cascade lookup for a single key (preset > mail > default).

    Presence of the key decides, not its truthiness, so an explicit falsy
    value at a more specific level still overrides a less specific one.
    """
    if preset_throttle is not None and key in preset_throttle:
        return preset_throttle[key]
    if mail_throttle is not None and key in mail_throttle:
        return mail_throttle[key]
    return default


def _resolve_source(
    preset_throttle: dict[str, Any] | None,
    mail_throttle: dict[str, Any] | None,
) -> str:
    """Identify which cascade level provided values (for the debug log).

    Returns ``"preset"`` if the preset-level section is non-empty, else
    ``"mail"`` if the mail-level section is non-empty, else ``"default"``.
    """
    if preset_throttle:
        return "preset"
    if mail_throttle:
        return "mail"
    return "default"


def _read_preset_throttle(mail_cfg: Any, preset_name: str | None) -> dict[str, Any] | None:
    """Extract ``mail.presets.<name>.throttle`` as a plain dict, or None.

    Every level is duck-typed (``get`` / ``items``); a missing or
    non-mapping level yields ``None`` rather than an error.

    Raises:
        MailConfigurationError: If the section contains unknown keys.
    """
    if mail_cfg is None or preset_name is None or not hasattr(mail_cfg, "get"):
        return None
    presets = mail_cfg.get("presets")
    if presets is None or not hasattr(presets, "get"):
        return None
    preset_cfg = presets.get(preset_name)
    if preset_cfg is None or not hasattr(preset_cfg, "get"):
        return None
    raw = preset_cfg.get("throttle")
    if raw is None or not hasattr(raw, "items"):
        return None
    raw_dict = dict(raw.items())
    _reject_unknown_throttle_keys(raw_dict, source="preset")
    return raw_dict


def _read_mail_throttle(mail_cfg: Any) -> dict[str, Any] | None:
    """Extract ``mail.throttle`` as a plain dict, or None.

    A missing or non-mapping section yields ``None`` rather than an error.

    Raises:
        MailConfigurationError: If the section contains unknown keys.
    """
    if mail_cfg is None or not hasattr(mail_cfg, "get"):
        return None
    raw = mail_cfg.get("throttle")
    if raw is None or not hasattr(raw, "items"):
        return None
    raw_dict = dict(raw.items())
    _reject_unknown_throttle_keys(raw_dict, source="mail")
    return raw_dict


def _reject_unknown_throttle_keys(raw: dict[str, Any], *, source: str) -> None:
    """Reject unknown keys in a throttle config dict (anti-typo).

    Shared by the three entry points:

    - ``_build_from_dict`` (kwarg ``MailBuilder(throttle={...})``)
    - ``_read_preset_throttle`` (YAML ``mail.presets.<name>.throttle``)
    - ``_read_mail_throttle`` (YAML ``mail.throttle``)

    Args:
        raw: The dict whose keys must be a subset of ``_VALID_THROTTLE_KEYS``.
        source: Origin label (``"kwarg"`` / ``"preset"`` / ``"mail"``)
            included in the error message to help the caller locate the typo.

    Raises:
        MailConfigurationError: If ``raw`` contains any key outside
            ``_VALID_THROTTLE_KEYS``.
    """
    unknown = set(raw.keys()) - _VALID_THROTTLE_KEYS
    if unknown:
        # Keys come straight from user config and may mix types (e.g. int and
        # str keys in YAML); sorting on str() keeps the ordering well-defined
        # so the documented MailConfigurationError is raised, not TypeError.
        unknown_sorted = sorted(unknown, key=str)
        if source == "kwarg":
            msg = f"throttle config has unknown keys: {unknown_sorted}. Valid keys: {sorted(_VALID_THROTTLE_KEYS)}."
        else:
            msg = (
                f"YAML mail throttle ({source}) has unknown keys: "
                f"{unknown_sorted}. Valid keys: {sorted(_VALID_THROTTLE_KEYS)}."
            )
        raise MailConfigurationError(msg)


def _reset_registry() -> None:
    """Clear the singleton registry. Test helper, not for production use.

    The throttle is intentionally a kill switch: in production, the registry
    persists across :func:`kstlib.config.clear_config` calls so that a buggy
    caller cannot bypass the throttle by reloading configuration. This helper
    is exposed for unit tests only.
    """
    with _registry_lock:
        _throttle_registry.clear()


__all__ = [
    "MailThrottle",
    "OnExceedMode",
    "get_or_create_throttle",
]