Source code for kstlib.secure.fs

"""Filesystem guardrails utilities for securing file access.

Example:
    Basic usage with a temporary directory::

        >>> import tempfile
        >>> from kstlib.secure import PathGuardrails, STRICT_POLICY
        >>> with tempfile.TemporaryDirectory() as tmpdir:
        ...     guard = PathGuardrails(tmpdir, policy=STRICT_POLICY)
        ...     guard.root.is_dir()
        True

"""

from __future__ import annotations

import logging
import os
import stat
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Final

from kstlib.config.exceptions import KstlibError
from kstlib.secure.permissions import DirectoryPermissions

log = logging.getLogger(__name__)

__all__ = [
    "RELAXED_POLICY",
    "STRICT_POLICY",
    "GuardPolicy",
    "PathGuardrails",
    "PathSecurityError",
]


[docs] class PathSecurityError(KstlibError, RuntimeError): """Raised when filesystem guardrails detect a security violation."""
[docs] @dataclass(frozen=True, slots=True) class GuardPolicy: """Configuration values defining how guardrails behave. Attributes: name: Human-friendly label used for diagnostics. allow_external: When True, paths outside the root are accepted. auto_create_root: Automatically create the root directory when missing. enforce_permissions: Whether POSIX permissions should be validated. max_permission_octal: Maximum allowed permission mask (defaults to PRIVATE). Example: >>> from kstlib.secure import GuardPolicy >>> policy = GuardPolicy(name="custom", allow_external=False) >>> policy.name 'custom' """ name: str allow_external: bool = False auto_create_root: bool = True enforce_permissions: bool = True max_permission_octal: int = DirectoryPermissions.PRIVATE # 0o700
STRICT_POLICY: Final[GuardPolicy] = GuardPolicy(name="strict") RELAXED_POLICY: Final[GuardPolicy] = GuardPolicy( name="relaxed", allow_external=False, auto_create_root=True, enforce_permissions=False, max_permission_octal=0o777, # No restrictions )
[docs] class PathGuardrails: """Validate and resolve paths relative to a trusted root. Example: >>> import tempfile >>> from kstlib.secure import PathGuardrails, RELAXED_POLICY >>> with tempfile.TemporaryDirectory() as tmpdir: ... guard = PathGuardrails(tmpdir, policy=RELAXED_POLICY) ... guard.policy.name 'relaxed' """
[docs] def __init__(self, root: str | Path, *, policy: GuardPolicy = STRICT_POLICY) -> None: """Initialise guardrails rooted at *root* while enforcing *policy*. Raises: PathSecurityError: If root does not exist or is not a directory. """ self._policy = policy expanded = Path(root).expanduser() if policy.auto_create_root: expanded.mkdir(parents=True, exist_ok=True, mode=0o755) self._root = expanded.resolve() if not self._root.exists(): raise PathSecurityError(f"Guardrail root does not exist: {self._root}") if not self._root.is_dir(): raise PathSecurityError(f"Guardrail root must be a directory: {self._root}") self._harden_permissions(self._root) self._validate_permissions(self._root) log.debug( "PathGuardrails initialized at %s (allow_external=%s, enforce_permissions=%s)", self._root, policy.allow_external, policy.enforce_permissions, )
@property def root(self) -> Path: """Return the resolved guardrail root directory.""" return self._root @property def policy(self) -> GuardPolicy: """Return the policy associated with the guardrails.""" return self._policy
[docs] def resolve_file(self, candidate: str | Path) -> Path: """Resolve *candidate* and ensure it points to an existing file. Raises: PathSecurityError: If path is not a file or is outside root. """ path = self._resolve(candidate) if not path.is_file(): raise PathSecurityError(f"Expected file path but found: {path}") return path
[docs] def resolve_directory(self, candidate: str | Path) -> Path: """Resolve *candidate* and ensure it points to an existing directory. Raises: PathSecurityError: If path is not a directory or is outside root. """ path = self._resolve(candidate) if not path.is_dir(): raise PathSecurityError(f"Expected directory path but found: {path}") return path
[docs] def resolve_path(self, candidate: str | Path) -> Path: """Resolve *candidate* relative to the guardrail root without type checks.""" return self._resolve(candidate, require_exists=False)
[docs] def relax(self, *, allow_external: bool | None = None) -> PathGuardrails: """Return a new guardrail instance with adjusted external allowances.""" new_policy = replace( self._policy, allow_external=self._policy.allow_external if allow_external is None else allow_external, ) return PathGuardrails(self._root, policy=new_policy)
def _resolve(self, candidate: str | Path, *, require_exists: bool = True) -> Path: path = Path(candidate).expanduser() if not path.is_absolute(): path = self._root / path resolved = path.resolve() self._ensure_within_root(resolved) if require_exists and not resolved.exists(): raise PathSecurityError(f"Resolved path does not exist: {resolved}") return resolved def _ensure_within_root(self, path: Path) -> None: if self._policy.allow_external: return if os.name == "nt" and self._root.drive and path.drive.lower() != self._root.drive.lower(): # Log the attempt before raising so SOC / SIEM has visibility # (Trit/Michel decision Q4: no silent drop on security events). log.warning("[SECURITY] Drive escape attempt: candidate=%s (root=%s)", path, self._root) raise PathSecurityError(f"Path is on a different drive: {path}") try: path.relative_to(self._root) except ValueError as exc: log.warning("[SECURITY] Path traversal attempt: candidate=%s (root=%s)", path, self._root) raise PathSecurityError(f"Path escapes guardrail root: {path}") from exc def _validate_permissions(self, directory: Path) -> None: if not self._policy.enforce_permissions: return if os.name != "posix": return # POSIX-only path - tested with real POSIX tests on Linux/macOS mode = directory.stat().st_mode # pragma: no cover - POSIX only if stat.S_IMODE(mode) & ~self._policy.max_permission_octal: # pragma: no cover - POSIX only raise PathSecurityError( # pragma: no cover - POSIX only f"Directory {directory} exceeds allowed permissions {oct(self._policy.max_permission_octal)}" ) def _harden_permissions(self, directory: Path) -> None: if not self._policy.enforce_permissions: return if os.name != "posix": return # POSIX-only path - tested with real POSIX tests on Linux/macOS current_mode = stat.S_IMODE(directory.stat().st_mode) # pragma: no cover - POSIX only allowed_mask = self._policy.max_permission_octal # pragma: no cover - POSIX only if current_mode & ~allowed_mask == 0: # pragma: no cover - POSIX only return # pragma: no cover - POSIX only desired_mode = current_mode & allowed_mask # pragma: no cover - POSIX only try: # pragma: no cover - POSIX only directory.chmod(desired_mode) # pragma: no cover - POSIX only except PermissionError as exc: # pragma: no cover - POSIX only, env-specific raise PathSecurityError( f"Unable to adjust permissions for {directory}; requires <= {oct(allowed_mask)}" ) from exc