Pipeline

Declarative, config-driven pipeline execution for sequential workflows.

kstlib.pipeline provides a PipelineRunner that executes a sequence of shell commands, Python modules, and callable functions with support for conditional execution, error policies, timeout cascade, and dry-run mode.

Tip

Pair this reference with Pipeline for the feature guide.

Quick overview

  • PipelineRunner is the main facade for pipeline execution

  • StepConfig and PipelineConfig define the pipeline structure

  • StepResult and PipelineResult capture execution results

  • ShellStep, PythonStep, CallableStep are the step executors

  • Configuration follows standard priority: constructor args > kstlib.conf.yml > defaults

Configuration cascade

The module consults the loaded config for pipeline definitions:

pipeline:
  default_timeout: 300
  on_error: fail_fast
  pipelines:
    morning-monitoring:
      steps:
        - name: check_services
          type: shell
          command: systemctl status nginx
          timeout: 30
        - name: send_report
          type: callable
          callable: my.alerts:send_summary
          when: always

Load a pipeline from config:

from kstlib.pipeline import PipelineRunner

runner = PipelineRunner.from_config("morning-monitoring")
result = runner.run()

Usage patterns

Programmatic pipeline

from kstlib.pipeline import (
    PipelineRunner, PipelineConfig, StepConfig, StepType, ErrorPolicy,
)

config = PipelineConfig(
    name="deploy",
    steps=(
        StepConfig(name="build", type=StepType.SHELL, command="make build"),
        StepConfig(name="test", type=StepType.PYTHON, module="pytest"),
        StepConfig(name="notify", type=StepType.CALLABLE, callable="my.alerts:send_ok"),
    ),
    on_error=ErrorPolicy.FAIL_FAST,
    default_timeout=300,
)
runner = PipelineRunner(config)
result = runner.run()

Dry-run simulation

result = runner.run(dry_run=True)
for step in result.results:
    print(f"{step.name}: {step.stdout}")

Module reference

Declarative pipeline execution for kstlib.

This module provides config-driven pipeline execution for sequential workflows combining shell commands, Python modules, and callable functions.

Pipelines can be defined programmatically or declared in kstlib.conf.yml and executed with error handling, conditional steps, and dry-run support.

Examples

Programmatic pipeline:

>>> from kstlib.pipeline import PipelineRunner, PipelineConfig, StepConfig, StepType
>>> config = PipelineConfig(
...     name="demo",
...     steps=(
...         StepConfig(name="greet", type=StepType.SHELL, command="echo hello"),
...     ),
... )
>>> runner = PipelineRunner(config)
>>> result = runner.run()  

Config-driven pipeline:

>>> runner = PipelineRunner.from_config("morning")  
>>> result = runner.run()  
class kstlib.pipeline.AbstractStep(*args, **kwargs)[source]

Bases: Protocol

Protocol defining the interface for pipeline step executors.

All step implementations (ShellStep, PythonStep, CallableStep) must implement this protocol to ensure consistent behavior across step types.

Examples

>>> def run_step(step: AbstractStep, config: StepConfig) -> StepResult:
...     return step.execute(config)
execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a pipeline step.

Parameters:
  • config (StepConfig) – Step configuration with command, env, timeout, etc.

  • dry_run (bool) – If True, simulate execution without side effects.

Returns:

StepResult with status, stdout, stderr, duration, etc.

Return type:

StepResult

__init__(self, *args, **kwargs)
class kstlib.pipeline.CallableStep(allowed_modules=None)[source]

Bases: object

Execute a Python callable as a pipeline step.

Parses the callable target as module.path:function_name, imports the module, and calls the function. The return value is captured in StepResult.return_value.

Security enforcement (defense-in-depth):

  • The root module is always checked against DANGEROUS_MODULES.

  • When allowed_modules is provided, the module must match the whitelist (prefix match, identical to transform.chain pattern).

  • When allowed_modules is None, only the blacklist applies (backward compatible default).

Parameters:

allowed_modules (tuple[str, ...] | None) – Optional whitelist of module paths permitted as callable targets. When None, no whitelist is enforced (blacklist still applies).

Examples

>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = CallableStep()
>>> config = StepConfig(
...     name="process",
...     type=StepType.CALLABLE,
...     callable="json:dumps",
... )
>>> result = step.execute(config)  
__init__(self, allowed_modules: 'tuple[str, ...] | None' = None) 'None' -> None[source]

Initialize CallableStep with an optional allow-list of modules.

execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a Python callable.

Parameters:
  • config (StepConfig) – Step configuration with callable target, args, etc.

  • dry_run (bool) – If True, log the callable without executing it.

Returns:

StepResult with return_value, duration, and status.

Raises:

PipelineConfigError – If the target module is blacklisted or not present in the configured whitelist.

Return type:

StepResult

class kstlib.pipeline.ErrorPolicy(value)[source]

Bases: str, Enum

Error handling policy for a pipeline.

FAIL_FAST

Abort pipeline on first step failure.

CONTINUE

Continue executing remaining steps after a failure.

FAIL_FAST = 'fail_fast'
CONTINUE = 'continue'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

exception kstlib.pipeline.PipelineAbortedError(step_name, reason)[source]

Bases: PipelineError

Pipeline execution was aborted due to fail_fast policy.

Raised when a step fails and the error policy is fail_fast, causing the remaining steps to be skipped.

step_name

Name of the step that caused the abort.

reason

Description of why the step failed.

__init__(self, step_name: 'str', reason: 'str') 'None' -> None[source]

Initialize PipelineAbortedError.

Parameters:
  • step_name (str) – Name of the step that caused the abort.

  • reason (str) – Description of why the step failed.

class kstlib.pipeline.PipelineConfig(name, steps, on_error=ErrorPolicy.FAIL_FAST, default_timeout=300.0, allowed_callable_modules=None)[source]

Bases: object

Configuration for a complete pipeline.

name

Pipeline name.

Type:

str

steps

Ordered tuple of step configurations.

Type:

tuple[kstlib.pipeline.models.StepConfig, …]

on_error

Error handling policy.

Type:

kstlib.pipeline.models.ErrorPolicy

default_timeout

Default timeout for steps without explicit timeout.

Type:

float

Examples

>>> config = PipelineConfig(
...     name="deploy",
...     steps=(
...         StepConfig(name="build", type=StepType.SHELL, command="make build"),
...         StepConfig(name="test", type=StepType.SHELL, command="make test"),
...     ),
... )
>>> len(config.steps)
2
name: str
steps: tuple[StepConfig, ...]
on_error: ErrorPolicy
default_timeout: float
allowed_callable_modules: tuple[str, ...] | None
__post_init__(self) 'None' -> None[source]

Validate pipeline configuration values.

Raises:

PipelineConfigError – If configuration is invalid.

__init__(self, name: 'str', steps: 'tuple[StepConfig, ...]', on_error: 'ErrorPolicy' = <ErrorPolicy.FAIL_FAST: 'fail_fast'>, default_timeout: 'float' = 300.0, allowed_callable_modules: 'tuple[str, ...] | None' = None) None -> None
exception kstlib.pipeline.PipelineConfigError[source]

Bases: PipelineError, ValueError

Pipeline configuration is invalid.

Raised when the pipeline or step configuration contains invalid values, missing required fields, or constraint violations.

exception kstlib.pipeline.PipelineError[source]

Bases: KstlibError

Base exception for all pipeline module errors.

All pipeline-specific exceptions inherit from this class, allowing for easy catching of any pipeline error.

class kstlib.pipeline.PipelineResult(name, results=<factory>, duration=0.0)[source]

Bases: object

Aggregate result of a pipeline execution.

name

Pipeline name.

Type:

str

results

Ordered list of step results.

Type:

list[kstlib.pipeline.models.StepResult]

duration

Total pipeline execution duration in seconds.

Type:

float

Examples

>>> result = PipelineResult(name="deploy")
>>> result.success
True
name: str
results: list[StepResult]
duration: float
property success: bool

Whether all executed steps succeeded.

Returns:

True if no step has FAILED or TIMEOUT status.

property failed_steps: list[StepResult]

Steps that failed or timed out.

Returns:

List of StepResult with FAILED or TIMEOUT status.

property skipped_steps: list[StepResult]

Steps that were skipped.

Returns:

List of StepResult with SKIPPED status.

__init__(self, name: 'str', results: 'list[StepResult]' = <factory>, duration: 'float' = 0.0) None -> None
class kstlib.pipeline.PipelineRunner(config)[source]

Bases: object

Execute a pipeline of sequential steps.

Supports conditional execution (when), error policies (fail_fast / continue), timeout cascading, and dry-run mode.

Parameters:

config (PipelineConfig) – Pipeline configuration.

Examples

Build a pipeline programmatically:

>>> from kstlib.pipeline.models import (
...     PipelineConfig, StepConfig, StepType,
... )
>>> config = PipelineConfig(
...     name="demo",
...     steps=(
...         StepConfig(name="greet", type=StepType.SHELL, command="echo hello"),
...     ),
... )
>>> runner = PipelineRunner(config)
>>> result = runner.run()  

Load from kstlib.conf.yml:

>>> runner = PipelineRunner.from_config("morning")  
>>> result = runner.run()  
__init__(self, config: 'PipelineConfig') 'None' -> None[source]

Initialize PipelineRunner.

Parameters:

config (PipelineConfig) – Pipeline configuration with steps and policies.

property config: PipelineConfig

Return the pipeline configuration.

classmethod from_config(name: 'str', **overrides: 'Any') 'PipelineRunner' -> PipelineRunner[source]

Create a PipelineRunner from kstlib.conf.yml.

Loads the pipeline definition from pipeline.pipelines.<name> in the global configuration.

Parameters:
  • name (str) – Pipeline name as defined in config.

  • **overrides (Any) – Override pipeline-level settings (e.g. on_error).

Returns:

Configured PipelineRunner instance.

Raises:

PipelineConfigError – If the pipeline is not found or invalid.

Return type:

PipelineRunner

Examples

>>> runner = PipelineRunner.from_config("morning")  
run(self, *, dry_run: 'bool' = False) 'PipelineResult' -> PipelineResult[source]

Execute the pipeline.

Runs each step sequentially, respecting conditions and error policies.

Parameters:

dry_run (bool) – If True, simulate execution without side effects.

Returns:

PipelineResult with all step results and aggregate status.

Raises:

PipelineAbortedError – If a step fails with fail_fast policy.

Return type:

PipelineResult

class kstlib.pipeline.PythonStep[source]

Bases: object

Execute a Python module as a pipeline step.

Uses subprocess.run with [sys.executable, "-m", module, *args] to run a Python module in a subprocess. Does not use shell=True.

Examples

>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = PythonStep()
>>> config = StepConfig(
...     name="lint",
...     type=StepType.PYTHON,
...     module="ruff",
...     args=("check", "src/"),
... )
>>> result = step.execute(config)  
execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a Python module via subprocess.

Parameters:
  • config (StepConfig) – Step configuration with module, args, env, timeout, etc.

  • dry_run (bool) – If True, log the command without executing it.

Returns:

StepResult with captured stdout, stderr, return code, and duration.

Return type:

StepResult

class kstlib.pipeline.ShellStep[source]

Bases: object

Execute a shell command as a pipeline step.

Uses subprocess.run with shell=True to execute the command string. Supports environment variable injection, working directory, and timeout.

Examples

>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = ShellStep()
>>> config = StepConfig(
...     name="greet",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> result = step.execute(config)  
>>> result.status  
<StepStatus.SUCCESS: 'success'>
execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a shell command.

Parameters:
  • config (StepConfig) – Step configuration with command, env, timeout, etc.

  • dry_run (bool) – If True, log the command without executing it.

Returns:

StepResult with captured stdout, stderr, return code, and duration.

Return type:

StepResult

class kstlib.pipeline.StepCondition(value)[source]

Bases: str, Enum

Condition for executing a pipeline step.

ALWAYS

Execute the step regardless of previous results.

ON_SUCCESS

Execute only if all previous steps succeeded.

ON_FAILURE

Execute only if at least one previous step failed.

ALWAYS = 'always'
ON_SUCCESS = 'on_success'
ON_FAILURE = 'on_failure'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

class kstlib.pipeline.StepConfig(name, type, command=None, module=None, callable=None, args=(), env=<factory>, working_dir=None, timeout=None, when=StepCondition.ALWAYS)[source]

Bases: object

Configuration for a single pipeline step.

name

Unique step name within the pipeline.

Type:

str

type

Execution mode (shell, python, callable).

Type:

kstlib.pipeline.models.StepType

command

Shell command string (required for shell type).

Type:

str | None

module

Python module to execute (required for python type).

Type:

str | None

callable

Import target module.path:function (required for callable type).

Type:

str | None

args

Arguments passed to the step.

Type:

tuple[str, …]

env

Environment variables to set for the step.

Type:

dict[str, str]

working_dir

Working directory for the step.

Type:

str | None

timeout

Step timeout in seconds (None uses pipeline default).

Type:

float | None

when

Condition for executing this step.

Type:

kstlib.pipeline.models.StepCondition

Examples

>>> config = StepConfig(
...     name="build",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> config.name
'build'
>>> config = StepConfig(
...     name="process",
...     type=StepType.PYTHON,
...     module="my.module",
...     args=["--verbose"],
... )
name: str
type: StepType
command: str | None
module: str | None
callable: str | None
args: tuple[str, ...]
env: dict[str, str]
working_dir: str | None
timeout: float | None
when: StepCondition
__post_init__(self) 'None' -> None[source]

Validate step configuration values.

Raises:

PipelineConfigError – If any configuration value is invalid.

__init__(self, name: 'str', type: 'StepType', command: 'str | None' = None, module: 'str | None' = None, callable: 'str | None' = None, args: 'tuple[str, ...]' = (), env: 'dict[str, str]' = <factory>, working_dir: 'str | None' = None, timeout: 'float | None' = None, when: 'StepCondition' = <StepCondition.ALWAYS: 'always'>) None -> None
exception kstlib.pipeline.StepError(step_name, reason)[source]

Bases: PipelineError

A pipeline step failed during execution.

step_name

Name of the step that failed.

reason

Description of the failure.

__init__(self, step_name: 'str', reason: 'str') 'None' -> None[source]

Initialize StepError.

Parameters:
  • step_name (str) – Name of the step that failed.

  • reason (str) – Description of the failure.

exception kstlib.pipeline.StepImportError(step_name, target)[source]

Bases: StepError

Failed to import a callable target for a step.

step_name

Name of the step with the import failure.

target

The import target string that failed.

__init__(self, step_name: 'str', target: 'str') 'None' -> None[source]

Initialize StepImportError.

Parameters:
  • step_name (str) – Name of the step with the import failure.

  • target (str) – The import target string (e.g. “module.path:function”).

class kstlib.pipeline.StepResult(name, status, stdout='', stderr='', return_code=None, return_value=None, duration=0.0, error=None)[source]

Bases: object

Result of a single pipeline step execution.

name

Step name.

Type:

str

status

Execution result status.

Type:

kstlib.pipeline.models.StepStatus

stdout

Standard output captured from the step.

Type:

str

stderr

Standard error captured from the step.

Type:

str

return_code

Process exit code (shell/python steps).

Type:

int | None

return_value

Return value (callable steps).

Type:

object

duration

Execution duration in seconds.

Type:

float

error

Error message if the step failed.

Type:

str | None

Examples

>>> result = StepResult(name="build", status=StepStatus.SUCCESS)
>>> result.status
<StepStatus.SUCCESS: 'success'>
name: str
status: StepStatus
stdout: str
stderr: str
return_code: int | None
return_value: object
duration: float
error: str | None
__init__(self, name: 'str', status: 'StepStatus', stdout: 'str' = '', stderr: 'str' = '', return_code: 'int | None' = None, return_value: 'object' = None, duration: 'float' = 0.0, error: 'str | None' = None) None -> None
class kstlib.pipeline.StepStatus(value)[source]

Bases: str, Enum

Result status of a pipeline step.

SUCCESS

Step completed successfully (exit code 0).

FAILED

Step failed (non-zero exit code or exception).

SKIPPED

Step was skipped due to condition or abort.

TIMEOUT

Step exceeded its timeout limit.

SUCCESS = 'success'
FAILED = 'failed'
SKIPPED = 'skipped'
TIMEOUT = 'timeout'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

exception kstlib.pipeline.StepTimeoutError(step_name, timeout)[source]

Bases: StepError

A pipeline step exceeded its timeout.

step_name

Name of the step that timed out.

timeout

The timeout value in seconds.

__init__(self, step_name: 'str', timeout: 'float') 'None' -> None[source]

Initialize StepTimeoutError.

Parameters:
  • step_name (str) – Name of the step that timed out.

  • timeout (float) – The timeout value in seconds.

class kstlib.pipeline.StepType(value)[source]

Bases: str, Enum

Execution mode for a pipeline step.

SHELL

Execute a shell command via subprocess.run(shell=True).

PYTHON

Execute a Python module via python -m module.

CALLABLE

Import and call a Python function directly.

SHELL = 'shell'
PYTHON = 'python'
CALLABLE = 'callable'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

Runner

Pipeline runner for sequential step execution.

Provides the PipelineRunner class that executes a sequence of steps with support for conditional execution, error policies, dry-run mode, and config-driven pipeline definitions.

class kstlib.pipeline.runner.PipelineRunner(config)[source]

Bases: object

Execute a pipeline of sequential steps.

Supports conditional execution (when), error policies (fail_fast / continue), timeout cascading, and dry-run mode.

Parameters:

config (PipelineConfig) – Pipeline configuration.

Examples

Build a pipeline programmatically:

>>> from kstlib.pipeline.models import (
...     PipelineConfig, StepConfig, StepType,
... )
>>> config = PipelineConfig(
...     name="demo",
...     steps=(
...         StepConfig(name="greet", type=StepType.SHELL, command="echo hello"),
...     ),
... )
>>> runner = PipelineRunner(config)
>>> result = runner.run()  

Load from kstlib.conf.yml:

>>> runner = PipelineRunner.from_config("morning")  
>>> result = runner.run()  
__init__(self, config: 'PipelineConfig') 'None' -> None[source]

Initialize PipelineRunner.

Parameters:

config (PipelineConfig) – Pipeline configuration with steps and policies.

property config: PipelineConfig

Return the pipeline configuration.

classmethod from_config(name: 'str', **overrides: 'Any') 'PipelineRunner' -> PipelineRunner[source]

Create a PipelineRunner from kstlib.conf.yml.

Loads the pipeline definition from pipeline.pipelines.<name> in the global configuration.

Parameters:
  • name (str) – Pipeline name as defined in config.

  • **overrides (Any) – Override pipeline-level settings (e.g. on_error).

Returns:

Configured PipelineRunner instance.

Raises:

PipelineConfigError – If the pipeline is not found or invalid.

Return type:

PipelineRunner

Examples

>>> runner = PipelineRunner.from_config("morning")  
run(self, *, dry_run: 'bool' = False) 'PipelineResult' -> PipelineResult[source]

Execute the pipeline.

Runs each step sequentially, respecting conditions and error policies.

Parameters:

dry_run (bool) – If True, simulate execution without side effects.

Returns:

PipelineResult with all step results and aggregate status.

Raises:

PipelineAbortedError – If a step fails with fail_fast policy.

Return type:

PipelineResult

Models

Data models for the kstlib.pipeline module.

This module defines the core data structures used by the pipeline module:

  • StepType: Enum for step execution mode (shell, python, callable)

  • ErrorPolicy: Enum for pipeline error handling (fail_fast, continue)

  • StepCondition: Enum for conditional step execution (always, on_success, on_failure)

  • StepStatus: Enum for step result status (success, failed, skipped, timeout)

  • StepConfig: Frozen configuration for a single pipeline step

  • PipelineConfig: Frozen configuration for an entire pipeline

  • StepResult: Mutable result of a single step execution

  • PipelineResult: Mutable aggregate result of a pipeline execution

class kstlib.pipeline.models.ErrorPolicy(value)[source]

Bases: str, Enum

Error handling policy for a pipeline.

FAIL_FAST

Abort pipeline on first step failure.

CONTINUE

Continue executing remaining steps after a failure.

FAIL_FAST = 'fail_fast'
CONTINUE = 'continue'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

class kstlib.pipeline.models.PipelineConfig(name, steps, on_error=ErrorPolicy.FAIL_FAST, default_timeout=300.0, allowed_callable_modules=None)[source]

Bases: object

Configuration for a complete pipeline.

name

Pipeline name.

Type:

str

steps

Ordered tuple of step configurations.

Type:

tuple[kstlib.pipeline.models.StepConfig, …]

on_error

Error handling policy.

Type:

kstlib.pipeline.models.ErrorPolicy

default_timeout

Default timeout for steps without explicit timeout.

Type:

float

Examples

>>> config = PipelineConfig(
...     name="deploy",
...     steps=(
...         StepConfig(name="build", type=StepType.SHELL, command="make build"),
...         StepConfig(name="test", type=StepType.SHELL, command="make test"),
...     ),
... )
>>> len(config.steps)
2
name: str
steps: tuple[StepConfig, ...]
on_error: ErrorPolicy
default_timeout: float
allowed_callable_modules: tuple[str, ...] | None
__post_init__(self) 'None' -> None[source]

Validate pipeline configuration values.

Raises:

PipelineConfigError – If configuration is invalid.

__init__(self, name: 'str', steps: 'tuple[StepConfig, ...]', on_error: 'ErrorPolicy' = <ErrorPolicy.FAIL_FAST: 'fail_fast'>, default_timeout: 'float' = 300.0, allowed_callable_modules: 'tuple[str, ...] | None' = None) None -> None
class kstlib.pipeline.models.PipelineResult(name, results=<factory>, duration=0.0)[source]

Bases: object

Aggregate result of a pipeline execution.

name

Pipeline name.

Type:

str

results

Ordered list of step results.

Type:

list[kstlib.pipeline.models.StepResult]

duration

Total pipeline execution duration in seconds.

Type:

float

Examples

>>> result = PipelineResult(name="deploy")
>>> result.success
True
name: str
results: list[StepResult]
duration: float
property success: bool

Whether all executed steps succeeded.

Returns:

True if no step has FAILED or TIMEOUT status.

property failed_steps: list[StepResult]

Steps that failed or timed out.

Returns:

List of StepResult with FAILED or TIMEOUT status.

property skipped_steps: list[StepResult]

Steps that were skipped.

Returns:

List of StepResult with SKIPPED status.

__init__(self, name: 'str', results: 'list[StepResult]' = <factory>, duration: 'float' = 0.0) None -> None
class kstlib.pipeline.models.StepCondition(value)[source]

Bases: str, Enum

Condition for executing a pipeline step.

ALWAYS

Execute the step regardless of previous results.

ON_SUCCESS

Execute only if all previous steps succeeded.

ON_FAILURE

Execute only if at least one previous step failed.

ALWAYS = 'always'
ON_SUCCESS = 'on_success'
ON_FAILURE = 'on_failure'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

class kstlib.pipeline.models.StepConfig(name, type, command=None, module=None, callable=None, args=(), env=<factory>, working_dir=None, timeout=None, when=StepCondition.ALWAYS)[source]

Bases: object

Configuration for a single pipeline step.

name

Unique step name within the pipeline.

Type:

str

type

Execution mode (shell, python, callable).

Type:

kstlib.pipeline.models.StepType

command

Shell command string (required for shell type).

Type:

str | None

module

Python module to execute (required for python type).

Type:

str | None

callable

Import target module.path:function (required for callable type).

Type:

str | None

args

Arguments passed to the step.

Type:

tuple[str, …]

env

Environment variables to set for the step.

Type:

dict[str, str]

working_dir

Working directory for the step.

Type:

str | None

timeout

Step timeout in seconds (None uses pipeline default).

Type:

float | None

when

Condition for executing this step.

Type:

kstlib.pipeline.models.StepCondition

Examples

>>> config = StepConfig(
...     name="build",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> config.name
'build'
>>> config = StepConfig(
...     name="process",
...     type=StepType.PYTHON,
...     module="my.module",
...     args=["--verbose"],
... )
name: str
type: StepType
command: str | None
module: str | None
callable: str | None
args: tuple[str, ...]
env: dict[str, str]
working_dir: str | None
timeout: float | None
when: StepCondition
__post_init__(self) 'None' -> None[source]

Validate step configuration values.

Raises:

PipelineConfigError – If any configuration value is invalid.

__init__(self, name: 'str', type: 'StepType', command: 'str | None' = None, module: 'str | None' = None, callable: 'str | None' = None, args: 'tuple[str, ...]' = (), env: 'dict[str, str]' = <factory>, working_dir: 'str | None' = None, timeout: 'float | None' = None, when: 'StepCondition' = <StepCondition.ALWAYS: 'always'>) None -> None
class kstlib.pipeline.models.StepResult(name, status, stdout='', stderr='', return_code=None, return_value=None, duration=0.0, error=None)[source]

Bases: object

Result of a single pipeline step execution.

name

Step name.

Type:

str

status

Execution result status.

Type:

kstlib.pipeline.models.StepStatus

stdout

Standard output captured from the step.

Type:

str

stderr

Standard error captured from the step.

Type:

str

return_code

Process exit code (shell/python steps).

Type:

int | None

return_value

Return value (callable steps).

Type:

object

duration

Execution duration in seconds.

Type:

float

error

Error message if the step failed.

Type:

str | None

Examples

>>> result = StepResult(name="build", status=StepStatus.SUCCESS)
>>> result.status
<StepStatus.SUCCESS: 'success'>
name: str
status: StepStatus
stdout: str
stderr: str
return_code: int | None
return_value: object
duration: float
error: str | None
__init__(self, name: 'str', status: 'StepStatus', stdout: 'str' = '', stderr: 'str' = '', return_code: 'int | None' = None, return_value: 'object' = None, duration: 'float' = 0.0, error: 'str | None' = None) None -> None
class kstlib.pipeline.models.StepStatus(value)[source]

Bases: str, Enum

Result status of a pipeline step.

SUCCESS

Step completed successfully (exit code 0).

FAILED

Step failed (non-zero exit code or exception).

SKIPPED

Step was skipped due to condition or abort.

TIMEOUT

Step exceeded its timeout limit.

SUCCESS = 'success'
FAILED = 'failed'
SKIPPED = 'skipped'
TIMEOUT = 'timeout'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

class kstlib.pipeline.models.StepType(value)[source]

Bases: str, Enum

Execution mode for a pipeline step.

SHELL

Execute a shell command via subprocess.run(shell=True).

PYTHON

Execute a Python module via python -m module.

CALLABLE

Import and call a Python function directly.

SHELL = 'shell'
PYTHON = 'python'
CALLABLE = 'callable'
__format__(self, format_spec)

Returns format using actual value type unless __str__ has been overridden.

Steps

Shell step executor for pipeline.

Executes shell commands via subprocess.run(shell=True) with timeout, environment variable injection, and working directory support.

Multi-line commands are supported natively via YAML folded (>-) or literal (|) block scalars. The resulting string is passed as-is to subprocess.run(shell=True), so loops, pipes, and redirections work.

class kstlib.pipeline.steps.shell.ShellStep[source]

Bases: object

Execute a shell command as a pipeline step.

Uses subprocess.run with shell=True to execute the command string. Supports environment variable injection, working directory, and timeout.

Examples

>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = ShellStep()
>>> config = StepConfig(
...     name="greet",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> result = step.execute(config)  
>>> result.status  
<StepStatus.SUCCESS: 'success'>
execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a shell command.

Parameters:
  • config (StepConfig) – Step configuration with command, env, timeout, etc.

  • dry_run (bool) – If True, log the command without executing it.

Returns:

StepResult with captured stdout, stderr, return code, and duration.

Return type:

StepResult

Python module step executor for pipeline.

Executes Python modules via subprocess.run([sys.executable, "-m", module]). Runs in a subprocess (not shell=True) for isolation and security.

class kstlib.pipeline.steps.python.PythonStep[source]

Bases: object

Execute a Python module as a pipeline step.

Uses subprocess.run with [sys.executable, "-m", module, *args] to run a Python module in a subprocess. Does not use shell=True.

Examples

>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = PythonStep()
>>> config = StepConfig(
...     name="lint",
...     type=StepType.PYTHON,
...     module="ruff",
...     args=("check", "src/"),
... )
>>> result = step.execute(config)  
execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a Python module via subprocess.

Parameters:
  • config (StepConfig) – Step configuration with module, args, env, timeout, etc.

  • dry_run (bool) – If True, log the command without executing it.

Returns:

StepResult with captured stdout, stderr, return code, and duration.

Return type:

StepResult

Callable step executor for pipeline.

Imports and calls a Python function directly using importlib. The callable target format is module.path:function_name.

Note

Callable steps do not support timeout natively. If timeout control is needed, use a shell or python step instead.

kstlib.pipeline.steps.callable.DANGEROUS_MODULES: frozenset[str] = frozenset({'__main__', 'builtins', 'code', 'compile', 'ctypes', 'importlib', 'marshal', 'nt', 'os', 'pickle', 'posix', 'shutil', 'subprocess', 'sys'})

Modules that are ALWAYS rejected by CallableStep regardless of the caller-supplied allowed_modules whitelist. These modules provide unrestricted OS or interpreter access and must never be importable via a YAML-driven pipeline config.

class kstlib.pipeline.steps.callable.CallableStep(allowed_modules=None)[source]

Bases: object

Execute a Python callable as a pipeline step.

Parses the callable target as module.path:function_name, imports the module, and calls the function. The return value is captured in StepResult.return_value.

Security enforcement (defense-in-depth):

  • The root module is always checked against DANGEROUS_MODULES.

  • When allowed_modules is provided, the module must match the whitelist (prefix match, identical to transform.chain pattern).

  • When allowed_modules is None, only the blacklist applies (backward compatible default).

Parameters:

allowed_modules (tuple[str, ...] | None) – Optional whitelist of module paths permitted as callable targets. When None, no whitelist is enforced (blacklist still applies).

Examples

>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = CallableStep()
>>> config = StepConfig(
...     name="process",
...     type=StepType.CALLABLE,
...     callable="json:dumps",
... )
>>> result = step.execute(config)  
__init__(self, allowed_modules: 'tuple[str, ...] | None' = None) 'None' -> None[source]

Initialize CallableStep with an optional allow-list of modules.

execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a Python callable.

Parameters:
  • config (StepConfig) – Step configuration with callable target, args, etc.

  • dry_run (bool) – If True, log the callable without executing it.

Returns:

StepResult with return_value, duration, and status.

Raises:

PipelineConfigError – If the target module is blacklisted or not present in the configured whitelist.

Return type:

StepResult

Protocol

Abstract base protocol for pipeline steps.

This module defines the protocol that all pipeline step implementations must satisfy, enabling consistent execution across shell, python, and callable step types.

class kstlib.pipeline.base.AbstractStep(*args, **kwargs)[source]

Bases: Protocol

Protocol defining the interface for pipeline step executors.

All step implementations (ShellStep, PythonStep, CallableStep) must implement this protocol to ensure consistent behavior across step types.

Examples

>>> def run_step(step: AbstractStep, config: StepConfig) -> StepResult:
...     return step.execute(config)
execute(self, config: 'StepConfig', *, dry_run: 'bool' = False) 'StepResult' -> StepResult[source]

Execute a pipeline step.

Parameters:
  • config (StepConfig) – Step configuration with command, env, timeout, etc.

  • dry_run (bool) – If True, simulate execution without side effects.

Returns:

StepResult with status, stdout, stderr, duration, etc.

Return type:

StepResult

__init__(self, *args, **kwargs)

Validators

Input validation for kstlib.pipeline module.

This module provides validation functions for pipeline configuration, implementing deep defense against malformed or malicious input.

Reuses security validators from kstlib.ops.validators for command and environment variable validation.

kstlib.pipeline.validators.MAX_MODULE_NAME_LENGTH = 256

Maximum length of a module name.

kstlib.pipeline.validators.MAX_PIPELINE_STEPS = 50

Maximum number of steps in a single pipeline.

kstlib.pipeline.validators.MAX_STEP_ARGS = 50

Maximum number of arguments for a step.

kstlib.pipeline.validators.MAX_STEP_NAME_LENGTH = 64

Maximum step name length.

kstlib.pipeline.validators.MODULE_NAME_PATTERN = re.compile('^[a-zA-Z_][a-zA-Z0-9_.]*$')

Pattern for valid Python module names.

kstlib.pipeline.validators.STEP_NAME_PATTERN = re.compile('^[a-zA-Z][a-zA-Z0-9_-]*$')

Pattern for valid step names (same rules as session names).

kstlib.pipeline.validators.validate_callable_target(target: 'str') 'str' -> str[source]

Validate a callable target string.

Expected format: module.path:function_name

Parameters:

target (str) – Callable target string to validate.

Returns:

The validated target (unchanged).

Raises:

PipelineConfigError – If target is invalid.

Return type:

str

Examples

>>> validate_callable_target("mymodule:run")
'mymodule:run'
>>> validate_callable_target("my.pkg.module:do_work")
'my.pkg.module:do_work'
kstlib.pipeline.validators.validate_command(command: 'str | None', *, strict: 'bool' = True) 'str | None' -> str | None[source]

Validate command string.

Rules:

  • Max 4096 characters.

  • No null bytes.

  • strict=True (default): block dangerous shell patterns like semicolons, pipes, redirections, eval, source, etc.

  • strict=False: only basic checks (length, null bytes) for trusted contexts like pipeline config files.

Parameters:
  • command (str | None) – Command string to validate (can be None).

  • strict (bool) – If True, block shell metacharacters. Default True.

Returns:

The validated command (unchanged).

Raises:

ValueError – If command is invalid or contains dangerous patterns.

Return type:

str | None

Examples

>>> validate_command("python -m app")
'python -m app'
>>> validate_command(None) is None
True
>>> validate_command("echo a; echo b", strict=False)
'echo a; echo b'
kstlib.pipeline.validators.validate_env(env: 'dict[str, str]') 'dict[str, str]' -> dict[str, str][source]

Validate environment variables.

Rules: - Max 100 env vars - Key max 128 characters - Value max 32KB - Key must be valid identifier - Dangerous keys blocked (LD_PRELOAD, PYTHONPATH, etc.)

Parameters:

env (dict[str, str]) – Dictionary of environment variables to validate.

Returns:

The validated env dict (unchanged).

Raises:

ValueError – If env vars are invalid.

Return type:

dict[str, str]

Examples

>>> validate_env({"APP_ENV": "production"})
{'APP_ENV': 'production'}
kstlib.pipeline.validators.validate_module_name(module: 'str') 'str' -> str[source]

Validate a Python module name.

Parameters:

module (str) – Module name to validate (e.g. my.package.module).

Returns:

The validated module name (unchanged).

Raises:

PipelineConfigError – If module name is invalid.

Return type:

str

Examples

>>> validate_module_name("mymodule")
'mymodule'
>>> validate_module_name("my.package.module")
'my.package.module'
kstlib.pipeline.validators.validate_pipeline_config(*, step_count: 'int', on_error: 'str') 'None' -> None[source]

Validate pipeline-level configuration.

Parameters:
  • step_count (int) – Number of steps in the pipeline.

  • on_error (str) – Error policy string.

Raises:

PipelineConfigError – If configuration is invalid.

kstlib.pipeline.validators.validate_step_config(*, name: 'str', step_type: 'str', command: 'str | None' = None, module: 'str | None' = None, callable_target: 'str | None' = None, args: 'list[str] | None' = None, env: 'dict[str, str] | None' = None) 'None' -> None[source]

Validate a step configuration.

Checks that the step has the required fields for its type and that all values pass validation.

Parameters:
  • name (str) – Step name.

  • step_type (str) – Step type (shell, python, callable).

  • command (str | None) – Shell command (required for shell type).

  • module (str | None) – Python module (required for python type).

  • callable_target (str | None) – Import target (required for callable type).

  • args (list[str] | None) – Arguments list.

  • env (dict[str, str] | None) – Environment variables.

Raises:

PipelineConfigError – If configuration is invalid.

kstlib.pipeline.validators.validate_step_name(name: 'str') 'str' -> str[source]

Validate and return a step name.

Rules: - Cannot be empty - Max 64 characters (hard limit) - Must start with a letter - Only alphanumeric, underscore, hyphen allowed

Parameters:

name (str) – Step name to validate.

Returns:

The validated step name (unchanged).

Raises:

PipelineConfigError – If name is invalid.

Return type:

str

Examples

>>> validate_step_name("build_logs")
'build_logs'
>>> validate_step_name("step-01")
'step-01'
>>> validate_step_name("")
Traceback (most recent call last):
    ...
kstlib.pipeline.exceptions.PipelineConfigError: Step name cannot be empty

Exceptions

Specialized exceptions raised by the kstlib.pipeline module.

Exception hierarchy:

KstlibError
    PipelineError (base for all pipeline errors)
        PipelineConfigError (invalid configuration, also ValueError)
        PipelineAbortedError (fail_fast abort)
        StepError (step execution error)
            StepTimeoutError (step exceeded timeout)
            StepImportError (callable import failure)
exception kstlib.pipeline.exceptions.PipelineAbortedError(step_name, reason)[source]

Bases: PipelineError

Pipeline execution was aborted due to fail_fast policy.

Raised when a step fails and the error policy is fail_fast, causing the remaining steps to be skipped.

step_name

Name of the step that caused the abort.

reason

Description of why the step failed.

__init__(self, step_name: 'str', reason: 'str') 'None' -> None[source]

Initialize PipelineAbortedError.

Parameters:
  • step_name (str) – Name of the step that caused the abort.

  • reason (str) – Description of why the step failed.

exception kstlib.pipeline.exceptions.PipelineConfigError[source]

Bases: PipelineError, ValueError

Pipeline configuration is invalid.

Raised when the pipeline or step configuration contains invalid values, missing required fields, or constraint violations.

exception kstlib.pipeline.exceptions.PipelineError[source]

Bases: KstlibError

Base exception for all pipeline module errors.

All pipeline-specific exceptions inherit from this class, allowing for easy catching of any pipeline error.

exception kstlib.pipeline.exceptions.StepError(step_name, reason)[source]

Bases: PipelineError

A pipeline step failed during execution.

step_name

Name of the step that failed.

reason

Description of the failure.

__init__(self, step_name: 'str', reason: 'str') 'None' -> None[source]

Initialize StepError.

Parameters:
  • step_name (str) – Name of the step that failed.

  • reason (str) – Description of the failure.

exception kstlib.pipeline.exceptions.StepImportError(step_name, target)[source]

Bases: StepError

Failed to import a callable target for a step.

step_name

Name of the step with the import failure.

target

The import target string that failed.

__init__(self, step_name: 'str', target: 'str') 'None' -> None[source]

Initialize StepImportError.

Parameters:
  • step_name (str) – Name of the step with the import failure.

  • target (str) – The import target string (e.g. “module.path:function”).

exception kstlib.pipeline.exceptions.StepTimeoutError(step_name, timeout)[source]

Bases: StepError

A pipeline step exceeded its timeout.

step_name

Name of the step that timed out.

timeout

The timeout value in seconds.

__init__(self, step_name: 'str', timeout: 'float') 'None' -> None[source]

Initialize StepTimeoutError.

Parameters:
  • step_name (str) – Name of the step that timed out.

  • timeout (float) – The timeout value in seconds.