Source code for kstlib.pipeline.runner

"""Pipeline runner for sequential step execution.

Provides the ``PipelineRunner`` class that executes a sequence of steps
with support for conditional execution, error policies, dry-run mode,
and config-driven pipeline definitions.
"""

from __future__ import annotations

import logging
import time
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

from kstlib.pipeline.exceptions import (
    PipelineAbortedError,
    PipelineConfigError,
)
from kstlib.pipeline.models import (
    ErrorPolicy,
    PipelineConfig,
    PipelineResult,
    StepCondition,
    StepConfig,
    StepResult,
    StepStatus,
    StepType,
)
from kstlib.pipeline.steps.callable import CallableStep
from kstlib.pipeline.steps.python import PythonStep
from kstlib.pipeline.steps.shell import ShellStep

if TYPE_CHECKING:
    from kstlib.pipeline.base import AbstractStep

logger = logging.getLogger(__name__)


def _build_executors(config: PipelineConfig) -> dict[StepType, AbstractStep]:
    """Build the step-type to executor mapping for a given pipeline config.

    CallableStep is constructed with the pipeline's optional
    ``allowed_callable_modules`` whitelist so the same runner-wide
    security policy applies to every callable step.
    """
    return {
        StepType.SHELL: ShellStep(),
        StepType.PYTHON: PythonStep(),
        StepType.CALLABLE: CallableStep(allowed_modules=config.allowed_callable_modules),
    }


[docs] class PipelineRunner: """Execute a pipeline of sequential steps. Supports conditional execution (``when``), error policies (``fail_fast`` / ``continue``), timeout cascading, and dry-run mode. Args: config: Pipeline configuration. Examples: Build a pipeline programmatically: >>> from kstlib.pipeline.models import ( ... PipelineConfig, StepConfig, StepType, ... ) >>> config = PipelineConfig( ... name="demo", ... steps=( ... StepConfig(name="greet", type=StepType.SHELL, command="echo hello"), ... ), ... ) >>> runner = PipelineRunner(config) >>> result = runner.run() # doctest: +SKIP Load from ``kstlib.conf.yml``: >>> runner = PipelineRunner.from_config("morning") # doctest: +SKIP >>> result = runner.run() # doctest: +SKIP """
[docs] def __init__(self, config: PipelineConfig) -> None: """Initialize PipelineRunner. Args: config: Pipeline configuration with steps and policies. """ self._config = config self._executors = _build_executors(config)
@property def config(self) -> PipelineConfig: """Return the pipeline configuration.""" return self._config
[docs] @classmethod def from_config( cls, name: str, **overrides: Any, ) -> PipelineRunner: """Create a PipelineRunner from ``kstlib.conf.yml``. Loads the pipeline definition from ``pipeline.pipelines.<name>`` in the global configuration. Args: name: Pipeline name as defined in config. **overrides: Override pipeline-level settings (e.g. ``on_error``). Returns: Configured PipelineRunner instance. Raises: PipelineConfigError: If the pipeline is not found or invalid. Examples: >>> runner = PipelineRunner.from_config("morning") # doctest: +SKIP """ config_data = _load_pipeline_config(name) # Apply overrides if overrides: config_data = {**config_data, **overrides} pipeline_config = _parse_pipeline_config(name, config_data) return cls(pipeline_config)
[docs] def run(self, *, dry_run: bool = False) -> PipelineResult: """Execute the pipeline. Runs each step sequentially, respecting conditions and error policies. Args: dry_run: If True, simulate execution without side effects. Returns: PipelineResult with all step results and aggregate status. Raises: PipelineAbortedError: If a step fails with ``fail_fast`` policy. """ pipeline_result = PipelineResult(name=self._config.name) has_failure = False start = time.monotonic() logger.info( "Pipeline '%s' started (%d steps, on_error=%s%s)", self._config.name, len(self._config.steps), self._config.on_error.value, ", dry_run=True" if dry_run else "", ) for step_config in self._config.steps: # Check condition if not self._should_execute(step_config, has_failure): # Per-step skip is verbose on long pipelines (50+ steps with # conditional execution); demote from INFO to DEBUG. logger.debug( "Step '%s' skipped (when=%s, has_failure=%s)", step_config.name, step_config.when.value, has_failure, ) pipeline_result.results.append(StepResult(name=step_config.name, status=StepStatus.SKIPPED)) continue # Resolve timeout cascade: step timeout > pipeline default effective_config = step_config if step_config.timeout is None: # Apply pipeline default timeout via a new StepConfig effective_config = _with_timeout(step_config, self._config.default_timeout) # Execute step executor = self._executors[step_config.type] result = executor.execute(effective_config, dry_run=dry_run) pipeline_result.results.append(result) logger.info( "Step '%s' -> %s (%.3fs)", result.name, result.status.value, result.duration, ) # Handle failure if result.status in (StepStatus.FAILED, StepStatus.TIMEOUT): has_failure = True if self._config.on_error == ErrorPolicy.FAIL_FAST: # Execute remaining on_failure steps before aborting self._execute_on_failure_steps(step_config, pipeline_result, dry_run) pipeline_result.duration = time.monotonic() - start raise PipelineAbortedError( result.name, result.error or result.status.value, ) pipeline_result.duration = time.monotonic() - start # Pipeline completion is the canonical "operation tool achieved a # success" event : promote to SUCCESS when all steps passed (kstlib # = lib + tool, SUCCESS is reserved for terminal user-facing wins). # On failure, fall back to INFO so the line still appears at the # default level and operators see the duration. if pipeline_result.success: from kstlib.logging import SUCCESS_LEVEL logger.log( SUCCESS_LEVEL, "Pipeline '%s' completed in %.3fs", self._config.name, pipeline_result.duration, ) else: logger.info( "Pipeline '%s' completed in %.3fs (success=False)", self._config.name, pipeline_result.duration, ) return pipeline_result
def _should_execute( self, step: StepConfig, has_failure: bool, ) -> bool: """Determine if a step should execute based on its condition. Args: step: Step configuration with ``when`` condition. has_failure: Whether any previous step has failed. Returns: True if the step should execute. """ if step.when == StepCondition.ON_SUCCESS: return not has_failure if step.when == StepCondition.ON_FAILURE: return has_failure # StepCondition.ALWAYS (default) return True def _execute_on_failure_steps( self, failed_step: StepConfig, pipeline_result: PipelineResult, dry_run: bool, ) -> None: """Execute remaining steps that have ``when: on_failure``. Called during fail_fast abort to allow cleanup steps to run. Args: failed_step: The step that caused the abort. pipeline_result: Pipeline result to append step results to. dry_run: Whether to simulate execution. """ found = False for step_config in self._config.steps: if step_config.name == failed_step.name: found = True continue if not found: continue if step_config.when == StepCondition.ON_FAILURE: effective_config = step_config if step_config.timeout is None: effective_config = _with_timeout(step_config, self._config.default_timeout) executor = self._executors[step_config.type] result = executor.execute(effective_config, dry_run=dry_run) pipeline_result.results.append(result) logger.info( "Cleanup step '%s' -> %s (%.3fs)", result.name, result.status.value, result.duration, ) else: pipeline_result.results.append(StepResult(name=step_config.name, status=StepStatus.SKIPPED))
# ============================================================================ # Config helpers # ============================================================================ def _load_pipeline_config(name: str) -> dict[str, Any]: """Load a pipeline definition from global config. Args: name: Pipeline name. Returns: Raw config dict for the pipeline. Raises: PipelineConfigError: If the pipeline is not found. """ # Lazy imports to avoid circular dependencies try: from kstlib.config import get_config # pylint: disable=import-outside-toplevel except ImportError as exc: raise PipelineConfigError("kstlib.config is required for config-driven pipelines") from exc raw_config: Mapping[str, Any] = get_config() pipeline_section: Mapping[str, Any] = raw_config.get("pipeline", {}) pipelines: Mapping[str, Any] = pipeline_section.get("pipelines", {}) if name not in pipelines: available = ", ".join(sorted(pipelines.keys())) or "(none)" raise PipelineConfigError(f"Pipeline '{name}' not found in config. Available: {available}") raw = pipelines[name] if not isinstance(raw, Mapping): raise PipelineConfigError(f"Pipeline '{name}' must be a mapping, got {type(raw).__name__}") return dict(raw) def _parse_pipeline_config( name: str, data: dict[str, Any], ) -> PipelineConfig: """Parse raw config data into a PipelineConfig. Args: name: Pipeline name. data: Raw config dict. Returns: Validated PipelineConfig. Raises: PipelineConfigError: If config is invalid. """ raw_steps = data.get("steps", []) if not isinstance(raw_steps, list): raise PipelineConfigError(f"Pipeline '{name}': 'steps' must be a list") steps: list[StepConfig] = [] for i, raw_step in enumerate(raw_steps): if not isinstance(raw_step, Mapping): raise PipelineConfigError(f"Pipeline '{name}': step {i} must be a mapping") steps.append(_parse_step_config(name, i, dict(raw_step))) on_error_str = data.get("on_error", "fail_fast") try: on_error = ErrorPolicy(on_error_str) except ValueError: raise PipelineConfigError(f"Pipeline '{name}': invalid on_error {on_error_str!r}") from None default_timeout = data.get("default_timeout", 300.0) try: default_timeout = float(default_timeout) except (TypeError, ValueError): raise PipelineConfigError(f"Pipeline '{name}': invalid default_timeout {default_timeout!r}") from None raw_allowed = data.get("allowed_callable_modules") allowed_callable_modules: tuple[str, ...] | None if raw_allowed is None: allowed_callable_modules = None elif isinstance(raw_allowed, (list, tuple)) and all(isinstance(m, str) for m in raw_allowed): allowed_callable_modules = tuple(raw_allowed) else: raise PipelineConfigError(f"Pipeline '{name}': 'allowed_callable_modules' must be a list of strings") return PipelineConfig( name=name, steps=tuple(steps), on_error=on_error, default_timeout=default_timeout, allowed_callable_modules=allowed_callable_modules, ) def _parse_step_config( pipeline_name: str, index: int, data: dict[str, Any], ) -> StepConfig: """Parse raw step config data into a StepConfig. Args: pipeline_name: Parent pipeline name (for error messages). index: Step index (for error messages). data: Raw step config dict. Returns: Validated StepConfig. Raises: PipelineConfigError: If step config is invalid. """ step_name = data.get("name") if not step_name: raise PipelineConfigError(f"Pipeline '{pipeline_name}': step {index} missing 'name'") raw_type = data.get("type", "shell") try: step_type = StepType(raw_type) except ValueError: raise PipelineConfigError(f"Pipeline '{pipeline_name}': step '{step_name}' invalid type {raw_type!r}") from None raw_when = data.get("when", "always") try: when = StepCondition(raw_when) except ValueError: raise PipelineConfigError(f"Pipeline '{pipeline_name}': step '{step_name}' invalid when {raw_when!r}") from None # Parse args as tuple raw_args = data.get("args", []) if isinstance(raw_args, str): raw_args = [raw_args] args = tuple(str(a) for a in raw_args) # Parse timeout timeout = data.get("timeout") if timeout is not None: try: timeout = float(timeout) except (TypeError, ValueError): raise PipelineConfigError( f"Pipeline '{pipeline_name}': step '{step_name}' invalid timeout {timeout!r}" ) from None return StepConfig( name=step_name, type=step_type, command=data.get("command"), module=data.get("module"), callable=data.get("callable"), args=args, env=data.get("env", {}), working_dir=data.get("working_dir"), timeout=timeout, when=when, ) def _with_timeout(config: StepConfig, timeout: float) -> StepConfig: """Create a copy of a StepConfig with a different timeout. Uses ``object.__setattr__`` since StepConfig is frozen. Args: config: Original step config. timeout: New timeout value. Returns: New StepConfig with the specified timeout. """ new = StepConfig( name=config.name, type=config.type, command=config.command, module=config.module, callable=config.callable, args=config.args, env=config.env, working_dir=config.working_dir, timeout=timeout, when=config.when, ) return new __all__ = [ "PipelineRunner", ]