Pipeline¶
Declarative, config-driven pipeline execution for sequential workflows.
kstlib.pipeline provides a PipelineRunner that executes a sequence of shell commands,
Python modules, and callable functions with support for conditional execution,
error policies, timeout cascade, and dry-run mode.
Tip
Pair this reference with Pipeline for the feature guide.
Quick overview¶
- PipelineRunner is the main facade for pipeline execution
- StepConfig and PipelineConfig define the pipeline structure
- StepResult and PipelineResult capture execution results
- ShellStep, PythonStep, CallableStep are the step executors
- Configuration follows standard priority: constructor args > kstlib.conf.yml > defaults
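The priority order in the overview (constructor args > kstlib.conf.yml > defaults) can be pictured as a layered merge. The following is a minimal sketch, not kstlib's implementation: `resolve_settings` and `DEFAULTS` are hypothetical names, and the default values are simply the ones shown on this page.

```python
from typing import Any

# Hypothetical built-in defaults; the values mirror the ones shown on this page.
DEFAULTS: dict[str, Any] = {"default_timeout": 300, "on_error": "fail_fast"}


def resolve_settings(
    file_config: dict[str, Any],
    **constructor_args: Any,
) -> dict[str, Any]:
    """Merge settings so that constructor args > config file > defaults."""
    resolved = dict(DEFAULTS)
    resolved.update(file_config)  # values from the config file override defaults
    # Explicit constructor arguments win; None is treated as "not provided".
    resolved.update({k: v for k, v in constructor_args.items() if v is not None})
    return resolved


settings = resolve_settings({"default_timeout": 120}, on_error="continue")
print(settings)
```

Treating `None` as "not provided" is an assumption of this sketch; it keeps an omitted argument from masking a value that the config file supplies.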
Configuration cascade¶
The module consults the loaded config for pipeline definitions:
pipeline:
  default_timeout: 300
  on_error: fail_fast
  pipelines:
    morning-monitoring:
      steps:
        - name: check_services
          type: shell
          command: systemctl status nginx
          timeout: 30
        - name: send_report
          type: callable
          callable: my.alerts:send_summary
          when: always
Load a pipeline from config:
from kstlib.pipeline import PipelineRunner
runner = PipelineRunner.from_config("morning-monitoring")
result = runner.run()
Usage patterns¶
Programmatic pipeline¶
from kstlib.pipeline import (
    PipelineRunner, PipelineConfig, StepConfig, StepType, ErrorPolicy,
)
config = PipelineConfig(
    name="deploy",
    steps=(
        StepConfig(name="build", type=StepType.SHELL, command="make build"),
        StepConfig(name="test", type=StepType.PYTHON, module="pytest"),
        StepConfig(name="notify", type=StepType.CALLABLE, callable="my.alerts:send_ok"),
    ),
    on_error=ErrorPolicy.FAIL_FAST,
    default_timeout=300,
)
runner = PipelineRunner(config)
result = runner.run()
Dry-run simulation¶
result = runner.run(dry_run=True)
for step in result.results:
    print(f"{step.name}: {step.stdout}")
Module reference¶
Declarative pipeline execution for kstlib.
This module provides config-driven pipeline execution for sequential workflows combining shell commands, Python modules, and callable functions.
Pipelines can be defined programmatically or declared in kstlib.conf.yml
and executed with error handling, conditional steps, and dry-run support.
Examples
Programmatic pipeline:
>>> from kstlib.pipeline import PipelineRunner, PipelineConfig, StepConfig, StepType
>>> config = PipelineConfig(
...     name="demo",
...     steps=(
...         StepConfig(name="greet", type=StepType.SHELL, command="echo hello"),
...     ),
... )
>>> runner = PipelineRunner(config)
>>> result = runner.run()
Config-driven pipeline:
>>> runner = PipelineRunner.from_config("morning")
>>> result = runner.run()
- class kstlib.pipeline.AbstractStep(*args, **kwargs)[source]
Bases: Protocol
Protocol defining the interface for pipeline step executors.
All step implementations (ShellStep, PythonStep, CallableStep) must implement this protocol to ensure consistent behavior across step types.
Examples
>>> def run_step(step: AbstractStep, config: StepConfig) -> StepResult:
...     return step.execute(config)
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a pipeline step.
- Parameters:
config (StepConfig) – Step configuration with command, env, timeout, etc.
dry_run (bool) – If True, simulate execution without side effects.
- Returns:
StepResult with status, stdout, stderr, duration, etc.
- Return type:
StepResult
- __init__(self, *args, **kwargs)
- class kstlib.pipeline.CallableStep(allowed_modules=None)[source]
Bases: object
Execute a Python callable as a pipeline step.
Parses the callable target as module.path:function_name, imports the module, and calls the function. The return value is captured in StepResult.return_value.
Security enforcement (defense-in-depth):
- The root module is always checked against DANGEROUS_MODULES.
- When allowed_modules is provided, the module must match the whitelist (prefix match, identical to the transform.chain pattern).
- When allowed_modules is None, only the blacklist applies (backward-compatible default).
- Parameters:
allowed_modules (tuple[str, ...] | None) – Optional whitelist of module paths permitted as callable targets. When None, no whitelist is enforced (the blacklist still applies).
Examples
>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = CallableStep()
>>> config = StepConfig(
...     name="process",
...     type=StepType.CALLABLE,
...     callable="json:dumps",
... )
>>> result = step.execute(config)
- __init__(self, allowed_modules: tuple[str, ...] | None = None) -> None[source]
Initialize CallableStep with an optional allow-list of modules.
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a Python callable.
- Parameters:
config (StepConfig) – Step configuration with callable target, args, etc.
dry_run (bool) – If True, log the callable without executing it.
- Returns:
StepResult with return_value, duration, and status.
- Raises:
PipelineConfigError – If the target module is blacklisted or not present in the configured whitelist.
- Return type:
StepResult
- class kstlib.pipeline.ErrorPolicy(value)[source]
Error handling policy for a pipeline.
- FAIL_FAST
Abort pipeline on first step failure.
- CONTINUE
Continue executing remaining steps after a failure.
- FAIL_FAST = 'fail_fast'
- CONTINUE = 'continue'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
- exception kstlib.pipeline.PipelineAbortedError(step_name, reason)[source]
Bases: PipelineError
Pipeline execution was aborted due to fail_fast policy.
Raised when a step fails and the error policy is fail_fast, causing the remaining steps to be skipped.
- step_name
Name of the step that caused the abort.
- reason
Description of why the step failed.
- class kstlib.pipeline.PipelineConfig(name, steps, on_error=ErrorPolicy.FAIL_FAST, default_timeout=300.0, allowed_callable_modules=None)[source]
Bases: object
Configuration for a complete pipeline.
- name
Pipeline name.
- Type:
str
- steps
Ordered tuple of step configurations.
- Type:
tuple[kstlib.pipeline.models.StepConfig, …]
- on_error
Error handling policy.
- Type:
kstlib.pipeline.models.ErrorPolicy
- default_timeout
Default timeout for steps without explicit timeout.
- Type:
float
Examples
>>> config = PipelineConfig(
...     name="deploy",
...     steps=(
...         StepConfig(name="build", type=StepType.SHELL, command="make build"),
...         StepConfig(name="test", type=StepType.SHELL, command="make test"),
...     ),
... )
>>> len(config.steps)
2
- name: str
- steps: tuple[StepConfig, ...]
- on_error: ErrorPolicy
- default_timeout: float
- __post_init__(self) -> None[source]
Validate pipeline configuration values.
- Raises:
PipelineConfigError – If configuration is invalid.
- __init__(self, name: str, steps: tuple[StepConfig, ...], on_error: ErrorPolicy = ErrorPolicy.FAIL_FAST, default_timeout: float = 300.0, allowed_callable_modules: tuple[str, ...] | None = None) -> None
- exception kstlib.pipeline.PipelineConfigError[source]
Bases: PipelineError, ValueError
Pipeline configuration is invalid.
Raised when the pipeline or step configuration contains invalid values, missing required fields, or constraint violations.
- exception kstlib.pipeline.PipelineError[source]
Bases: KstlibError
Base exception for all pipeline module errors.
All pipeline-specific exceptions inherit from this class, allowing for easy catching of any pipeline error.
- class kstlib.pipeline.PipelineResult(name, results=<factory>, duration=0.0)[source]
Bases: object
Aggregate result of a pipeline execution.
- name
Pipeline name.
- Type:
str
- results
Ordered list of step results.
- Type:
list[kstlib.pipeline.models.StepResult]
- duration
Total pipeline execution duration in seconds.
- Type:
float
Examples
>>> result = PipelineResult(name="deploy")
>>> result.success
True
- name: str
- results: list[StepResult]
- duration: float
- property success: bool
Whether all executed steps succeeded.
- Returns:
True if no step has FAILED or TIMEOUT status.
- property failed_steps: list[StepResult]
Steps that failed or timed out.
- Returns:
List of StepResult with FAILED or TIMEOUT status.
- property skipped_steps: list[StepResult]
Steps that were skipped.
- Returns:
List of StepResult with SKIPPED status.
- __init__(self, name: str, results: list[StepResult] = <factory>, duration: float = 0.0) -> None
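The aggregate properties can be pictured with a small stand-in model. This is a sketch, assuming only what the property descriptions state (failure means FAILED or TIMEOUT); `Status`, `Step`, and `Result` are local stand-ins, not the kstlib classes.

```python
from dataclasses import dataclass, field
from enum import Enum


class Status(str, Enum):
    """Stand-in for StepStatus with the same four values."""

    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    TIMEOUT = "timeout"


@dataclass
class Step:
    """Stand-in for StepResult, reduced to a name and a status."""

    name: str
    status: Status


@dataclass
class Result:
    """Stand-in for PipelineResult."""

    name: str
    results: list[Step] = field(default_factory=list)

    @property
    def failed_steps(self) -> list[Step]:
        return [s for s in self.results if s.status in (Status.FAILED, Status.TIMEOUT)]

    @property
    def skipped_steps(self) -> list[Step]:
        return [s for s in self.results if s.status is Status.SKIPPED]

    @property
    def success(self) -> bool:
        # An empty result counts as successful: nothing has failed.
        return not self.failed_steps


result = Result("deploy", [Step("build", Status.SUCCESS), Step("test", Status.TIMEOUT)])
print(result.success, [s.name for s in result.failed_steps])
```

Note that under this reading a skipped step does not make the pipeline unsuccessful, which matches the documented example where an empty PipelineResult reports success.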
- class kstlib.pipeline.PipelineRunner(config)[source]
Bases: object
Execute a pipeline of sequential steps.
Supports conditional execution (when), error policies (fail_fast/continue), timeout cascading, and dry-run mode.
- Parameters:
config (PipelineConfig) – Pipeline configuration.
Examples
Build a pipeline programmatically:
>>> from kstlib.pipeline.models import (
...     PipelineConfig, StepConfig, StepType,
... )
>>> config = PipelineConfig(
...     name="demo",
...     steps=(
...         StepConfig(name="greet", type=StepType.SHELL, command="echo hello"),
...     ),
... )
>>> runner = PipelineRunner(config)
>>> result = runner.run()
Load from kstlib.conf.yml:
>>> runner = PipelineRunner.from_config("morning")
>>> result = runner.run()
- __init__(self, config: PipelineConfig) -> None[source]
Initialize PipelineRunner.
- Parameters:
config (PipelineConfig) – Pipeline configuration with steps and policies.
- property config: PipelineConfig
Return the pipeline configuration.
- classmethod from_config(name: str, **overrides: Any) -> PipelineRunner[source]
Create a PipelineRunner from kstlib.conf.yml.
Loads the pipeline definition from pipeline.pipelines.<name> in the global configuration.
- Parameters:
- Returns:
Configured PipelineRunner instance.
- Raises:
PipelineConfigError – If the pipeline is not found or invalid.
- Return type:
PipelineRunner
Examples
>>> runner = PipelineRunner.from_config("morning")
- run(self, *, dry_run: bool = False) -> PipelineResult[source]
Execute the pipeline.
Runs each step sequentially, respecting conditions and error policies.
- Parameters:
dry_run (bool) – If True, simulate execution without side effects.
- Returns:
PipelineResult with all step results and aggregate status.
- Raises:
PipelineAbortedError – If a step fails with fail_fast policy.
- Return type:
PipelineResult
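The two error policies can be illustrated with a simplified loop. This is a sketch under stated assumptions, not the runner's actual code: steps are reduced to functions that return True on success, `when` conditions and timeouts are left out, and `Aborted` is a local stand-in for PipelineAbortedError.

```python
from typing import Callable


class Aborted(Exception):
    """Stand-in for PipelineAbortedError (step_name, reason)."""

    def __init__(self, step_name: str, reason: str) -> None:
        super().__init__(f"{step_name}: {reason}")
        self.step_name = step_name
        self.reason = reason


def run_steps(
    steps: list[tuple[str, Callable[[], bool]]],
    on_error: str = "fail_fast",
) -> dict[str, str]:
    """Run steps in order; each step returns True on success."""
    statuses: dict[str, str] = {}
    for name, action in steps:
        ok = action()
        statuses[name] = "success" if ok else "failed"
        if not ok and on_error == "fail_fast":
            # Abort on the first failure; later steps never run.
            raise Aborted(name, "step reported failure")
    return statuses


steps = [("build", lambda: True), ("test", lambda: False), ("notify", lambda: True)]
print(run_steps(steps, on_error="continue"))
try:
    run_steps(steps, on_error="fail_fast")
except Aborted as exc:
    print(f"aborted at {exc.step_name}")
```

With `continue`, the failing step is recorded and the remaining steps still run; with `fail_fast`, the caller has to be prepared to catch the abort.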
- class kstlib.pipeline.PythonStep[source]
Bases: object
Execute a Python module as a pipeline step.
Uses subprocess.run with [sys.executable, "-m", module, *args] to run a Python module in a subprocess. Does not use shell=True.
Examples
>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = PythonStep()
>>> config = StepConfig(
...     name="lint",
...     type=StepType.PYTHON,
...     module="ruff",
...     args=("check", "src/"),
... )
>>> result = step.execute(config)
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a Python module via subprocess.
- Parameters:
config (StepConfig) – Step configuration with module, args, env, timeout, etc.
dry_run (bool) – If True, log the command without executing it.
- Returns:
StepResult with captured stdout, stderr, return code, and duration.
- Return type:
StepResult
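The subprocess invocation described for PythonStep can be sketched with the standard library alone. `run_module` is a hypothetical helper, and the mapping of exit codes and timeouts to status strings is an assumption based on the StepStatus descriptions on this page.

```python
import subprocess
import sys
from typing import Optional


def run_module(
    module: str,
    args: tuple[str, ...] = (),
    timeout: float = 300.0,
) -> tuple[str, Optional[int], str]:
    """Run `python -m module *args` without a shell and capture its output."""
    command = [sys.executable, "-m", module, *args]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,  # a non-zero exit code is reported, not raised
        )
    except subprocess.TimeoutExpired:
        return "timeout", None, ""
    status = "success" if completed.returncode == 0 else "failed"
    return status, completed.returncode, completed.stdout


# The standard-library "platform" module prints the platform string when run with -m.
status, code, out = run_module("platform")
print(status, code)
```

Passing the command as a list keeps arguments from being interpreted by a shell, which is the isolation property the class description refers to.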
- class kstlib.pipeline.ShellStep[source]
Bases: object
Execute a shell command as a pipeline step.
Uses subprocess.run with shell=True to execute the command string. Supports environment variable injection, working directory, and timeout.
Examples
>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = ShellStep()
>>> config = StepConfig(
...     name="greet",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> result = step.execute(config)
>>> result.status
<StepStatus.SUCCESS: 'success'>
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a shell command.
- Parameters:
config (StepConfig) – Step configuration with command, env, timeout, etc.
dry_run (bool) – If True, log the command without executing it.
- Returns:
StepResult with captured stdout, stderr, return code, and duration.
- Return type:
StepResult
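A shell step of this kind can be approximated with subprocess.run directly. The sketch below is illustrative only: `run_shell` is a hypothetical helper, and merging the extra variables on top of the parent environment is an assumption about how "environment variable injection" behaves.

```python
import os
import subprocess
from typing import Optional


def run_shell(
    command: str,
    env: Optional[dict[str, str]] = None,
    working_dir: Optional[str] = None,
    timeout: float = 300.0,
) -> tuple[str, str]:
    """Run a command string through the shell and return (status, stdout)."""
    # Assumed behavior: extra variables are layered over the parent environment.
    merged_env = {**os.environ, **(env or {})}
    try:
        completed = subprocess.run(
            command,
            shell=True,
            env=merged_env,
            cwd=working_dir,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return "timeout", ""
    status = "success" if completed.returncode == 0 else "failed"
    return status, completed.stdout


print(run_shell("echo hello"))
print(run_shell("exit 3"))
```

Because the string goes to the shell unchanged, pipes and redirections work, and so does anything an untrusted author might put in the command; this is why command strings should only come from trusted configuration.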
- class kstlib.pipeline.StepCondition(value)[source]
Condition for executing a pipeline step.
- ALWAYS
Execute the step regardless of previous results.
- ON_SUCCESS
Execute only if all previous steps succeeded.
- ON_FAILURE
Execute only if at least one previous step failed.
- ALWAYS = 'always'
- ON_SUCCESS = 'on_success'
- ON_FAILURE = 'on_failure'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
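The three conditions reduce to a small predicate over the statuses of earlier steps. This is a sketch, assuming that "failed" covers both FAILED and TIMEOUT (as in PipelineResult.success) and that skipped steps do not count as failures; `Condition` and `should_run` are local stand-ins.

```python
from enum import Enum


class Condition(str, Enum):
    """Stand-in for StepCondition with the same values."""

    ALWAYS = "always"
    ON_SUCCESS = "on_success"
    ON_FAILURE = "on_failure"


# Assumption: skipped steps are not treated as failures.
FAILURE_STATUSES = {"failed", "timeout"}


def should_run(when: Condition, previous_statuses: list[str]) -> bool:
    """Decide whether a step runs, given the statuses of earlier steps."""
    any_failed = any(s in FAILURE_STATUSES for s in previous_statuses)
    if when is Condition.ALWAYS:
        return True
    if when is Condition.ON_SUCCESS:
        return not any_failed
    return any_failed  # Condition.ON_FAILURE


history = ["success", "failed"]
print([should_run(c, history) for c in Condition])
```

A cleanup or notification step would typically use `always`, while a rollback step fits `on_failure`.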
- class kstlib.pipeline.StepConfig(name, type, command=None, module=None, callable=None, args=(), env=<factory>, working_dir=None, timeout=None, when=StepCondition.ALWAYS)[source]
Bases: object
Configuration for a single pipeline step.
- name
Unique step name within the pipeline.
- Type:
str
- type
Execution mode (shell, python, callable).
- Type:
kstlib.pipeline.models.StepType
- command
Shell command string (required for shell type).
- Type:
str | None
- module
Python module to execute (required for python type).
- Type:
str | None
- callable
Import target module.path:function (required for callable type).
- Type:
str | None
- working_dir
Working directory for the step.
- Type:
str | None
- timeout
Step timeout in seconds (None uses pipeline default).
- Type:
float | None
- when
Condition for executing this step.
- Type:
kstlib.pipeline.models.StepCondition
Examples
>>> config = StepConfig(
...     name="build",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> config.name
'build'
>>> config = StepConfig(
...     name="process",
...     type=StepType.PYTHON,
...     module="my.module",
...     args=("--verbose",),
... )
- name: str
- type: StepType
- when: StepCondition
- __post_init__(self) -> None[source]
Validate step configuration values.
- Raises:
PipelineConfigError – If any configuration value is invalid.
- __init__(self, name: str, type: StepType, command: str | None = None, module: str | None = None, callable: str | None = None, args: tuple[str, ...] = (), env: dict[str, str] = <factory>, working_dir: str | None = None, timeout: float | None = None, when: StepCondition = StepCondition.ALWAYS) -> None
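The timeout cascade (a step's timeout of None falls back to the pipeline's default_timeout) comes down to a single fallback. A minimal sketch with a hypothetical helper name:

```python
from typing import Optional


def effective_timeout(step_timeout: Optional[float], default_timeout: float = 300.0) -> float:
    """Return the step's own timeout, falling back to the pipeline default."""
    # Compare against None explicitly so that an explicit value is never
    # confused with "not set".
    return default_timeout if step_timeout is None else step_timeout


print(effective_timeout(30.0), effective_timeout(None))
```

Using `step_timeout or default_timeout` would look shorter but would also replace any falsy value, so the explicit None check is the safer form.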
- exception kstlib.pipeline.StepError(step_name, reason)[source]
Bases: PipelineError
A pipeline step failed during execution.
- step_name
Name of the step that failed.
- reason
Description of the failure.
- exception kstlib.pipeline.StepImportError(step_name, target)[source]
Bases: StepError
Failed to import a callable target for a step.
- step_name
Name of the step with the import failure.
- target
The import target string that failed.
- class kstlib.pipeline.StepResult(name, status, stdout='', stderr='', return_code=None, return_value=None, duration=0.0, error=None)[source]
Bases: object
Result of a single pipeline step execution.
- name
Step name.
- Type:
str
- status
Execution result status.
- Type:
kstlib.pipeline.models.StepStatus
- stdout
Standard output captured from the step.
- Type:
str
- stderr
Standard error captured from the step.
- Type:
str
- return_code
Process exit code (shell/python steps).
- Type:
int | None
- return_value
Return value (callable steps).
- Type:
object
- duration
Execution duration in seconds.
- Type:
float
- error
Error message if the step failed.
- Type:
str | None
Examples
>>> result = StepResult(name="build", status=StepStatus.SUCCESS)
>>> result.status
<StepStatus.SUCCESS: 'success'>
- name: str
- status: StepStatus
- stdout: str
- stderr: str
- return_value: object
- duration: float
- __init__(self, name: str, status: StepStatus, stdout: str = '', stderr: str = '', return_code: int | None = None, return_value: object = None, duration: float = 0.0, error: str | None = None) -> None
- class kstlib.pipeline.StepStatus(value)[source]
Result status of a pipeline step.
- SUCCESS
Step completed successfully (exit code 0).
- FAILED
Step failed (non-zero exit code or exception).
- SKIPPED
Step was skipped due to condition or abort.
- TIMEOUT
Step exceeded its timeout limit.
- SUCCESS = 'success'
- FAILED = 'failed'
- SKIPPED = 'skipped'
- TIMEOUT = 'timeout'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
- exception kstlib.pipeline.StepTimeoutError(step_name, timeout)[source]
Bases: StepError
A pipeline step exceeded its timeout.
- step_name
Name of the step that timed out.
- timeout
The timeout value in seconds.
- class kstlib.pipeline.StepType(value)[source]
Execution mode for a pipeline step.
- SHELL
Execute a shell command via
subprocess.run(shell=True).
- PYTHON
Execute a Python module via
python -m module.
- CALLABLE
Import and call a Python function directly.
- SHELL = 'shell'
- PYTHON = 'python'
- CALLABLE = 'callable'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
Runner¶
Pipeline runner for sequential step execution.
Provides the PipelineRunner class that executes a sequence of steps
with support for conditional execution, error policies, dry-run mode,
and config-driven pipeline definitions.
- class kstlib.pipeline.runner.PipelineRunner(config)[source]
Bases: object
Execute a pipeline of sequential steps.
Supports conditional execution (when), error policies (fail_fast/continue), timeout cascading, and dry-run mode.
- Parameters:
config (PipelineConfig) – Pipeline configuration.
Examples
Build a pipeline programmatically:
>>> from kstlib.pipeline.models import (
...     PipelineConfig, StepConfig, StepType,
... )
>>> config = PipelineConfig(
...     name="demo",
...     steps=(
...         StepConfig(name="greet", type=StepType.SHELL, command="echo hello"),
...     ),
... )
>>> runner = PipelineRunner(config)
>>> result = runner.run()
Load from kstlib.conf.yml:
>>> runner = PipelineRunner.from_config("morning")
>>> result = runner.run()
- __init__(self, config: PipelineConfig) -> None[source]
Initialize PipelineRunner.
- Parameters:
config (PipelineConfig) – Pipeline configuration with steps and policies.
- property config: PipelineConfig
Return the pipeline configuration.
- classmethod from_config(name: str, **overrides: Any) -> PipelineRunner[source]
Create a PipelineRunner from kstlib.conf.yml.
Loads the pipeline definition from pipeline.pipelines.<name> in the global configuration.
- Parameters:
- Returns:
Configured PipelineRunner instance.
- Raises:
PipelineConfigError – If the pipeline is not found or invalid.
- Return type:
PipelineRunner
Examples
>>> runner = PipelineRunner.from_config("morning")
- run(self, *, dry_run: bool = False) -> PipelineResult[source]
Execute the pipeline.
Runs each step sequentially, respecting conditions and error policies.
- Parameters:
dry_run (bool) – If True, simulate execution without side effects.
- Returns:
PipelineResult with all step results and aggregate status.
- Raises:
PipelineAbortedError – If a step fails with fail_fast policy.
- Return type:
PipelineResult
Models¶
Data models for the kstlib.pipeline module.
This module defines the core data structures used by the pipeline module:
StepType: Enum for step execution mode (shell, python, callable)
ErrorPolicy: Enum for pipeline error handling (fail_fast, continue)
StepCondition: Enum for conditional step execution (always, on_success, on_failure)
StepStatus: Enum for step result status (success, failed, skipped, timeout)
StepConfig: Frozen configuration for a single pipeline step
PipelineConfig: Frozen configuration for an entire pipeline
StepResult: Mutable result of a single step execution
PipelineResult: Mutable aggregate result of a pipeline execution
- class kstlib.pipeline.models.ErrorPolicy(value)[source]
Error handling policy for a pipeline.
- FAIL_FAST
Abort pipeline on first step failure.
- CONTINUE
Continue executing remaining steps after a failure.
- FAIL_FAST = 'fail_fast'
- CONTINUE = 'continue'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
- class kstlib.pipeline.models.PipelineConfig(name, steps, on_error=ErrorPolicy.FAIL_FAST, default_timeout=300.0, allowed_callable_modules=None)[source]
Bases: object
Configuration for a complete pipeline.
- name
Pipeline name.
- Type:
str
- steps
Ordered tuple of step configurations.
- Type:
tuple[kstlib.pipeline.models.StepConfig, …]
- on_error
Error handling policy.
- Type:
kstlib.pipeline.models.ErrorPolicy
- default_timeout
Default timeout for steps without explicit timeout.
- Type:
float
Examples
>>> config = PipelineConfig(
...     name="deploy",
...     steps=(
...         StepConfig(name="build", type=StepType.SHELL, command="make build"),
...         StepConfig(name="test", type=StepType.SHELL, command="make test"),
...     ),
... )
>>> len(config.steps)
2
- name: str
- steps: tuple[StepConfig, ...]
- on_error: ErrorPolicy
- default_timeout: float
- __post_init__(self) -> None[source]
Validate pipeline configuration values.
- Raises:
PipelineConfigError – If configuration is invalid.
- __init__(self, name: str, steps: tuple[StepConfig, ...], on_error: ErrorPolicy = ErrorPolicy.FAIL_FAST, default_timeout: float = 300.0, allowed_callable_modules: tuple[str, ...] | None = None) -> None
- class kstlib.pipeline.models.PipelineResult(name, results=<factory>, duration=0.0)[source]
Bases: object
Aggregate result of a pipeline execution.
- name
Pipeline name.
- Type:
str
- results
Ordered list of step results.
- Type:
list[kstlib.pipeline.models.StepResult]
- duration
Total pipeline execution duration in seconds.
- Type:
float
Examples
>>> result = PipelineResult(name="deploy")
>>> result.success
True
- name: str
- results: list[StepResult]
- duration: float
- property success: bool
Whether all executed steps succeeded.
- Returns:
True if no step has FAILED or TIMEOUT status.
- property failed_steps: list[StepResult]
Steps that failed or timed out.
- Returns:
List of StepResult with FAILED or TIMEOUT status.
- property skipped_steps: list[StepResult]
Steps that were skipped.
- Returns:
List of StepResult with SKIPPED status.
- __init__(self, name: str, results: list[StepResult] = <factory>, duration: float = 0.0) -> None
- class kstlib.pipeline.models.StepCondition(value)[source]
Condition for executing a pipeline step.
- ALWAYS
Execute the step regardless of previous results.
- ON_SUCCESS
Execute only if all previous steps succeeded.
- ON_FAILURE
Execute only if at least one previous step failed.
- ALWAYS = 'always'
- ON_SUCCESS = 'on_success'
- ON_FAILURE = 'on_failure'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
- class kstlib.pipeline.models.StepConfig(name, type, command=None, module=None, callable=None, args=(), env=<factory>, working_dir=None, timeout=None, when=StepCondition.ALWAYS)[source]
Bases: object
Configuration for a single pipeline step.
- name
Unique step name within the pipeline.
- Type:
str
- type
Execution mode (shell, python, callable).
- Type:
kstlib.pipeline.models.StepType
- command
Shell command string (required for shell type).
- Type:
str | None
- module
Python module to execute (required for python type).
- Type:
str | None
- callable
Import target module.path:function (required for callable type).
- Type:
str | None
- working_dir
Working directory for the step.
- Type:
str | None
- timeout
Step timeout in seconds (None uses pipeline default).
- Type:
float | None
- when
Condition for executing this step.
- Type:
kstlib.pipeline.models.StepCondition
Examples
>>> config = StepConfig(
...     name="build",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> config.name
'build'
>>> config = StepConfig(
...     name="process",
...     type=StepType.PYTHON,
...     module="my.module",
...     args=("--verbose",),
... )
- name: str
- type: StepType
- when: StepCondition
- __post_init__(self) -> None[source]
Validate step configuration values.
- Raises:
PipelineConfigError – If any configuration value is invalid.
- __init__(self, name: str, type: StepType, command: str | None = None, module: str | None = None, callable: str | None = None, args: tuple[str, ...] = (), env: dict[str, str] = <factory>, working_dir: str | None = None, timeout: float | None = None, when: StepCondition = StepCondition.ALWAYS) -> None
- class kstlib.pipeline.models.StepResult(name, status, stdout='', stderr='', return_code=None, return_value=None, duration=0.0, error=None)[source]
Bases: object
Result of a single pipeline step execution.
- name
Step name.
- Type:
str
- status
Execution result status.
- Type:
kstlib.pipeline.models.StepStatus
- stdout
Standard output captured from the step.
- Type:
str
- stderr
Standard error captured from the step.
- Type:
str
- return_code
Process exit code (shell/python steps).
- Type:
int | None
- return_value
Return value (callable steps).
- Type:
object
- duration
Execution duration in seconds.
- Type:
float
- error
Error message if the step failed.
- Type:
str | None
Examples
>>> result = StepResult(name="build", status=StepStatus.SUCCESS)
>>> result.status
<StepStatus.SUCCESS: 'success'>
- name: str
- status: StepStatus
- stdout: str
- stderr: str
- return_value: object
- duration: float
- __init__(self, name: str, status: StepStatus, stdout: str = '', stderr: str = '', return_code: int | None = None, return_value: object = None, duration: float = 0.0, error: str | None = None) -> None
- class kstlib.pipeline.models.StepStatus(value)[source]
Result status of a pipeline step.
- SUCCESS
Step completed successfully (exit code 0).
- FAILED
Step failed (non-zero exit code or exception).
- SKIPPED
Step was skipped due to condition or abort.
- TIMEOUT
Step exceeded its timeout limit.
- SUCCESS = 'success'
- FAILED = 'failed'
- SKIPPED = 'skipped'
- TIMEOUT = 'timeout'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
- class kstlib.pipeline.models.StepType(value)[source]
Execution mode for a pipeline step.
- SHELL
Execute a shell command via
subprocess.run(shell=True).
- PYTHON
Execute a Python module via
python -m module.
- CALLABLE
Import and call a Python function directly.
- SHELL = 'shell'
- PYTHON = 'python'
- CALLABLE = 'callable'
- __format__(self, format_spec)
Returns format using actual value type unless __str__ has been overridden.
Steps¶
Shell step executor for pipeline.
Executes shell commands via subprocess.run(shell=True) with
timeout, environment variable injection, and working directory support.
Multi-line commands are supported natively via YAML folded (>-) or
literal (|) block scalars. The resulting string is passed as-is to
subprocess.run(shell=True), so loops, pipes, and redirections work.
- class kstlib.pipeline.steps.shell.ShellStep[source]
Bases: object
Execute a shell command as a pipeline step.
Uses subprocess.run with shell=True to execute the command string. Supports environment variable injection, working directory, and timeout.
Examples
>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = ShellStep()
>>> config = StepConfig(
...     name="greet",
...     type=StepType.SHELL,
...     command="echo hello",
... )
>>> result = step.execute(config)
>>> result.status
<StepStatus.SUCCESS: 'success'>
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a shell command.
- Parameters:
config (StepConfig) – Step configuration with command, env, timeout, etc.
dry_run (bool) – If True, log the command without executing it.
- Returns:
StepResult with captured stdout, stderr, return code, and duration.
- Return type:
StepResult
Python module step executor for pipeline.
Executes Python modules via subprocess.run([sys.executable, "-m", module]).
Runs in a subprocess (not shell=True) for isolation and security.
- class kstlib.pipeline.steps.python.PythonStep[source]
Bases: object
Execute a Python module as a pipeline step.
Uses subprocess.run with [sys.executable, "-m", module, *args] to run a Python module in a subprocess. Does not use shell=True.
Examples
>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = PythonStep()
>>> config = StepConfig(
...     name="lint",
...     type=StepType.PYTHON,
...     module="ruff",
...     args=("check", "src/"),
... )
>>> result = step.execute(config)
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a Python module via subprocess.
- Parameters:
config (StepConfig) – Step configuration with module, args, env, timeout, etc.
dry_run (bool) – If True, log the command without executing it.
- Returns:
StepResult with captured stdout, stderr, return code, and duration.
- Return type:
StepResult
Callable step executor for pipeline.
Imports and calls a Python function directly using importlib.
The callable target format is module.path:function_name.
Note
Callable steps do not support timeout natively. If timeout control is needed, use a shell or python step instead.
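Resolving a module.path:function_name target with importlib can be sketched as follows. `resolve_target` is a hypothetical helper shown for illustration; it performs none of the security checks described for CallableStep.

```python
import importlib
from typing import Any, Callable


def resolve_target(target: str) -> Callable[..., Any]:
    """Turn 'module.path:function_name' into the function object."""
    module_path, sep, function_name = target.partition(":")
    if not sep or not module_path or not function_name:
        raise ValueError(f"expected 'module.path:function_name', got {target!r}")
    module = importlib.import_module(module_path)
    func = getattr(module, function_name)
    if not callable(func):
        raise TypeError(f"{target!r} does not refer to a callable")
    return func


dumps = resolve_target("json:dumps")
print(dumps({"ok": True}))
```

Since the function runs in the current interpreter rather than a subprocess, there is no process to terminate on timeout, which is consistent with the note above.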
- kstlib.pipeline.steps.callable.DANGEROUS_MODULES: frozenset[str] = frozenset({'__main__', 'builtins', 'code', 'compile', 'ctypes', 'importlib', 'marshal', 'nt', 'os', 'pickle', 'posix', 'shutil', 'subprocess', 'sys'})
Modules that are ALWAYS rejected by CallableStep regardless of the caller-supplied allowed_modules whitelist. These modules provide unrestricted OS or interpreter access and must never be importable via a YAML-driven pipeline config.
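The blacklist-then-whitelist order can be sketched as a single predicate. This is an illustration, not the library's code: `is_module_allowed` is a hypothetical name, and the exact prefix rule (exact match or dotted submodule) is an assumption about what "prefix match" means here.

```python
from typing import Optional

# Copied from the DANGEROUS_MODULES value documented above.
DANGEROUS = frozenset({
    "__main__", "builtins", "code", "compile", "ctypes", "importlib", "marshal",
    "nt", "os", "pickle", "posix", "shutil", "subprocess", "sys",
})


def is_module_allowed(module_path: str, allowed: Optional[tuple[str, ...]] = None) -> bool:
    """Apply the blacklist first, then the optional whitelist."""
    root = module_path.split(".", 1)[0]
    if root in DANGEROUS:
        return False  # the blacklist always wins, even over the whitelist
    if allowed is None:
        return True  # no whitelist configured
    # Assumed prefix rule: exact match or a dotted submodule of an allowed entry.
    return any(module_path == p or module_path.startswith(p + ".") for p in allowed)


print(is_module_allowed("os.path"))
print(is_module_allowed("my.alerts", ("my",)))
print(is_module_allowed("json", ("my",)))
```

Checking the root module means that `os.path` is rejected along with `os`, and matching on a dotted boundary keeps an entry such as `my` from also admitting an unrelated `mymodule`.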
- class kstlib.pipeline.steps.callable.CallableStep(allowed_modules=None)[source]
Bases: object
Execute a Python callable as a pipeline step.
Parses the callable target as module.path:function_name, imports the module, and calls the function. The return value is captured in StepResult.return_value.
Security enforcement (defense-in-depth):
- The root module is always checked against DANGEROUS_MODULES.
- When allowed_modules is provided, the module must match the whitelist (prefix match, identical to the transform.chain pattern).
- When allowed_modules is None, only the blacklist applies (backward-compatible default).
- Parameters:
allowed_modules (tuple[str, ...] | None) – Optional whitelist of module paths permitted as callable targets. When None, no whitelist is enforced (the blacklist still applies).
Examples
>>> from kstlib.pipeline.models import StepConfig, StepType
>>> step = CallableStep()
>>> config = StepConfig(
...     name="process",
...     type=StepType.CALLABLE,
...     callable="json:dumps",
... )
>>> result = step.execute(config)
- __init__(self, allowed_modules: tuple[str, ...] | None = None) -> None[source]
Initialize CallableStep with an optional allow-list of modules.
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a Python callable.
- Parameters:
config (StepConfig) – Step configuration with callable target, args, etc.
dry_run (bool) – If True, log the callable without executing it.
- Returns:
StepResult with return_value, duration, and status.
- Raises:
PipelineConfigError – If the target module is blacklisted or not present in the configured whitelist.
- Return type:
StepResult
Protocol¶
Abstract base protocol for pipeline steps.
This module defines the protocol that all pipeline step implementations must satisfy, enabling consistent execution across shell, python, and callable step types.
- class kstlib.pipeline.base.AbstractStep(*args, **kwargs)[source]
Bases: Protocol
Protocol defining the interface for pipeline step executors.
All step implementations (ShellStep, PythonStep, CallableStep) must implement this protocol to ensure consistent behavior across step types.
Examples
>>> def run_step(step: AbstractStep, config: StepConfig) -> StepResult:
...     return step.execute(config)
- execute(self, config: StepConfig, *, dry_run: bool = False) -> StepResult[source]
Execute a pipeline step.
- Parameters:
config (StepConfig) – Step configuration with command, env, timeout, etc.
dry_run (bool) – If True, simulate execution without side effects.
- Returns:
StepResult with status, stdout, stderr, duration, etc.
- Return type:
StepResult
- __init__(self, *args, **kwargs)
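Because the interface is a typing Protocol, an executor qualifies by shape rather than by inheritance. The sketch below uses simplified stand-ins (`Step`, `EchoStep`, a plain dict for the config); whether kstlib's AbstractStep is decorated with `runtime_checkable` is not stated here, so the isinstance check is an assumption of this example only.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Step(Protocol):
    """Stand-in protocol: anything with a matching execute() qualifies."""

    def execute(self, config: dict, *, dry_run: bool = False) -> str: ...


class EchoStep:
    """Satisfies Step structurally; it does not inherit from it."""

    def execute(self, config: dict, *, dry_run: bool = False) -> str:
        return "dry-run" if dry_run else f"ran {config['name']}"


def run_step(step: Step, config: dict) -> str:
    """Accept any object that provides execute()."""
    return step.execute(config)


print(run_step(EchoStep(), {"name": "greet"}))
print(isinstance(EchoStep(), Step))
```

A custom executor written this way can be type-checked against the protocol without importing or subclassing any base class.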
Validators¶
Input validation for kstlib.pipeline module.
This module provides validation functions for pipeline configuration, implementing defense in depth against malformed or malicious input.
Reuses security validators from kstlib.ops.validators for command
and environment variable validation.
- kstlib.pipeline.validators.MAX_MODULE_NAME_LENGTH = 256
Maximum length of a module name.
- kstlib.pipeline.validators.MAX_PIPELINE_STEPS = 50
Maximum number of steps in a single pipeline.
- kstlib.pipeline.validators.MAX_STEP_ARGS = 50
Maximum number of arguments for a step.
- kstlib.pipeline.validators.MAX_STEP_NAME_LENGTH = 64
Maximum step name length.
- kstlib.pipeline.validators.MODULE_NAME_PATTERN = re.compile('^[a-zA-Z_][a-zA-Z0-9_.]*$')
Pattern for valid Python module names.
- kstlib.pipeline.validators.STEP_NAME_PATTERN = re.compile('^[a-zA-Z][a-zA-Z0-9_-]*$')
Pattern for valid step names (same rules as session names).
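The constants and patterns above can be exercised directly with the standard library. The following is a minimal sketch that copies the documented values; the helper names looks_like_step_name and looks_like_module_name are hypothetical, and the real validators may perform additional checks.

```python
import re

# Values copied from the constants documented above.
MAX_MODULE_NAME_LENGTH = 256
MAX_STEP_NAME_LENGTH = 64
MODULE_NAME_PATTERN = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_.]*$")
STEP_NAME_PATTERN = re.compile(r"^[a-zA-Z][a-zA-Z0-9_-]*$")


def looks_like_step_name(name: str) -> bool:
    """Hypothetical helper: length limit plus pattern check."""
    return len(name) <= MAX_STEP_NAME_LENGTH and bool(STEP_NAME_PATTERN.match(name))


def looks_like_module_name(module: str) -> bool:
    """Hypothetical helper: length limit plus pattern check."""
    return len(module) <= MAX_MODULE_NAME_LENGTH and bool(MODULE_NAME_PATTERN.match(module))


print(looks_like_step_name("step-01"))          # True
print(looks_like_step_name("01-step"))          # False: must start with a letter
print(looks_like_module_name("my.pkg.module"))  # True
print(looks_like_module_name("my-module"))      # False: hyphens are not allowed
```

Note that MODULE_NAME_PATTERN on its own also matches strings with consecutive or trailing dots, such as pkg..mod. Whether validate_module_name rejects those separately is not stated here.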
- kstlib.pipeline.validators.validate_callable_target(target: str) -> str
Validate a callable target string.
Expected format: module.path:function_name
- Parameters:
target (str) – Callable target string to validate.
- Returns:
The validated target (unchanged).
- Raises:
PipelineConfigError – If target is invalid.
- Return type:
str
Examples
>>> validate_callable_target("mymodule:run")
'mymodule:run'
>>> validate_callable_target("my.pkg.module:do_work")
'my.pkg.module:do_work'
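To show what a module.path:function_name target refers to, here is a minimal sketch that splits a target and resolves it with importlib. The helpers split_target and resolve_target are hypothetical and raise plain ValueError or TypeError; the library itself raises PipelineConfigError and may apply stricter checks.

```python
from __future__ import annotations

import importlib
import re
from typing import Any, Callable

# Pattern copied from MODULE_NAME_PATTERN documented above.
MODULE_NAME_PATTERN = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_.]*$")


def split_target(target: str) -> tuple[str, str]:
    """Split 'module.path:function_name' into its two parts (hypothetical helper)."""
    module_path, sep, func_name = target.partition(":")
    if not sep or not MODULE_NAME_PATTERN.match(module_path) or not func_name.isidentifier():
        raise ValueError(f"Invalid callable target: {target!r}")
    return module_path, func_name


def resolve_target(target: str) -> Callable[..., Any]:
    """Import the module and return the named attribute if it is callable."""
    module_path, func_name = split_target(target)
    obj = getattr(importlib.import_module(module_path), func_name)
    if not callable(obj):
        raise TypeError(f"{target!r} does not refer to a callable")
    return obj


dumps = resolve_target("json:dumps")
print(dumps({"ok": True}))  # {"ok": true}
```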
- kstlib.pipeline.validators.validate_command(command: str | None, *, strict: bool = True) -> str | None
Validate command string.
Rules:
Max 4096 characters.
No null bytes.
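strict=True (default): block dangerous shell patterns like semicolons, pipes, redirections, eval, source, etc.
strict=False: only basic checks (length, null bytes) for trusted contexts like pipeline config files.
- Parameters:
command (str | None) – Command string to validate, or None.
strict (bool) – If True (default), block dangerous shell patterns; if False, apply only the basic checks.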
- Returns:
The validated command (unchanged).
- Raises:
ValueError – If command is invalid or contains dangerous patterns.
- Return type:
str | None
Examples
>>> validate_command("python -m app")
'python -m app'
>>> validate_command(None) is None
True
>>> validate_command("echo a; echo b", strict=False)
'echo a; echo b'
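The difference between the two modes can be sketched as follows. This is an illustration under stated assumptions: check_command is a hypothetical name, and the _DANGEROUS pattern covers only the examples named in the rules above (semicolons, pipes, redirections, eval, source). The actual pattern list comes from kstlib.ops.validators and is not reproduced here.

```python
from __future__ import annotations

import re

MAX_COMMAND_LENGTH = 4096  # from the rules above

# Illustrative subset only; the real list of dangerous patterns is an unknown here.
_DANGEROUS = re.compile(r"[;|<>]|\b(?:eval|source)\b")


def check_command(command: str | None, *, strict: bool = True) -> str | None:
    """Hypothetical re-implementation of the documented rules."""
    if command is None:
        return None
    # Basic checks always apply, in both modes.
    if len(command) > MAX_COMMAND_LENGTH:
        raise ValueError("Command too long")
    if "\x00" in command:
        raise ValueError("Command contains a null byte")
    # Pattern blocking applies only in strict mode.
    if strict and _DANGEROUS.search(command):
        raise ValueError("Command contains a dangerous shell pattern")
    return command


print(check_command("python -m app"))                 # python -m app
print(check_command("echo a; echo b", strict=False))  # echo a; echo b
```

In this sketch, calling check_command("echo a; echo b") with the default strict=True raises ValueError because of the semicolon.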
- kstlib.pipeline.validators.validate_env(env: dict[str, str]) -> dict[str, str]
Validate environment variables.
Rules:
Max 100 env vars.
Key max 128 characters.
Value max 32KB.
Key must be valid identifier.
Dangerous keys blocked (LD_PRELOAD, PYTHONPATH, etc.).
- Parameters:
env (dict[str, str]) – Dictionary of environment variables to validate.
- Returns:
The validated env dict (unchanged).
- Raises:
ValueError – If env vars are invalid.
- Return type:
dict[str, str]
Examples
>>> validate_env({"APP_ENV": "production"})
{'APP_ENV': 'production'}
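The env rules listed above can be expressed as a short checker. This is a sketch, not the library's code: check_env is a hypothetical name, BLOCKED_KEYS holds only the two keys named in the rules (the full list is not given), the 32KB limit is assumed to mean UTF-8 encoded bytes, and "valid identifier" is assumed to mean str.isidentifier().

```python
from __future__ import annotations

MAX_ENV_VARS = 100
MAX_KEY_LENGTH = 128
MAX_VALUE_BYTES = 32 * 1024  # assumption: limit measured in UTF-8 bytes

# Only the keys named in the rules above; the complete blocklist is unknown.
BLOCKED_KEYS = frozenset({"LD_PRELOAD", "PYTHONPATH"})


def check_env(env: dict[str, str]) -> dict[str, str]:
    """Hypothetical re-implementation of the documented env rules."""
    if len(env) > MAX_ENV_VARS:
        raise ValueError(f"Too many env vars: {len(env)} > {MAX_ENV_VARS}")
    for key, value in env.items():
        if len(key) > MAX_KEY_LENGTH:
            raise ValueError(f"Env key too long: {key[:16]!r}...")
        if not key.isidentifier():
            raise ValueError(f"Env key is not a valid identifier: {key!r}")
        if key in BLOCKED_KEYS:
            raise ValueError(f"Env key is blocked: {key!r}")
        if len(value.encode("utf-8")) > MAX_VALUE_BYTES:
            raise ValueError(f"Env value too large for key {key!r}")
    return env  # returned unchanged, as documented


print(check_env({"APP_ENV": "production"}))  # {'APP_ENV': 'production'}
```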
- kstlib.pipeline.validators.validate_module_name(module: str) -> str
Validate a Python module name.
- Parameters:
module (str) – Module name to validate (e.g. my.package.module).
- Returns:
The validated module name (unchanged).
- Raises:
PipelineConfigError – If module name is invalid.
- Return type:
str
Examples
>>> validate_module_name("mymodule")
'mymodule'
>>> validate_module_name("my.package.module")
'my.package.module'
- kstlib.pipeline.validators.validate_pipeline_config(*, step_count: int, on_error: str) -> None
Validate pipeline-level configuration.
- Parameters:
step_count (int) – Number of steps in the pipeline.
on_error (str) – Error policy name (for example fail_fast).
- Raises:
PipelineConfigError – If configuration is invalid.
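- kstlib.pipeline.validators.validate_step_config(*, name: str, step_type: str, command: str | None = None, module: str | None = None, callable_target: str | None = None, args: list[str] | None = None, env: dict[str, str] | None = None) -> None
Validate a step configuration.
Checks that the step has the required fields for its type and that all values pass validation.
- Parameters:
name (str) – Step name.
step_type (str) – Step type (shell, python, or callable).
command (str | None) – Shell command, for shell steps.
module (str | None) – Python module name, for python steps.
callable_target (str | None) – Callable target in module.path:function_name format, for callable steps.
args (list[str] | None) – Step arguments.
env (dict[str, str] | None) – Environment variables for the step.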
- Raises:
PipelineConfigError – If configuration is invalid.
- kstlib.pipeline.validators.validate_step_name(name: str) -> str
Validate and return a step name.
Rules:
Cannot be empty.
Max 64 characters (hard limit).
Must start with a letter.
Only alphanumeric, underscore, hyphen allowed.
- Parameters:
name (str) – Step name to validate.
- Returns:
The validated step name (unchanged).
- Raises:
PipelineConfigError – If name is invalid.
- Return type:
str
Examples
>>> validate_step_name("build_logs")
'build_logs'
>>> validate_step_name("step-01")
'step-01'
>>> validate_step_name("")
Traceback (most recent call last):
    ...
kstlib.pipeline.exceptions.PipelineConfigError: Step name cannot be empty
Exceptions¶
Specialized exceptions raised by the kstlib.pipeline module.
Exception hierarchy:
KstlibError
    PipelineError (base for all pipeline errors)
        PipelineConfigError (invalid configuration, also ValueError)
        PipelineAbortedError (fail_fast abort)
        StepError (step execution error)
            StepTimeoutError (step exceeded timeout)
            StepImportError (callable import failure)
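The practical effect of this hierarchy is that callers can catch errors at whatever level of detail they need. The sketch below mirrors part of the tree with stand-in classes so it runs without kstlib installed. The constructor bodies, the message formats, and the classify helper are assumptions for illustration only.

```python
# Stand-in classes mirroring the documented hierarchy (bodies are assumed).
class KstlibError(Exception):
    pass


class PipelineError(KstlibError):
    pass


class PipelineConfigError(PipelineError, ValueError):
    pass


class StepError(PipelineError):
    def __init__(self, step_name: str, reason: str) -> None:
        super().__init__(f"Step {step_name!r} failed: {reason}")
        self.step_name = step_name
        self.reason = reason


class StepImportError(StepError):
    def __init__(self, step_name: str, target: str) -> None:
        super().__init__(step_name, f"cannot import {target!r}")
        self.target = target


def classify(exc: Exception) -> str:
    """Hypothetical helper: order handlers from most to least specific."""
    try:
        raise exc
    except PipelineConfigError:
        return "config"
    except StepError as err:
        return f"step:{err.step_name}"
    except PipelineError:
        return "pipeline"


print(classify(StepImportError("notify", "my.alerts:send_ok")))  # step:notify
print(isinstance(PipelineConfigError("bad value"), ValueError))  # True
```

Because PipelineConfigError also derives from ValueError, existing code that catches ValueError around configuration loading continues to work.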
- exception kstlib.pipeline.exceptions.PipelineAbortedError(step_name, reason)
Bases: PipelineError
Pipeline execution was aborted due to fail_fast policy.
Raised when a step fails and the error policy is fail_fast, causing the remaining steps to be skipped.
- step_name
Name of the step that caused the abort.
- reason
Description of why the step failed.
- exception kstlib.pipeline.exceptions.PipelineConfigError
Bases: PipelineError, ValueError
Pipeline configuration is invalid.
Raised when the pipeline or step configuration contains invalid values, missing required fields, or constraint violations.
- exception kstlib.pipeline.exceptions.PipelineError
Bases: KstlibError
Base exception for all pipeline module errors.
All pipeline-specific exceptions inherit from this class, allowing for easy catching of any pipeline error.
- exception kstlib.pipeline.exceptions.StepError(step_name, reason)
Bases: PipelineError
A pipeline step failed during execution.
- step_name
Name of the step that failed.
- reason
Description of the failure.
- exception kstlib.pipeline.exceptions.StepImportError(step_name, target)
Bases: StepError
Failed to import a callable target for a step.
- step_name
Name of the step with the import failure.
- target
The import target string that failed.