Source code for kstlib.secrets.resolver

"""SecretResolver orchestrates provider lookups."""

from __future__ import annotations

import logging
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from kstlib.config.loader import get_config
from kstlib.secrets.exceptions import SecretNotFoundError
from kstlib.secrets.models import SecretRecord, SecretRequest, SecretSource
from kstlib.secrets.providers import configure_provider, get_provider

if TYPE_CHECKING:
    from kstlib.secrets.providers.base import SecretProvider

logger = logging.getLogger(__name__)


[docs] class SecretResolver: """Resolve secrets by delegating to a sequence of providers. The resolver iterates through providers in order until one returns a value. If no provider can resolve the secret and no default is given, a ``SecretNotFoundError`` is raised (when ``required=True``). Example: >>> from kstlib.secrets.resolver import SecretResolver >>> from kstlib.secrets.providers import get_provider >>> from kstlib.secrets.models import SecretRequest >>> resolver = SecretResolver([get_provider("environment")], name="app") >>> # resolver.resolve(SecretRequest(name="API_KEY")) """
[docs] def __init__(self, providers: Sequence[SecretProvider], *, name: str | None = None) -> None: """Initialise the resolver with a provider cascade. Args: providers: Ordered sequence of secret providers to query. name: Human-readable name for this resolver (used in error messages). """ self._providers = list(providers) self._name = name or "default"
@property def name(self) -> str: """Return the resolver name.""" return self._name
[docs] def resolve(self, request: SecretRequest) -> SecretRecord: """Resolve the secret using the configured provider cascade. Args: request: The secret request to resolve. Returns: A SecretRecord with the resolved value and metadata. Raises: SecretNotFoundError: If the secret is not found and required=True. """ for provider in self._providers: record = provider.resolve(request) if record is not None: # Identifier + source only; never log request.default nor # record.value (the latter is repr-suppressed anyway). logger.debug( "Resolved secret '%s' from source=%s (resolver=%s)", request.name, record.source.value, self._name, ) return record if request.default is not None: return self._default_record(request.default, is_async=False) if request.required: raise SecretNotFoundError(f"Secret '{request.name}' not found in resolver '{self._name}'") return self._default_record(None, is_async=False)
[docs] async def resolve_async(self, request: SecretRequest) -> SecretRecord: """Async counterpart for ``resolve``. Args: request: The secret request to resolve. Returns: A SecretRecord with the resolved value and metadata. Raises: SecretNotFoundError: If the secret is not found and required=True. """ for provider in self._providers: record = await provider.resolve_async(request) if record is not None: logger.debug( "Resolved secret '%s' from source=%s (resolver=%s, async)", request.name, record.source.value, self._name, ) return record if request.default is not None: return self._default_record(request.default, is_async=True) if request.required: raise SecretNotFoundError(f"Secret '{request.name}' not found in resolver '{self._name}'") return self._default_record(None, is_async=True)
def _default_record(self, value: Any, *, is_async: bool) -> SecretRecord: metadata: dict[str, Any] = {"resolver": self._name} if is_async: metadata["async"] = True return SecretRecord(value=value, source=SecretSource.DEFAULT, metadata=metadata)
[docs] def get_secret_resolver( config: Mapping[str, Any] | None = None, *, secrets: Mapping[str, Any] | None = None, ) -> SecretResolver: """Build a resolver from configuration mapping. When no explicit provider list is given, the default cascade is: ``(KwargsProvider if secrets) -> EnvironmentProvider -> KeyringProvider -> (SOPSProvider if configured)``. Args: config: Optional mapping with ``providers`` list and/or ``sops`` settings. When ``None``, uses the default provider chain. secrets: Optional mapping of secret overrides to inject via KwargsProvider. Returns: Configured ``SecretResolver`` instance. Example: >>> from kstlib.secrets.resolver import get_secret_resolver >>> resolver = get_secret_resolver() # uses defaults >>> resolver.name 'default' """ config = config or {} providers: list[SecretProvider] = [] # KwargsProvider always comes first (highest priority) if secrets: providers.append(get_provider("kwargs", secrets=secrets)) provider_configs = config.get("providers", []) if not provider_configs: providers.extend([get_provider("environment"), get_provider("keyring")]) sops_config = config.get("sops") if isinstance(sops_config, Mapping): providers.append(_build_sops_provider(sops_config)) else: for provider_cfg in provider_configs: name = provider_cfg.get("name") if not name: raise ValueError("Provider configuration requires a 'name' field") settings = provider_cfg.get("settings") provider = get_provider(name, **(provider_cfg.get("options") or {})) providers.append(configure_provider(provider, settings)) return SecretResolver(providers, name=config.get("name"))
[docs] def resolve_secret( name: str, *, config: Mapping[str, Any] | None = None, secrets: Mapping[str, Any] | None = None, **request_kwargs: Any, ) -> SecretRecord: """Resolve a secret by name using the global resolver cascade. Args: name: Identifier of the secret (``"smtp.password"`` for example). config: Optional configuration mapping describing providers. When not provided the function attempts to reuse the globally loaded config. secrets: Optional mapping of secret overrides. These take precedence over all other providers (useful for testing). request_kwargs: Additional keyword arguments forwarded to :class:`SecretRequest`. Supported keys are ``scope``, ``required``, ``default`` and ``metadata``. Returns: A ``SecretRecord`` describing the resolved secret and its provenance. Raises: SecretNotFoundError: If the secret is not found and required=True. TypeError: If unsupported keyword arguments are provided. Example: >>> from kstlib.secrets.resolver import resolve_secret >>> # Override for testing >>> record = resolve_secret("api.key", secrets={"api.key": "test-value"}) >>> record.source <SecretSource.KWARGS: 'kwargs'> """ if config is None: global_config = get_config() secrets_config = getattr(global_config, "secrets", None) config = secrets_config.to_dict() if secrets_config is not None else None allowed_keys = {"scope", "required", "default", "metadata"} unexpected = set(request_kwargs) - allowed_keys if unexpected: unexpected_list = ", ".join(sorted(unexpected)) raise TypeError(f"Unsupported keyword arguments: {unexpected_list}") resolver = get_secret_resolver(config, secrets=secrets) scope = request_kwargs.get("scope") required = request_kwargs.get("required", True) default = request_kwargs.get("default") metadata = request_kwargs.get("metadata") request = SecretRequest( name=name, scope=scope, required=required, default=default, metadata=dict(metadata) if metadata else {}, ) return resolver.resolve(request)
def _build_sops_provider(config: Mapping[str, Any]) -> SecretProvider: """Instantiate a SOPS provider from a simple mapping.""" option_keys = {"path", "binary", "document_format", "format"} raw_options = config.get("options") options = ( {key: value for key, value in config.items() if key in option_keys} if raw_options is None else dict(raw_options) ) if "format" in options and "document_format" not in options: options["document_format"] = options.pop("format") settings = config.get("settings") provider = get_provider("sops", **options) return configure_provider(provider, settings) __all__ = ["SecretResolver", "get_secret_resolver"]