Source code for kstlib.pipeline.exceptions
"""Specialized exceptions raised by the kstlib.pipeline module.

Exception hierarchy::

    KstlibError
        PipelineError (base for all pipeline errors)
            PipelineConfigError (invalid configuration, also ValueError)
            PipelineAbortedError (fail_fast abort)
            StepError (step execution error)
                StepTimeoutError (step exceeded timeout)
                StepImportError (callable import failure)
"""
from __future__ import annotations
from kstlib.config.exceptions import KstlibError
[docs]
class PipelineError(KstlibError):
    """Root of the pipeline exception hierarchy.

    Every pipeline-specific exception derives from this class, so a
    single ``except PipelineError`` clause catches any of them.
    """
[docs]
class PipelineConfigError(PipelineError, ValueError):
    """Signal an invalid pipeline or step configuration.

    Covers invalid values, missing required fields and constraint
    violations. Since the class also derives from ``ValueError``,
    handlers written for ``ValueError`` catch it as well.
    """
[docs]
class PipelineAbortedError(PipelineError):
    """Signal that a pipeline run stopped early under ``fail_fast``.

    Raised when a step fails while the error policy is ``fail_fast``,
    which causes the remaining steps to be skipped.

    Attributes:
        step_name: Name of the step whose failure triggered the abort.
        reason: Explanation of why that step failed.
    """

    def __init__(self, step_name: str, reason: str) -> None:
        """Build the error message and record the failure details.

        Args:
            step_name: Name of the step whose failure triggered the abort.
            reason: Explanation of why that step failed.
        """
        message = f"Pipeline aborted at step '{step_name}': {reason}"
        super().__init__(message)
        # Keep the raw pieces available so handlers need not parse the message.
        self.step_name, self.reason = step_name, reason
[docs]
class StepError(PipelineError):
    """Signal that a pipeline step failed while executing.

    Attributes:
        step_name: Name of the failing step.
        reason: Explanation of what went wrong.
    """

    def __init__(self, step_name: str, reason: str) -> None:
        """Build the error message and record the failure details.

        Args:
            step_name: Name of the failing step.
            reason: Explanation of what went wrong.
        """
        message = f"Step '{step_name}' failed: {reason}"
        super().__init__(message)
        # Keep the raw pieces available so handlers need not parse the message.
        self.step_name, self.reason = step_name, reason
[docs]
class StepTimeoutError(StepError):
    """Signal that a pipeline step ran past its timeout.

    Attributes:
        step_name: Name of the step that timed out.
        timeout: The timeout value in seconds.
    """

    def __init__(self, step_name: str, timeout: float) -> None:
        """Record the timeout and delegate message building to the parent.

        Args:
            step_name: Name of the step that timed out.
            timeout: The timeout value in seconds.
        """
        # The formatted text is handed to the parent as the failure reason.
        detail = f"exceeded timeout of {timeout}s"
        super().__init__(step_name, detail)
        self.timeout = timeout
[docs]
class StepImportError(StepError):
    """Signal that the callable target of a step could not be imported.

    Attributes:
        step_name: Name of the step whose target failed to import.
        target: The import target string that failed.
    """

    def __init__(self, step_name: str, target: str) -> None:
        """Record the target and delegate message building to the parent.

        Args:
            step_name: Name of the step whose target failed to import.
            target: The import target string (e.g. "module.path:function").
        """
        # The formatted text is handed to the parent as the failure reason.
        detail = f"cannot import '{target}'"
        super().__init__(step_name, detail)
        self.target = target
# Public API of this module: the names exported by ``from ... import *``.
# Entries are kept in alphabetical order.
__all__ = [
"PipelineAbortedError",
"PipelineConfigError",
"PipelineError",
"StepError",
"StepImportError",
"StepTimeoutError",
]