"""SOPS decryption support for configuration files.
This module provides transparent SOPS decryption for configuration files
with .sops.yml, .sops.yaml, .sops.json, or .sops.toml extensions.
Features:
- Automatic detection of SOPS files by extension
- LRU cache with mtime-based invalidation
- Graceful degradation when SOPS is not available
- Warning detection for ENC[...] values that were left undecrypted
Example:
>>> from kstlib.config.sops import is_sops_file, get_decryptor
>>> from pathlib import Path
>>> is_sops_file(Path("secrets.sops.yml"))
True
>>> is_sops_file(Path("config.yml"))
False
"""
from __future__ import annotations
import logging
import pathlib
import shutil
import subprocess
from collections import OrderedDict
from typing import Any
from kstlib.config.exceptions import ConfigSopsError, ConfigSopsNotAvailableError
from kstlib.limits import (
DEFAULT_MAX_SOPS_CACHE_ENTRIES,
HARD_MAX_SOPS_CACHE_ENTRIES,
)
#: Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def _log_trace(msg: str, *args: object) -> None:
    """Emit a message on the module logger at the custom TRACE level.

    TRACE is a custom level (5, below DEBUG). Its numeric value is imported
    inside the function instead of at module import time, because a
    top-level import would close the circular chain
    ``kstlib.logging.manager -> kstlib.config -> kstlib.config.sops``.

    Args:
        msg: %-style format string passed to the logger.
        *args: Values the logger interpolates into ``msg``.
    """
    from kstlib.logging import TRACE_LEVEL as trace_level

    logger.log(trace_level, msg, *args)
#: File-name endings that mark a file as SOPS-managed. ``is_sops_file``
#: compares them against the lower-cased file name, so they are lowercase.
SOPS_FILE_PATTERNS: tuple[str, ...] = (
".sops.yml",
".sops.yaml",
".sops.json",
".sops.toml",
)
#: Substring that ``has_encrypted_values`` searches for to spot string values
#: that are still in SOPS-encrypted form.
ENC_MARKER = "ENC[AES256_GCM,"
def is_sops_file(path: pathlib.Path) -> bool:
    """Tell whether a file name carries one of the SOPS extensions.

    Only the final path component is inspected, case-insensitively; the
    file's contents are never read.

    Args:
        path: Path to the configuration file.

    Returns:
        True if the name ends with an entry of ``SOPS_FILE_PATTERNS``
        (.sops.yml, .sops.yaml, etc.), False otherwise.

    Examples:
        >>> from pathlib import Path
        >>> is_sops_file(Path("secrets.sops.yml"))
        True
        >>> is_sops_file(Path("config.yml"))
        False
        >>> is_sops_file(Path("data.sops.json"))
        True
    """
    # str.endswith accepts a tuple of candidate suffixes directly.
    return path.name.lower().endswith(SOPS_FILE_PATTERNS)
def get_real_extension(path: pathlib.Path) -> str:
    """Extract actual format extension, ignoring a .sops/.enc prefix.

    For SOPS files like 'secrets.sops.yml', returns '.yml'. The marker is
    only honoured when it is a complete dotted component of the file name
    (i.e. it is immediately followed by another '.'), so names that merely
    contain the letters, such as 'file.encrypted.yml', are treated as
    ordinary files. For those, the normal suffix is returned.

    Args:
        path: Path to the configuration file.

    Returns:
        The lower-cased format extension (e.g., '.yml', '.json', '.toml').

    Examples:
        >>> from pathlib import Path
        >>> get_real_extension(Path("secrets.sops.yml"))
        '.yml'
        >>> get_real_extension(Path("config.sops.json"))
        '.json'
        >>> get_real_extension(Path("normal.yml"))
        '.yml'
        >>> get_real_extension(Path("file.encrypted.yml"))
        '.yml'
    """
    name = path.name.lower()
    for marker in (".sops", ".enc"):
        # Search for the marker *including* the dot that follows it. A bare
        # substring search would also hit ".encrypted" or ".sops-notes" and
        # return a bogus extension such as "rypted.yml".
        idx = name.rfind(f"{marker}.")
        if idx != -1:
            return name[idx + len(marker) :]
    return path.suffix.lower()
#: Maximum recursion depth for has_encrypted_values to prevent stack overflow.
_MAX_SCAN_DEPTH = 32


def has_encrypted_values(data: Any, path: str = "", *, _depth: int = 0) -> list[str]:
    """Recursively find keys containing ENC[AES256_GCM,...] values.

    This function detects SOPS-encrypted values that were not decrypted,
    typically because the file was loaded without SOPS decryption. Only
    ``str``, ``dict`` and ``list`` nodes are inspected; anything nested
    deeper than ``_MAX_SCAN_DEPTH`` levels is silently ignored.

    Args:
        data: The parsed configuration data to inspect.
        path: Current key path (for recursion, start with empty string).
        _depth: Internal recursion counter (do not set manually).

    Returns:
        List of dotted key paths containing encrypted values. List items
        are rendered as ``parent[index]``; a bare encrypted string at the
        top level is reported as ``"<root>"``. Every entry is a ``str``,
        even when the originating mapping key is not.

    Examples:
        >>> has_encrypted_values({"key": "ENC[AES256_GCM,data...]"})
        ['key']
        >>> has_encrypted_values({"db": {"password": "ENC[AES256_GCM,...]"}})
        ['db.password']
        >>> has_encrypted_values({"normal": "value"})
        []
    """
    if _depth > _MAX_SCAN_DEPTH:
        return []
    if isinstance(data, str):
        return [path or "<root>"] if ENC_MARKER in data else []

    found: list[str] = []
    if isinstance(data, dict):
        for key, value in data.items():
            # Mapping keys are not guaranteed to be strings (e.g. integer
            # keys); coerce so the result really is a list of str.
            child = f"{path}.{key}" if path else str(key)
            found.extend(has_encrypted_values(value, child, _depth=_depth + 1))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            found.extend(has_encrypted_values(item, f"{path}[{index}]", _depth=_depth + 1))
    return found
class SopsDecryptor:
    """Lightweight SOPS decryptor with LRU cache.

    This class provides SOPS file decryption with:

    - Configurable binary name (looked up on ``PATH``)
    - LRU cache with mtime-based invalidation
    - Clear error messages for troubleshooting

    Attributes:
        binary: Name of the SOPS binary.
        max_cache: Maximum cache entries (clamped to ``0..hard limit``).

    Examples:
        >>> decryptor = SopsDecryptor()  # doctest: +SKIP
        >>> content = decryptor.decrypt_file(Path("secrets.sops.yml"))  # doctest: +SKIP
    """

    #: Seconds the SOPS subprocess may run before decryption is abandoned.
    _TIMEOUT_SECONDS = 30

    def __init__(
        self,
        binary: str = "sops",
        max_cache_entries: int = DEFAULT_MAX_SOPS_CACHE_ENTRIES,
    ) -> None:
        """Initialize the SOPS decryptor.

        Args:
            binary: Name of the SOPS binary.
            max_cache_entries: Maximum number of cached decrypted files.
                Values above the hard limit are lowered to it; values
                below zero are raised to zero (caching disabled).
        """
        self._binary = binary
        # Clamp on both sides: a negative bound would make the eviction loop
        # in decrypt_file() keep popping from an already-empty cache.
        self._max_cache = max(0, min(max_cache_entries, HARD_MAX_SOPS_CACHE_ENTRIES))
        # Maps resolved path -> (mtime at decryption time, decrypted text);
        # insertion order doubles as least-recently-used order.
        self._cache: OrderedDict[pathlib.Path, tuple[float, str]] = OrderedDict()

    @property
    def binary(self) -> str:
        """Return the configured SOPS binary name."""
        return self._binary

    @property
    def max_cache(self) -> int:
        """Return the maximum cache size."""
        return self._max_cache

    def decrypt_file(self, path: pathlib.Path) -> str:
        """Decrypt a SOPS-encrypted file and return content as string.

        A cached result is returned when the file's mtime is unchanged since
        it was last decrypted; otherwise the SOPS binary is invoked and the
        cache is refreshed.

        Args:
            path: Path to the SOPS-encrypted file.

        Returns:
            Decrypted file content as a string.

        Raises:
            ConfigSopsNotAvailableError: If SOPS binary is not found.
            ConfigSopsError: If the binary name contains a path separator,
                the SOPS process cannot be started, times out, or exits
                with a non-zero status.
        """
        resolved = path.resolve()
        mtime = resolved.stat().st_mtime

        # Cache hit?
        cached = self._cache.get(resolved)
        if cached is not None and cached[0] == mtime:
            self._cache.move_to_end(resolved)
            _log_trace("SOPS cache hit for: %s", path.name)
            return cached[1]

        # Security: reject absolute/relative paths to prevent binary override via config
        if "/" in self._binary or "\\" in self._binary:
            raise ConfigSopsError(f"SOPS binary must be a simple name (not a path): {self._binary!r}")

        # Find binary
        binary_path = shutil.which(self._binary)
        if binary_path is None:
            raise ConfigSopsNotAvailableError(
                f"SOPS binary '{self._binary}' not found in PATH. Install from https://github.com/getsops/sops"
            )

        # Decrypt - binary_path is validated via shutil.which()
        # resolved is a Path object from user config (trusted source)
        try:
            result = subprocess.run(
                [binary_path, "--decrypt", str(resolved)],
                capture_output=True,
                text=True,
                # Decode explicitly instead of relying on the locale encoding,
                # which is not UTF-8 on every platform.
                # NOTE(review): assumes SOPS emits UTF-8 text - confirm.
                encoding="utf-8",
                check=False,
                timeout=self._TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired as err:
            # Surface timeouts through the module's own exception type so
            # callers handling ConfigSopsError also cover a hung process.
            raise ConfigSopsError(
                f"Timed out after {self._TIMEOUT_SECONDS}s while decrypting '{path.name}'"
            ) from err
        except OSError as err:
            raise ConfigSopsError(f"Failed to run SOPS for '{path.name}': {err}") from err

        if result.returncode != 0:
            raise ConfigSopsError(f"Failed to decrypt '{path.name}': {result.stderr.strip()}")
        content = result.stdout

        # Update cache with LRU eviction
        self._cache[resolved] = (mtime, content)
        self._cache.move_to_end(resolved)
        while len(self._cache) > self._max_cache:
            self._cache.popitem(last=False)
        logger.debug("SOPS decrypted and cached: %s", path.name)
        return content

    def purge_cache(self, path: pathlib.Path | None = None) -> None:
        """Clear cache entries.

        Args:
            path: If provided, only clear this specific path.
                If None, clear all cached entries.
        """
        if path is None:
            self._cache.clear()
            logger.debug("SOPS cache cleared")
            return
        removed = self._cache.pop(path.resolve(), None)
        if removed is not None:
            _log_trace("SOPS cache entry removed: %s", path.name)

    @property
    def cache_size(self) -> int:
        """Return the current number of cached entries."""
        return len(self._cache)
# Global singleton, created lazily by get_decryptor() and dropped by
# reset_decryptor().
_decryptor: SopsDecryptor | None = None


def get_decryptor(binary: str = "sops") -> SopsDecryptor:
    """Get or create global SOPS decryptor singleton.

    The instance is built on the first call and reused afterwards, so a
    ``binary`` passed on later calls has no effect until
    ``reset_decryptor()`` has been called.

    Args:
        binary: SOPS binary name (only used on first call).

    Returns:
        The global SopsDecryptor instance.

    Examples:
        >>> decryptor = get_decryptor()  # doctest: +SKIP
        >>> content = decryptor.decrypt_file(path)  # doctest: +SKIP
    """
    global _decryptor
    if _decryptor is None:
        _decryptor = SopsDecryptor(binary=binary)
    return _decryptor
def reset_decryptor() -> None:
    """Discard the module-wide decryptor instance.

    Meant for tests: once the global is cleared, the next
    ``get_decryptor()`` call constructs a fresh ``SopsDecryptor``.
    """
    global _decryptor
    _decryptor = None
# Public names of this module; these are what a star-import exposes.
__all__ = [
"ENC_MARKER",
"SOPS_FILE_PATTERNS",
"SopsDecryptor",
"get_decryptor",
"get_real_extension",
"has_encrypted_values",
"is_sops_file",
"reset_decryptor",
]