Source code for kstlib.pipeline.steps.callable

"""Callable step executor for pipeline.

Imports and calls a Python function directly using ``importlib``.
The callable target format is ``module.path:function_name``.

Note:
    Callable steps do not support timeout natively. If timeout control
    is needed, use a shell or python step instead.

"""

from __future__ import annotations

import importlib
import logging
import time

from kstlib.pipeline.exceptions import PipelineConfigError, StepImportError
from kstlib.pipeline.models import StepConfig, StepResult, StepStatus

logger = logging.getLogger(__name__)

#: Modules that are ALWAYS rejected by CallableStep regardless of the
#: caller-supplied ``allowed_modules`` whitelist. These modules provide
#: unrestricted OS or interpreter access and must never be importable
#: via a YAML-driven pipeline config.
#:
#: Entries are compared against the *root* package of a dotted module path
#: (``os.path`` is rejected because ``os`` is listed). This set is a safety
#: net, not a sandbox: it cannot enumerate every dangerous module, so an
#: explicit ``allowed_modules`` whitelist remains the primary control for
#: configs coming from untrusted sources.
DANGEROUS_MODULES: frozenset[str] = frozenset(
    {
        # Process / OS access, including the C backends of the
        # high-level modules so they cannot be reached directly.
        "os",
        "posix",
        "nt",
        "subprocess",
        "_posixsubprocess",
        "_winapi",
        "pty",
        "shutil",
        "ctypes",
        # Interpreter internals and import machinery.
        "sys",
        "builtins",
        "importlib",
        "__main__",
        # Modules whose module-level functions execute code supplied as a
        # plain string or file path, i.e. exactly what a YAML ``args`` list
        # can provide (e.g. ``runpy.run_path``, ``timeit.timeit``,
        # ``profile.run``, ``pdb.run``, ``doctest.testfile``,
        # ``pydoc.pipepager`` which spawns a shell command).
        "code",
        "runpy",
        "timeit",
        "pdb",
        "profile",
        "cProfile",
        "doctest",
        "pydoc",
        # Deserializers that can execute code while loading data.
        "pickle",
        "_pickle",
        "shelve",
        "marshal",
        # ``compile`` is a builtin function, not a stdlib module; the entry
        # is kept so any module or package named ``compile`` stays rejected.
        "compile",
    }
)


class CallableStep:
    """Execute a Python callable as a pipeline step.

    Parses the ``callable`` target as ``module.path:function_name``, imports
    the module, and calls the function. The return value is captured in
    ``StepResult.return_value``.

    Security enforcement (defense-in-depth):

    - The root module is always checked against :data:`DANGEROUS_MODULES`.
    - When ``allowed_modules`` is provided, the module must match the
      whitelist (prefix match, identical to transform.chain pattern).
    - When ``allowed_modules`` is ``None``, only the blacklist applies
      (backward compatible default).
    - After resolution, the module the callable was actually defined in
      (its ``__module__``) is also checked against the blacklist, so a
      permitted module cannot re-export a blacklisted function
      (e.g. ``from subprocess import run``).

    Args:
        allowed_modules: Optional whitelist of module paths permitted as
            callable targets. When ``None``, no whitelist is enforced
            (blacklist still applies).

    Examples:
        >>> from kstlib.pipeline.models import StepConfig, StepType
        >>> step = CallableStep()
        >>> config = StepConfig(
        ...     name="process",
        ...     type=StepType.CALLABLE,
        ...     callable="json:dumps",
        ... )
        >>> result = step.execute(config)  # doctest: +SKIP
    """

    def __init__(self, allowed_modules: tuple[str, ...] | None = None) -> None:
        """Initialize CallableStep with an optional allow-list of modules."""
        self._allowed_modules = allowed_modules

    def execute(
        self,
        config: StepConfig,
        *,
        dry_run: bool = False,
    ) -> StepResult:
        """Execute a Python callable.

        Args:
            config: Step configuration with callable target, args, etc.
            dry_run: If True, log the callable without executing it. In this
                mode the target is neither validated nor imported, and the
                security gates are not evaluated.

        Returns:
            StepResult with return_value, duration, and status. A malformed
            target (missing module or function part, or a relative module
            path such as ``.mod:func``) yields a ``FAILED`` result, as does
            any exception raised by the callable itself (its text is stored
            in ``error``).

        Raises:
            PipelineConfigError: If the target module, or the module the
                resolved callable is defined in, is blacklisted, or if the
                target module is not present in the configured whitelist.
            StepImportError: If the module cannot be imported or does not
                have the requested attribute.
        """
        target = config.callable or ""
        logger.debug("CallableStep '%s': target=%r", config.name, target)

        if dry_run:
            logger.info(
                "[DRY RUN] CallableStep '%s': %s",
                config.name,
                target,
            )
            return StepResult(
                name=config.name,
                status=StepStatus.SKIPPED,
                stdout=f"[dry-run] would call: {target}",
            )

        # Parse target. rpartition splits on the LAST ':' so the function
        # name never contains a colon.
        module_path, _, func_name = target.rpartition(":")
        if not module_path or not func_name:
            return StepResult(
                name=config.name,
                status=StepStatus.FAILED,
                error=f"Invalid callable target: {target!r}",
            )

        # Security gates (before any import)
        root_module = module_path.split(".")[0]
        if root_module in DANGEROUS_MODULES:
            raise PipelineConfigError(
                f"Step '{config.name}': module '{module_path}' is in DANGEROUS_MODULES "
                f"blacklist and cannot be used as a callable target"
            )
        if self._allowed_modules is not None and not any(
            module_path == allowed or module_path.startswith(f"{allowed}.")
            for allowed in self._allowed_modules
        ):
            raise PipelineConfigError(
                f"Step '{config.name}': module '{module_path}' is not in allowed_callable_modules"
            )

        # importlib.import_module() raises TypeError (not ImportError) for a
        # relative name such as ".mod" when no anchor package is given.
        # Report it as a malformed target instead of leaking that exception.
        if module_path.startswith("."):
            return StepResult(
                name=config.name,
                status=StepStatus.FAILED,
                error=f"Invalid callable target: {target!r}",
            )

        # Import and resolve function
        try:
            module = importlib.import_module(module_path)
            func = getattr(module, func_name)
        except (ImportError, AttributeError) as exc:
            logger.exception(
                "CallableStep '%s' import error",
                config.name,
            )
            raise StepImportError(config.name, target) from exc

        # Defense-in-depth: the gates above only inspect the import path.
        # A permitted module may re-export a function from a blacklisted
        # one (e.g. ``from subprocess import run``), so also check where the
        # resolved object was defined. Objects without a string __module__
        # (some builtin bound methods report None) are not checked here.
        defined_in = getattr(func, "__module__", None)
        if isinstance(defined_in, str) and defined_in.split(".")[0] in DANGEROUS_MODULES:
            raise PipelineConfigError(
                f"Step '{config.name}': callable '{target}' is defined in module "
                f"'{defined_in}', which is in DANGEROUS_MODULES blacklist"
            )

        # Call the function
        start = time.monotonic()
        try:
            return_value = func(*config.args)
            duration = time.monotonic() - start
            logger.debug(
                "CallableStep '%s' completed in %.3fs",
                config.name,
                duration,
            )
            return StepResult(
                name=config.name,
                status=StepStatus.SUCCESS,
                return_value=return_value,
                duration=duration,
            )
        except Exception as exc:
            # Errors raised by the user callable are reported as a FAILED
            # step rather than propagated to the pipeline runner.
            duration = time.monotonic() - start
            logger.exception(
                "CallableStep '%s' execution error",
                config.name,
            )
            return StepResult(
                name=config.name,
                status=StepStatus.FAILED,
                duration=duration,
                error=str(exc),
            )
#: Public names re-exported by ``from kstlib.pipeline.steps.callable import *``.
__all__ = ["DANGEROUS_MODULES", "CallableStep"]