Source code for kstlib.secrets.sensitive

"""Helpers that minimise the footprint of decrypted secrets.

The :func:`sensitive` context manager temporarily exposes a secret value and
then attempts to scrub it from memory, clear provider caches, and drop any
remaining references.
"""

from __future__ import annotations

import logging
from collections.abc import Callable, Iterator, Mapping, Sequence
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any, Protocol, cast

if TYPE_CHECKING:
    from kstlib.secrets.models import SecretRecord

logger = logging.getLogger(__name__)

LegacyPurge = Callable[[], None]


[docs] class CachePurgeProtocol(Protocol): # pylint: disable=too-few-public-methods """Protocol implemented by providers exposing a cache purge hook."""
[docs] def purge_cache(self, *, path: str | Path | None = None) -> None: # pragma: no cover - protocol stub """Clear cached decrypted material."""
[docs] @contextmanager def sensitive( record: SecretRecord, *, providers: Sequence[CachePurgeProtocol] | None = None, ) -> Iterator[Any]: """Temporarily expose a secret and scrub it afterwards. The context manager yields the secret value so it can be used within the protected block. On exit it attempts to overwrite mutable buffers in place, clears provider caches when available, and replaces the value stored in the :class:`SecretRecord` with ``None`` to break lingering references. Example: >>> from kstlib.secrets.models import SecretRecord, SecretSource >>> from kstlib.secrets.sensitive import sensitive >>> record = SecretRecord(value=bytearray(b"api-token"), source=SecretSource.SOPS) >>> with sensitive(record) as secret: ... secret[:3] = b"***" # handle the secret >>> record.value is None True Args: record: Secret wrapped in a :class:`SecretRecord`. providers: Optional providers whose caches should be purged after use. Yields: The decrypted secret value. """ try: yield record.value finally: _scrub_value(record.value) record.value = None metadata = record.metadata if isinstance(record.metadata, Mapping) else {} _purge_providers(providers, metadata=metadata)
def _scrub_value(value: Any) -> None: """Best-effort scrubbing for mutable buffers.""" if value is None: return if isinstance(value, bytearray): value[:] = b"\x00" * len(value) return if isinstance(value, memoryview): if not value.readonly: value[:] = b"\x00" * len(value) value.release() return if hasattr(value, "clear") and hasattr(value, "__setitem__"): try: length = len(value) except TypeError: # pragma: no cover - objects without __len__ length = 0 for index in range(length): if _try_assign(value, index, 0): continue _try_assign(value, index, None) with suppress(AttributeError, TypeError, ValueError): value.clear() def _purge_providers( providers: Sequence[CachePurgeProtocol] | None, *, metadata: Mapping[str, Any] | None, ) -> None: """Invoke cache purge hooks for the supplied providers.""" if not providers: return path_hint: str | Path | None = None if metadata: candidate = metadata.get("path") if isinstance(candidate, str | Path): path_hint = candidate for provider in providers: purge = getattr(provider, "purge_cache", None) if not callable(purge): continue try: purge(path=path_hint) except TypeError: legacy_purge = cast("LegacyPurge", purge) legacy_purge() except (AttributeError, OSError, RuntimeError, ValueError) as error: # pragma: no cover - defensive logging logger.debug("Provider cache purge failed for %s", provider, exc_info=error) def _try_assign(target: Any, index: int, replacement: Any) -> bool: """Attempt to assign ``replacement`` at ``index`` and report success.""" try: target[index] = replacement except (AttributeError, IndexError, KeyError, TypeError, ValueError): return False return True __all__ = ["CachePurgeProtocol", "sensitive"]