Resilience Utilities

Public API for fault tolerance patterns: circuit breaker, rate limiter, graceful shutdown, heartbeat monitoring, and watchdog. These components help build robust services that handle failures gracefully and support clean termination.

Tip

Pair this reference with the Resilience feature guide.

Quick overview

  • CircuitBreaker implements the circuit breaker pattern to prevent cascading failures

  • RateLimiter provides token bucket rate limiting for request throttling

  • GracefulShutdown manages prioritized cleanup callbacks on process termination

  • Heartbeat provides file-based liveness signaling for external monitoring

  • Watchdog detects thread/process freezes with configurable timeout callbacks

  • All components support both sync and async usage patterns

  • Configuration follows the standard priority chain: constructor args > kstlib.conf.yml > defaults

Configuration cascade

The module consults the loaded config for default values. A minimal config block:

resilience:
  circuit_breaker:
    max_failures: 5
    reset_timeout: 60
  shutdown:
    default_timeout: 30
  heartbeat:
    interval: 10
  watchdog:
    timeout: 30

Override any of these per instance:

from kstlib.resilience import CircuitBreaker

cb = CircuitBreaker(max_failures=3, reset_timeout=10)
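The precedence rule can be pictured as a small lookup. The sketch below is an illustration, not kstlib's loader. It assumes the configuration is already parsed into a nested dict shaped like the YAML block above. `resolve_setting`, `LOADED_CONFIG` and `BUILTIN_DEFAULTS` are hypothetical names, and the default values are placeholders.

```python
from typing import Any

# Hypothetical loaded config, shaped like the YAML block above.
LOADED_CONFIG: dict[str, Any] = {
    "resilience": {
        "circuit_breaker": {"max_failures": 5, "reset_timeout": 60},
    },
}

# Hypothetical built-in fallbacks; placeholder values, not kstlib's real defaults.
BUILTIN_DEFAULTS: dict[str, Any] = {
    "max_failures": 5,
    "reset_timeout": 60,
    "half_open_max_calls": 1,
}


def resolve_setting(arg: Any, section: str, key: str,
                    config: dict[str, Any] = LOADED_CONFIG) -> Any:
    """Return the first value that is not None: constructor arg > config > built-in default."""
    if arg is not None:
        return arg
    value = config.get("resilience", {}).get(section, {}).get(key)
    if value is not None:
        return value
    return BUILTIN_DEFAULTS[key]


print(resolve_setting(3, "circuit_breaker", "max_failures"))            # 3, constructor arg wins
print(resolve_setting(None, "circuit_breaker", "reset_timeout"))        # 60, from the config
print(resolve_setting(None, "circuit_breaker", "half_open_max_calls"))  # 1, built-in fallback
```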

Usage patterns

Circuit breaker for external APIs

from kstlib.resilience import CircuitBreaker, CircuitOpenError

cb = CircuitBreaker(max_failures=3, reset_timeout=30)

try:
    result = cb.call(api.fetch, endpoint="/data")
except CircuitOpenError:
    result = fallback_value

Async circuit breaker

result = await cb.acall(async_api.fetch, symbol="BTC/USDT")

Rate limiter for API throttling

from kstlib.resilience import RateLimiter, rate_limiter

# Direct usage
limiter = RateLimiter(rate=10, per=1.0)
limiter.acquire()  # Blocks until a token is available
call_api()

# As decorator
@rate_limiter(rate=100, per=60.0)  # 100 per minute
def call_service(data: dict) -> dict:
    return api.post(data)

Decorator syntax

from kstlib.resilience import circuit_breaker

@circuit_breaker(max_failures=3)
def call_service(data: dict) -> dict:
    return external_api.post(data)

Graceful shutdown with priorities

from kstlib.resilience import GracefulShutdown

shutdown = GracefulShutdown()
shutdown.register("save", save_state, priority=10)
shutdown.register("close", close_db, priority=20)
shutdown.install()  # Handle SIGTERM/SIGINT

Heartbeat monitoring

from kstlib.resilience import Heartbeat

heartbeat = Heartbeat(
    state_file="/tmp/app.heartbeat",
    interval=10,
    metadata={"version": "1.0"}
)
heartbeat.start()

Watchdog for freeze detection

from kstlib.resilience import Watchdog

def on_freeze():
    print("Thread appears frozen!")

with Watchdog(timeout=30, on_timeout=on_freeze) as wd:
    for item in work_queue:
        wd.ping()  # Must be called before the timeout elapses
        process(item)

Module reference

Resilience utilities for fault-tolerant applications.

This module provides core components for building resilient systems:

  • Heartbeat: Periodic liveness signaling via state files

  • GracefulShutdown: Orderly shutdown with prioritized callbacks

  • CircuitBreaker: Protect against cascading failures

  • RateLimiter: Token bucket rate limiting for request throttling

  • Watchdog: Detect thread/process freezes and hangs

Examples

Heartbeat for process monitoring:

>>> from kstlib.resilience import Heartbeat
>>> with Heartbeat("/tmp/app.heartbeat") as hb:  
...     run_application()
>>> Heartbeat.is_alive("/tmp/app.heartbeat")  
True

Graceful shutdown with cleanup:

>>> from kstlib.resilience import GracefulShutdown
>>> with GracefulShutdown() as shutdown:  
...     shutdown.register("db", close_database, priority=10)
...     shutdown.register("cache", flush_cache, priority=20)
...     run_application()

Circuit breaker for external calls:

>>> from kstlib.resilience import circuit_breaker
>>> @circuit_breaker(max_failures=3, reset_timeout=30)
... def call_external_api():  
...     return requests.get("http://api.example.com")

Rate limiting API calls:

>>> from kstlib.resilience import rate_limiter, RateLimiter
>>> @rate_limiter(rate=10, per=1.0)  # 10 requests per second
... def call_api():  
...     return requests.get("http://api.example.com")
>>> limiter = RateLimiter(rate=100, per=60.0)  # 100 per minute
>>> limiter.acquire()  
True

Watchdog for freeze detection:

>>> from kstlib.resilience import Watchdog
>>> def on_freeze():  
...     print("Thread frozen!")
>>> with Watchdog(timeout=30, on_timeout=on_freeze) as wd:  
...     while running:
...         wd.ping()
...         do_work()

class kstlib.resilience.CircuitBreaker(*, max_failures=None, reset_timeout=None, half_open_max_calls=None, excluded_exceptions=(), name=None)

Bases: object

Circuit breaker for protecting against cascading failures.

Implements the circuit breaker pattern: once calls to a service keep failing, the circuit opens and further calls are rejected immediately, giving the service time to recover.

Parameters:
  • max_failures (int | None) – Failures before opening circuit (default from config).

  • reset_timeout (float | None) – Seconds before attempting recovery (default from config).

  • half_open_max_calls (int | None) – Calls allowed in half-open state (default from config).

  • excluded_exceptions (tuple[type[Exception], ...]) – Exceptions that don’t count as failures.

  • name (str | None) – Optional name for the circuit breaker.

Examples

As a decorator:

>>> @circuit_breaker
... def call_api():  
...     return requests.get("http://api.example.com")

With custom settings:

>>> @circuit_breaker(max_failures=3, reset_timeout=30)
... def risky_call():  
...     pass

Direct instantiation:

>>> cb = CircuitBreaker(max_failures=5)
>>> cb.state
<CircuitState.CLOSED: 1>

__init__(self, *, max_failures: int | None = None, reset_timeout: float | None = None, half_open_max_calls: int | None = None, excluded_exceptions: tuple[type[Exception], ...] = (), name: str | None = None) -> None

Initialize circuit breaker.

Parameters:
  • max_failures (int | None) – Failures before opening circuit. Uses config if None.

  • reset_timeout (float | None) – Seconds before attempting recovery. Uses config if None.

  • half_open_max_calls (int | None) – Calls allowed in half-open state. Uses config if None.

  • excluded_exceptions (tuple[type[Exception], ...]) – Exceptions that don’t count as failures.

  • name (str | None) – Optional name for the circuit breaker.

property state: CircuitState

Return the current circuit state.

property name: str | None

Return the circuit breaker name.

property stats: CircuitStats

Return circuit breaker statistics.

property failure_count: int

Return current failure count.

call(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> R

Execute a function through the circuit breaker.

Parameters:
  • func (Callable[P, R]) – Function to execute.

  • *args (P.args) – Positional arguments for the function.

  • **kwargs (P.kwargs) – Keyword arguments for the function.

Returns:

Function result.

Raises:

CircuitOpenError – If circuit is open.

Return type:

R

Examples

>>> cb = CircuitBreaker()
>>> result = cb.call(lambda x: x * 2, 5)
>>> result
10

async acall(self, func: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R

Execute an async function through the circuit breaker.

Parameters:
  • func (Callable[P, Awaitable[R]]) – Async function to execute.

  • *args (P.args) – Positional arguments for the function.

  • **kwargs (P.kwargs) – Keyword arguments for the function.

Returns:

Function result.

Raises:

CircuitOpenError – If circuit is open.

Return type:

R

Examples

>>> import asyncio
>>> cb = CircuitBreaker()
>>> async def double(x): return x * 2
>>> asyncio.run(cb.acall(double, 5))
10

reset(self) -> None

Manually reset the circuit breaker to closed state.

Examples

>>> cb = CircuitBreaker(max_failures=1)
>>> try:
...     cb.call(lambda: 1/0)
... except ZeroDivisionError:
...     pass
>>> cb.state.name
'OPEN'
>>> cb.reset()
>>> cb.state.name
'CLOSED'

__call__(self, func: Callable[P, R]) -> Callable[P, R] | Callable[P, Awaitable[R]]

Use circuit breaker as a decorator.

Parameters:

func (Callable[P, R]) – Function to wrap.

Returns:

Wrapped function with circuit breaker protection.

Return type:

Callable[P, R] | Callable[P, Awaitable[R]]
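The union return type exists because a decorator applied to a coroutine function has to hand back a coroutine function. A minimal sketch of that dispatch using `inspect.iscoroutinefunction`; `guard` is a hypothetical decorator with the breaker bookkeeping left out, not kstlib's implementation.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


def guard(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator that returns a wrapper of the same kind (sync or async)."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            # A real breaker would check and record its state around this await.
            return await func(*args, **kwargs)
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        # A real breaker would check and record its state around this call.
        return func(*args, **kwargs)
    return sync_wrapper


@guard
def double(x: int) -> int:
    return x * 2


@guard
async def adouble(x: int) -> int:
    return x * 2


print(double(5))                             # 10
print(asyncio.run(adouble(5)))               # 10
print(inspect.iscoroutinefunction(adouble))  # True
```

Keeping the wrapper a real `async def` means callers can still `await` the decorated function and tools that inspect it still see a coroutine function.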

exception kstlib.resilience.CircuitBreakerError

Bases: KstlibError, RuntimeError

Base exception for circuit breaker errors.

exception kstlib.resilience.CircuitOpenError(message, remaining_seconds)

Bases: CircuitBreakerError

Raised when a call is attempted while the circuit is open.

remaining_seconds

Time until the circuit may transition to half-open.

__init__(self, message: str, remaining_seconds: float) -> None

Initialize CircuitOpenError.

Parameters:
  • message (str) – Human-readable error message.

  • remaining_seconds (float) – Seconds until circuit may transition to half-open.

class kstlib.resilience.CircuitState(value)

Bases: Enum

State of the circuit breaker.

States:

  • CLOSED: Normal operation; requests pass through.

  • OPEN: Circuit tripped; requests fail immediately.

  • HALF_OPEN: Testing whether the service has recovered.
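The transitions between these states can be sketched as a small state machine. `MiniBreaker` is hypothetical and single-threaded, not kstlib's code. It assumes failures are counted consecutively, one successful trial call in HALF_OPEN closes the circuit, and a failed trial reopens it immediately; it ignores `half_open_max_calls`, statistics and `excluded_exceptions`. `RuntimeError` stands in for `CircuitOpenError`.

```python
import enum
import time
from typing import Callable, TypeVar

R = TypeVar("R")


class State(enum.Enum):
    CLOSED = 1
    OPEN = 2
    HALF_OPEN = 3


class MiniBreaker:
    """Hypothetical breaker illustrating CLOSED -> OPEN -> HALF_OPEN -> CLOSED."""

    def __init__(self, max_failures: int, reset_timeout: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func: Callable[[], R]) -> R:
        if self.state is State.OPEN:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open")  # stand-in for CircuitOpenError
            self.state = State.HALF_OPEN  # timeout elapsed: allow a trial call
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state is State.HALF_OPEN or self.failures >= self.max_failures:
                self.state = State.OPEN  # trip (or re-trip) and restart the timer
                self.opened_at = self.clock()
            raise
        self.state = State.CLOSED  # success closes the circuit and resets the count
        self.failures = 0
        return result


# Drive the breaker with a fake clock so the timeout can be simulated.
now = [0.0]
breaker = MiniBreaker(max_failures=2, reset_timeout=10, clock=lambda: now[0])


def boom() -> int:
    raise ValueError("service down")


for _ in range(2):
    try:
        breaker.call(boom)
    except ValueError:
        pass
print(breaker.state)             # State.OPEN
now[0] = 11.0                    # reset_timeout has elapsed
print(breaker.call(lambda: 42))  # 42 (the trial call in HALF_OPEN succeeds)
print(breaker.state)             # State.CLOSED
```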

CLOSED = 1
OPEN = 2
HALF_OPEN = 3

class kstlib.resilience.CircuitStats(total_calls=0, successful_calls=0, failed_calls=0, rejected_calls=0, state_changes=0)

Bases: object

Statistics for circuit breaker monitoring.

total_calls

Total number of calls attempted.

Type:

int

successful_calls

Number of successful calls.

Type:

int

failed_calls

Number of failed calls.

Type:

int

rejected_calls

Number of calls rejected due to open circuit.

Type:

int

state_changes

Number of state transitions.

Type:

int

Examples

>>> stats = CircuitStats()
>>> stats.record_success()
>>> stats.record_failure()
>>> stats.record_rejection()
>>> (stats.successful_calls, stats.failed_calls, stats.rejected_calls)
(1, 1, 1)
>>> stats.total_calls
3

total_calls: int = 0
successful_calls: int = 0
failed_calls: int = 0
rejected_calls: int = 0
state_changes: int = 0

record_success(self) -> None

Record a successful call.

record_failure(self) -> None

Record a failed call.

record_rejection(self) -> None

Record a rejected call (circuit open).

record_state_change(self) -> None

Record a state transition.

__init__(self, total_calls: int = 0, successful_calls: int = 0, failed_calls: int = 0, rejected_calls: int = 0, state_changes: int = 0) -> None

class kstlib.resilience.CleanupCallback(name, callback, priority=100, timeout=None, is_async=False)

Bases: object

Registered cleanup callback with metadata.

name

Unique identifier for the callback.

Type:

str

callback

The cleanup function (sync or async).

Type:

collections.abc.Callable[[], None] | collections.abc.Callable[[], collections.abc.Coroutine[Any, Any, None]]

priority

Execution order (lower runs first, default 100).

Type:

int

timeout

Per-callback timeout in seconds (None = use global).

Type:

float | None

is_async

Whether the callback is async.

Type:

bool

Examples

>>> cb = CleanupCallback(
...     name="db_close",
...     callback=lambda: None,
...     priority=50,
...     timeout=5.0,
...     is_async=False,
... )
>>> (cb.name, cb.priority, cb.timeout)
('db_close', 50, 5.0)

name: str
callback: Callable[[], None] | Callable[[], Coroutine[Any, Any, None]]
priority: int
timeout: float | None
is_async: bool

__init__(self, name: str, callback: Callback, priority: int = 100, timeout: float | None = None, is_async: bool = False) -> None

class kstlib.resilience.GracefulShutdown(*, timeout=None, signals=None, force_exit_code=1)

Bases: object

Graceful shutdown handler with prioritized cleanup callbacks.

Runs an orderly shutdown on SIGTERM/SIGINT and enforces a timeout. Callbacks execute in ascending priority order (lower values run first).
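The ordering rule and the shared time budget can be sketched with the standard library alone. `MiniShutdown` is hypothetical and sync-only. It assumes the remaining callbacks are skipped once the total timeout is spent and that a failing callback is reported rather than aborting the rest, which may not match kstlib's exact behavior.

```python
import time
from typing import Callable


class MiniShutdown:
    """Hypothetical shutdown runner: priority order plus one shared time budget."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self._callbacks: dict[str, tuple[int, Callable[[], None]]] = {}

    def register(self, name: str, callback: Callable[[], None], priority: int = 100) -> None:
        if name in self._callbacks:
            # kstlib documents ShutdownError here; ValueError keeps the sketch standalone.
            raise ValueError(f"callback {name!r} already registered")
        self._callbacks[name] = (priority, callback)

    def trigger(self) -> list[str]:
        """Run callbacks lowest priority first and return the names that completed."""
        deadline = time.monotonic() + self.timeout
        completed: list[str] = []
        # sorted() is stable, so equal priorities keep their registration order.
        ordered = sorted(self._callbacks.items(), key=lambda item: item[1][0])
        for name, (_priority, callback) in ordered:
            if time.monotonic() >= deadline:
                break  # budget exhausted: skip whatever is left
            try:
                callback()
                completed.append(name)
            except Exception as exc:  # one failing cleanup should not block the rest
                print(f"cleanup {name!r} failed: {exc}")
        return completed


shutdown = MiniShutdown(timeout=30)
shutdown.register("cache", lambda: None, priority=20)
shutdown.register("db", lambda: None, priority=10)
print(shutdown.trigger())  # ['db', 'cache']
```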

Parameters:
  • timeout (float | None) – Total timeout for all callbacks (default from config).

  • signals (tuple[signal.Signals, ...] | None) – Signals to handle (default: SIGTERM, SIGINT).

  • force_exit_code (int) – Exit code when timeout exceeded (default: 1).

Examples

Register callbacks with priority ordering:

>>> shutdown = GracefulShutdown(timeout=30)
>>> shutdown.register("cache", lambda: None, priority=20)
>>> shutdown.register("db", lambda: None, priority=10)
>>> [cb.name for cb in shutdown._get_sorted_callbacks()]
['db', 'cache']

Context manager usage (with signals):

>>> with GracefulShutdown() as shutdown:  
...     shutdown.register("cleanup", close_resources)
...     run_application()

__init__(self, *, timeout: float | None = None, signals: tuple[signal.Signals, ...] | None = None, force_exit_code: int = 1) -> None

Initialize graceful shutdown handler.

Parameters:
  • timeout (float | None) – Total timeout for all callbacks. Uses config if None.

  • signals (tuple[Signals, ...] | None) – Signals to handle. Auto-detects platform if None.

  • force_exit_code (int) – Exit code when timeout exceeded.

property timeout: float

Return the total shutdown timeout in seconds.

property is_shutting_down: bool

Return True if shutdown is in progress.

property is_installed: bool

Return True if signal handlers are installed.

register(self, name: str, callback: Callback, *, priority: int = 100, timeout: float | None = None) -> None

Register a cleanup callback.

Parameters:
  • name (str) – Unique identifier for the callback.

  • callback (Callable[[], None] | Callable[[], Coroutine[Any, Any, None]]) – Cleanup function (sync or async).

  • priority (int) – Execution order (lower runs first, default 100).

  • timeout (float | None) – Per-callback timeout (None = use global).

Raises:

ShutdownError – If name already registered or shutdown in progress.

Examples

>>> shutdown = GracefulShutdown()
>>> shutdown.register("db", lambda: print("closing db"), priority=10)
>>> "db" in [cb.name for cb in shutdown._callbacks.values()]
True

unregister(self, name: str) -> bool

Unregister a cleanup callback.

Parameters:

name (str) – Identifier of callback to remove.

Returns:

True if callback was removed, False if not found.

Return type:

bool

Examples

>>> shutdown = GracefulShutdown()
>>> shutdown.register("test", lambda: None)
>>> shutdown.unregister("test")
True
>>> shutdown.unregister("nonexistent")
False

install(self) -> None

Install signal handlers.

Raises:

ShutdownError – If handlers already installed.

uninstall(self) -> None

Restore original signal handlers.

trigger(self) -> None

Trigger shutdown programmatically.

Useful for testing or triggering shutdown from code. Runs callbacks synchronously in priority order.

async atrigger(self) -> None

Trigger shutdown programmatically (async version).

Runs callbacks asynchronously in priority order.
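An async variant can await coroutine callbacks under a per-callback limit with `asyncio.wait_for` and call plain functions directly. A minimal sketch under those assumptions; `run_cleanups` and its `(name, priority, callback)` tuples are hypothetical, not kstlib's internals.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

Callback = Callable[[], Union[None, Awaitable[None]]]


async def run_cleanups(callbacks: list[tuple[str, int, Callback]],
                       per_callback_timeout: float) -> list[str]:
    """Run (name, priority, callback) entries lowest priority first; return completed names."""
    completed: list[str] = []
    for name, _priority, callback in sorted(callbacks, key=lambda entry: entry[1]):
        try:
            if inspect.iscoroutinefunction(callback):
                # Cancel the coroutine if it overruns its individual budget.
                await asyncio.wait_for(callback(), timeout=per_callback_timeout)
            else:
                callback()
            completed.append(name)
        except asyncio.TimeoutError:
            print(f"cleanup {name!r} timed out")
    return completed


async def flush_cache() -> None:
    await asyncio.sleep(0.01)


async def hang_forever() -> None:
    await asyncio.sleep(10)


order = asyncio.run(run_cleanups(
    [("hang", 30, hang_forever), ("cache", 20, flush_cache), ("db", 10, lambda: None)],
    per_callback_timeout=0.1,
))
print(order)  # ['db', 'cache'], after "cleanup 'hang' timed out" is printed
```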

wait(self, timeout: float | None = None) -> bool

Wait for shutdown signal.

Parameters:

timeout (float | None) – Maximum time to wait (None = wait forever).

Returns:

True if shutdown was triggered, False if timeout.

Return type:

bool
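A common way to build `install()` and `wait()` is a signal handler that only sets a `threading.Event`. A minimal sketch of that approach; `SignalWaiter` and `handle_signal` are hypothetical, and `install()` must be called from the main thread because `signal.signal` only works there.

```python
import signal
import threading
from types import FrameType
from typing import Any, Optional


class SignalWaiter:
    """Hypothetical helper: a signal handler sets an Event that wait() blocks on."""

    def __init__(self, signals: tuple[signal.Signals, ...] = (signal.SIGTERM, signal.SIGINT)) -> None:
        self._signals = signals
        self._event = threading.Event()
        self._previous: dict[signal.Signals, Any] = {}

    def handle_signal(self, signum: int, frame: Optional[FrameType]) -> None:
        self._event.set()  # do as little as possible inside a signal handler

    def install(self) -> None:
        # Must run in the main thread; signal.signal raises ValueError elsewhere.
        for sig in self._signals:
            self._previous[sig] = signal.signal(sig, self.handle_signal)

    def uninstall(self) -> None:
        for sig, previous in self._previous.items():
            # signal.signal returns None for handlers not installed from Python.
            signal.signal(sig, previous if previous is not None else signal.SIG_DFL)
        self._previous.clear()

    def wait(self, timeout: Optional[float] = None) -> bool:
        """Return True once a signal has arrived, False if the timeout expires first."""
        return self._event.wait(timeout)


waiter = SignalWaiter()
print(waiter.wait(timeout=0.01))            # False: no signal yet
waiter.handle_signal(signal.SIGTERM, None)  # simulate delivery without sending a real signal
print(waiter.wait(timeout=0.01))            # True
```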

async await_shutdown(self, timeout: float | None = None) -> bool

Wait for shutdown signal (async version).

Parameters:

timeout (float | None) – Maximum time to wait (None = wait forever).

Returns:

True if shutdown was triggered, False if timeout.

Return type:

bool

__enter__(self) -> Self

Enter sync context manager.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit sync context manager.

async __aenter__(self) -> Self

Enter async context manager.

async __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit async context manager.

class kstlib.resilience.Heartbeat(state_file=None, *, interval=None, on_missed_beat=None, on_alert=None, target=None, on_target_dead=None, on_beat=None, metadata=None)

Bases: object

Periodic signal to indicate the process is alive.

Writes a timestamp to a JSON state file at a configurable interval. Supports both sync and async context managers.
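Each beat can be pictured as an atomic rewrite of a small JSON document. A minimal sketch, assuming the file carries the `HeartbeatState` fields documented below and that writes go through a temporary file plus `os.replace` so readers never see a partial file. `write_beat` and `beat_until` are hypothetical helpers, and kstlib's on-disk format may differ.

```python
import json
import os
import socket
import tempfile
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional


def write_beat(state_file: Path, metadata: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Atomically write one heartbeat record to state_file and return it."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # ISO 8601, UTC
        "pid": os.getpid(),
        "hostname": socket.gethostname(),
        "metadata": metadata or {},
    }
    # Write to a temporary file in the same directory, then swap it into place,
    # so readers never observe a partially written file.
    fd, tmp_path = tempfile.mkstemp(dir=state_file.parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(record, handle)
    os.replace(tmp_path, state_file)
    return record


def beat_until(state_file: Path, interval: float, stop: threading.Event) -> None:
    """Write a beat every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        write_beat(state_file)
        stop.wait(interval)  # sleeps, but returns early as soon as stop is set


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "app.heartbeat"
    write_beat(path, {"version": "1.0"})
    print(json.loads(path.read_text(encoding="utf-8"))["metadata"])  # {'version': '1.0'}
```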

Parameters:
  • state_file (str | Path | None) – Path to the heartbeat state file. If None, no file is written (useful when using on_beat callback for state management).

  • interval (float | None) – Seconds between heartbeats (default from config or 10s).

  • on_missed_beat (Callable[[Exception], None] | None) – Callback invoked when a beat write fails.

  • on_alert (OnAlertCallback | None) – Callback for alerting (channel, message, context).

  • target (HeartbeatTarget | None) – Optional object with is_dead property to monitor.

  • on_target_dead (Callable[[], Awaitable[None] | None] | None) – Callback invoked when target is detected as dead.

  • on_beat (Callable[[], Awaitable[None] | None] | None) – Callback invoked after each successful beat. Can be sync or async. Use this to delegate state writing to an external component.

  • metadata (dict[str, Any] | None) – Optional dict included in each heartbeat.
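Any object with a boolean `is_dead` attribute fits the `target` description above. A minimal sketch of that contract as a `typing.Protocol`; `HeartbeatTargetLike`, `FakeConnection` and `check_target` are hypothetical, and the protocol is inferred from the parameter description rather than taken from kstlib's `HeartbeatTarget` definition.

```python
from typing import Callable, Protocol, runtime_checkable


@runtime_checkable
class HeartbeatTargetLike(Protocol):
    """Assumed shape of a heartbeat target: anything exposing a boolean is_dead."""

    @property
    def is_dead(self) -> bool: ...


class FakeConnection:
    """Hypothetical connection wrapper, used only for illustration."""

    def __init__(self) -> None:
        self.closed = False

    @property
    def is_dead(self) -> bool:
        return self.closed


def check_target(target: HeartbeatTargetLike, on_target_dead: Callable[[], None]) -> bool:
    """One monitoring step: call on_target_dead if the target reports itself dead."""
    if target.is_dead:
        on_target_dead()
        return True
    return False


conn = FakeConnection()
print(isinstance(conn, HeartbeatTargetLike))            # True
print(check_target(conn, lambda: print("restarting")))  # False
conn.closed = True
print(check_target(conn, lambda: print("restarting")))  # prints "restarting", then True
```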

Examples

Sync context manager:

>>> with Heartbeat("/tmp/bot.heartbeat") as hb:  
...     do_work()

Async context manager:

>>> async with Heartbeat("/tmp/bot.heartbeat") as hb:  
...     await do_async_work()

Check if a process is alive:

>>> Heartbeat.is_alive("/tmp/bot.heartbeat", max_age_seconds=30)  
True

Monitor a WebSocket:

>>> hb = Heartbeat(  
...     "/tmp/bot.heartbeat",
...     target=ws_manager,
...     on_target_dead=lambda: restart_ws(),
... )

__init__(self, state_file: str | Path | None = None, *, interval: float | None = None, on_missed_beat: Callable[[Exception], None] | None = None, on_alert: OnAlertCallback | None = None, target: HeartbeatTarget | None = None, on_target_dead: Callable[[], Awaitable[None] | None] | None = None, on_beat: Callable[[], Awaitable[None] | None] | None = None, metadata: dict[str, Any] | None = None) -> None

Initialize heartbeat.

Parameters:
  • state_file (Path | str | None) – Path to the heartbeat state file. If None, no file is written.

  • interval (float | None) – Seconds between heartbeats. Uses config default if None.

  • on_missed_beat (Callable[[Exception], None] | None) – Callback invoked when a beat write fails.

  • on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).

  • target (HeartbeatTarget | None) – Optional object with is_dead property to monitor.

  • on_target_dead (Callable[[], Awaitable[None] | None] | None) – Callback invoked when target is detected as dead.

  • on_beat (Callable[[], Awaitable[None] | None] | None) – Callback invoked after each successful beat.

  • metadata (dict[str, Any] | None) – Optional dict included in each heartbeat.

property interval: float

Return the heartbeat interval in seconds.

property state_file: Path | None

Return the path to the state file, or None if not configured.

property is_shutdown: bool

Check if shutdown has been requested.

property target: HeartbeatTarget | None

Return the monitored target, if any.

shutdown(self) -> None

Signal shutdown and stop gracefully.

Sets the shutdown flag, which external code can check to confirm that the shutdown is intentional.

async ashutdown(self) -> None

Signal shutdown and stop gracefully (async version).

start(self) -> None

Start the heartbeat background thread.

Raises:

HeartbeatError – If heartbeat is already running.

stop(self) -> None

Stop the heartbeat and clean up.

Safe to call multiple times or if not started.

beat(self) -> None

Write a heartbeat immediately (manual trigger).

If state_file is configured, the state is written to that file. The on_beat callback, if configured, is invoked by the background loop, not by this method.

Raises:

HeartbeatError – If state file is configured and cannot be written.

async astart(self) -> None

Start the heartbeat using asyncio (async version).

Raises:

HeartbeatError – If heartbeat is already running.

async astop(self) -> None

Stop the heartbeat (async version).

Safe to call multiple times or if not started.

static read_state(state_file: str | Path) -> HeartbeatState | None

Read and parse an existing heartbeat state file.

Parameters:

state_file (str | Path) – Path to heartbeat file.

Returns:

HeartbeatState if file exists and is valid, None otherwise.

Return type:

HeartbeatState | None

Examples

>>> state = Heartbeat.read_state("/tmp/bot.heartbeat")  
>>> if state:  
...     print(f"Last beat: {state.timestamp}")

static is_alive(state_file: str | Path, max_age_seconds: float = 30.0) -> bool

Check if a process is alive based on its heartbeat.

Parameters:
  • state_file (str | Path) – Path to heartbeat file.

  • max_age_seconds (float) – Maximum age before considering process dead.

Returns:

True if heartbeat exists and is recent enough.

Return type:

bool
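The liveness test reduces to comparing the stored timestamp with `max_age_seconds`. A minimal sketch, assuming the file holds an ISO 8601 UTC `timestamp` field (as `HeartbeatState` describes) and that missing or malformed files count as dead; `heartbeat_is_fresh` and its `now` parameter are hypothetical.

```python
import json
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Union


def heartbeat_is_fresh(state_file: Union[str, Path], max_age_seconds: float = 30.0,
                       now: Optional[datetime] = None) -> bool:
    """Return True if the recorded timestamp is at most max_age_seconds old."""
    now = now or datetime.now(timezone.utc)
    try:
        data = json.loads(Path(state_file).read_text(encoding="utf-8"))
        last_beat = datetime.fromisoformat(data["timestamp"])
        age = (now - last_beat).total_seconds()
    except (OSError, ValueError, KeyError, TypeError):
        return False  # missing, unreadable or malformed file: treat the process as dead
    return age <= max_age_seconds


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "bot.heartbeat"
    path.write_text(json.dumps({"timestamp": "2026-01-12T10:00:00+00:00"}), encoding="utf-8")
    checked_at = datetime(2026, 1, 12, 10, 0, 20, tzinfo=timezone.utc)
    print(heartbeat_is_fresh(path, 30, now=checked_at))  # True (20 s old)
    print(heartbeat_is_fresh(path, 10, now=checked_at))  # False
    print(heartbeat_is_fresh(Path(tmp) / "missing"))     # False
```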

Examples

>>> Heartbeat.is_alive("/tmp/bot.heartbeat", max_age_seconds=30)  
True

__enter__(self) -> Self

Enter sync context manager.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit sync context manager.

async __aenter__(self) -> Self

Enter async context manager.

async __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit async context manager.

exception kstlib.resilience.HeartbeatError

Bases: KstlibError, RuntimeError

Raised when the heartbeat encounters an error.

Examples include state file write failure or invalid state file path.

class kstlib.resilience.HeartbeatState(timestamp, pid, hostname, metadata=<factory>)

Bases: object

Represents the state written to the heartbeat file.

timestamp

Last heartbeat time (ISO 8601 UTC).

Type:

str

pid

Process ID.

Type:

int

hostname

Machine hostname.

Type:

str

metadata

Optional application-specific data.

Type:

dict[str, Any]

Examples

>>> state = HeartbeatState(
...     timestamp="2026-01-12T10:00:00+00:00",
...     pid=1234,
...     hostname="myhost",
... )
>>> state.pid
1234

timestamp: str
pid: int
hostname: str
metadata: dict[str, Any]

to_dict(self) -> dict[str, Any]

Serialize to JSON-compatible dictionary.

Returns:

Dictionary representation of the heartbeat state.

Return type:

dict[str, Any]

classmethod from_dict(data: dict[str, Any]) -> HeartbeatState

Deserialize from dictionary.

Parameters:

data (dict[str, Any]) – Dictionary with heartbeat state fields.

Returns:

HeartbeatState instance.

Raises:

KeyError – If required fields are missing.

Return type:

HeartbeatState
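A round trip through `to_dict` and `from_dict` can be pictured with a plain dataclass. A minimal sketch; `BeatRecord` is a hypothetical stand-in with the same four fields, and it assumes `from_dict` indexes the required keys directly (which is what would raise `KeyError`) while treating `metadata` as optional.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class BeatRecord:
    """Hypothetical stand-in with the same fields as HeartbeatState."""

    timestamp: str
    pid: int
    hostname: str
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)  # a new dict; nested containers are copied too

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BeatRecord":
        # Required keys are indexed directly, so a missing one raises KeyError;
        # metadata is optional and falls back to an empty dict.
        return cls(
            timestamp=data["timestamp"],
            pid=data["pid"],
            hostname=data["hostname"],
            metadata=dict(data.get("metadata", {})),
        )


record = BeatRecord("2026-01-12T10:00:00+00:00", 1234, "myhost")
print(record.to_dict())
# {'timestamp': '2026-01-12T10:00:00+00:00', 'pid': 1234, 'hostname': 'myhost', 'metadata': {}}
print(BeatRecord.from_dict(record.to_dict()) == record)  # True
```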

__init__(self, timestamp: str, pid: int, hostname: str, metadata: dict[str, Any] = <factory>) -> None

exception kstlib.resilience.RateLimitError

Bases: KstlibError, RuntimeError

Base exception for rate limiter errors.

exception kstlib.resilience.RateLimitExceededError(message, retry_after)

Bases: RateLimitError

Raised when rate limit is exceeded and blocking is disabled.

retry_after

Seconds until a token will be available.

__init__(self, message: str, retry_after: float) -> None

Initialize RateLimitExceededError.

Parameters:
  • message (str) – Human-readable error message.

  • retry_after (float) – Seconds until a token will be available.

class kstlib.resilience.RateLimiter(rate, per=1.0, *, burst=None, name=None)

Bases: object

Token bucket rate limiter for controlling request throughput.

Implements the token bucket algorithm where tokens are added at a fixed rate and each request consumes one token. Allows bursts up to the bucket capacity.
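The refill arithmetic can be made concrete: tokens accrue at `rate / per` per second, so waiting for one token takes `(1 - tokens) / (rate / per)` seconds. A minimal single-threaded sketch; `TokenBucket` is hypothetical, assumes the bucket capacity equals `rate`, takes an injectable clock for deterministic demos, and leaves out the locking and statistics a real limiter needs. `TimeoutError` stands in for `RateLimitExceededError`.

```python
import time
from typing import Callable, Optional


class TokenBucket:
    """Hypothetical single-threaded token bucket (no locking, no statistics)."""

    def __init__(self, rate: float, per: float = 1.0, burst: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        if rate <= 0 or per <= 0:
            raise ValueError("rate and per must be positive")
        self.capacity = rate                    # assumption: at most `rate` tokens
        self.refill_per_second = rate / per     # e.g. 100 per 60 s is ~1.67 tokens/s
        self.clock = clock
        self._tokens = rate if burst is None else min(burst, rate)  # initial tokens
        self._last = clock()

    def _refill(self) -> None:
        now = self.clock()
        gained = (now - self._last) * self.refill_per_second
        self._tokens = min(self.capacity, self._tokens + gained)
        self._last = now

    @property
    def tokens(self) -> float:
        self._refill()
        return self._tokens

    def time_until_token(self) -> float:
        """Seconds until one whole token is available; 0.0 if one is available now."""
        return max(0.0, (1.0 - self.tokens) / self.refill_per_second)

    def try_acquire(self) -> bool:
        if self.tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False

    def acquire(self, timeout: Optional[float] = None) -> bool:
        """Block until a token is taken; raise TimeoutError if that would exceed timeout."""
        deadline = None if timeout is None else self.clock() + timeout
        while not self.try_acquire():
            wait = self.time_until_token()
            if deadline is not None and self.clock() + wait > deadline:
                # kstlib documents RateLimitExceededError for this case.
                raise TimeoutError(f"no token available within {timeout} s")
            time.sleep(wait)  # real sleep: use the default clock when blocking
        return True


# Fake clock so the refill can be shown deterministically.
now = [0.0]
bucket = TokenBucket(rate=2, per=1.0, clock=lambda: now[0])
print(bucket.try_acquire(), bucket.try_acquire(), bucket.try_acquire())  # True True False
print(bucket.time_until_token())  # 0.5
now[0] = 0.5                      # half a second later, one token has refilled
print(bucket.try_acquire())       # True
```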

Parameters:
  • rate (float) – Maximum number of tokens (requests) allowed per period.

  • per (float) – Time period in seconds (default 1.0 = per second).

  • burst (float | None) – Initial tokens available. If None, starts full (burst = rate).

  • name (str | None) – Optional name for logging and monitoring.

Examples

Basic usage - 10 requests per second:

>>> limiter = RateLimiter(rate=10, per=1.0)
>>> int(limiter.tokens)  # Starts full
10

With a custom burst (initial token count):

>>> limiter = RateLimiter(rate=10, per=1.0, burst=5)
>>> int(limiter.tokens)  # Starts with 5 tokens
5

Rate limiting API calls:

>>> limiter = RateLimiter(rate=100, per=60.0)  # 100 per minute
>>> for _ in range(5):
...     if limiter.try_acquire():
...         pass  # call_api()
>>> limiter.stats.total_acquired
5

__init__(self, rate: float, per: float = 1.0, *, burst: float | None = None, name: str | None = None) -> None

Initialize rate limiter.

Parameters:
  • rate (float) – Maximum tokens (requests) per period.

  • per (float) – Period duration in seconds.

  • burst (float | None) – Initial token count. Defaults to rate (full bucket).

  • name (str | None) – Optional name for identification.

Raises:

ValueError – If rate or per is not positive.

property rate: float

Maximum tokens per period.

property per: float

Period duration in seconds.

property tokens: float

Current available tokens (after refill).

property stats: RateLimiterStats

Statistics for this rate limiter.

property name: str | None

Name of this rate limiter.

time_until_token(self) -> float

Calculate time until at least 1 token will be available.

Returns:

Seconds until a token is available; 0.0 if a token is available now.

Return type:

float

Examples

>>> limiter = RateLimiter(rate=10, per=1.0)
>>> limiter.time_until_token()  # Tokens available
0.0

acquire(self, *, blocking: bool = True, timeout: float | None = None) -> bool

Acquire a token from the bucket.

Parameters:
  • blocking (bool) – If True, wait until a token is available.

  • timeout (float | None) – Maximum time to wait in seconds (None = wait forever).

Returns:

True if a token was acquired; False if blocking is False and no token is available.

Raises:

RateLimitExceededError – If timeout exceeded while waiting.

Return type:

bool

Examples

>>> limiter = RateLimiter(rate=10, per=1.0)
>>> limiter.acquire()  # Blocks if needed
True
>>> limiter.acquire(blocking=False)  # Returns immediately
True

try_acquire(self) -> bool

Try to acquire a token without blocking.

Returns:

True if token was acquired, False otherwise.

Return type:

bool

Examples

>>> limiter = RateLimiter(rate=2, per=1.0)
>>> limiter.try_acquire()
True
>>> limiter.try_acquire()
True
>>> limiter.try_acquire()  # No tokens left
False

async acquire_async(self, *, timeout: float | None = None) -> bool

Acquire a token asynchronously.

Parameters:

timeout (float | None) – Maximum time to wait in seconds.

Returns:

True when token is acquired.

Raises:

RateLimitExceededError – If timeout exceeded.

Return type:

bool

Examples

>>> import asyncio
>>> limiter = RateLimiter(rate=10, per=1.0)
>>> asyncio.run(limiter.acquire_async())
True

reset(self) -> None

Reset the rate limiter to full capacity.

Examples

>>> limiter = RateLimiter(rate=5, per=1.0)
>>> for _ in range(5):
...     limiter.try_acquire()
True
True
True
True
True
>>> limiter.try_acquire()
False
>>> limiter.reset()
>>> limiter.try_acquire()
True

__enter__(self) -> Self

Enter context manager, acquiring a token.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None

Exit context manager.

async __aenter__(self) -> Self

Enter async context manager, acquiring a token.

async __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None

Exit async context manager.

__repr__(self) -> str

Return string representation.

class kstlib.resilience.RateLimiterStats(total_acquired=0, total_rejected=0, total_waited=0.0)

Bases: object

Statistics for rate limiter monitoring.

total_acquired

Total number of tokens successfully acquired.

Type:

int

total_rejected

Total number of acquire attempts that were rejected.

Type:

int

total_waited

Total time spent waiting for tokens (seconds).

Type:

float

Examples

>>> stats = RateLimiterStats()
>>> stats.record_acquired()
>>> stats.record_rejected()
>>> stats.record_wait(0.5)
>>> (stats.total_acquired, stats.total_rejected)
(1, 1)
>>> stats.total_waited
0.5

total_acquired: int = 0
total_rejected: int = 0
total_waited: float = 0.0

record_acquired(self) -> None

Record a successful token acquisition.

record_rejected(self) -> None

Record a rejected acquisition attempt.

record_wait(self, seconds: float) -> None

Record time spent waiting for a token.

__init__(self, total_acquired: int = 0, total_rejected: int = 0, total_waited: float = 0.0) -> None

exception kstlib.resilience.ShutdownError

Bases: KstlibError, RuntimeError

Raised when graceful shutdown encounters an error.

Examples include cleanup callback failure or timeout exceeded.

class kstlib.resilience.Watchdog(*, timeout=None, on_timeout=None, on_alert=None, name=None)

Bases: object

Monitor thread/process health and detect freezes or hangs.

Implements a watchdog timer that must be periodically “pinged” to prevent timeout. If no ping is received within the timeout period, the on_timeout callback is invoked.
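The ping-or-timeout contract maps onto a monitor thread that compares `time.monotonic()` with the time of the last ping. A minimal sketch; `MiniWatchdog` is hypothetical, polls every `timeout / 4` seconds, fires the callback once per silent period, and re-arms on the next `ping()`. kstlib's actual scheduling and async support may differ.

```python
import threading
import time
from typing import Callable, Optional


class MiniWatchdog:
    """Hypothetical watchdog: a daemon thread fires on_timeout after `timeout` s without a ping."""

    def __init__(self, timeout: float, on_timeout: Callable[[], None]) -> None:
        self.timeout = timeout
        self.on_timeout = on_timeout
        self._last_ping = time.monotonic()
        self._triggered = False
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def ping(self) -> None:
        with self._lock:
            self._last_ping = time.monotonic()
            self._triggered = False  # re-arm once the monitored code shows signs of life

    def _monitor(self) -> None:
        # Event.wait doubles as the poll interval and returns True once stop() is called.
        while not self._stop.wait(self.timeout / 4):
            with self._lock:
                silent_for = time.monotonic() - self._last_ping
                fire = silent_for >= self.timeout and not self._triggered
                if fire:
                    self._triggered = True
            if fire:
                self.on_timeout()  # called outside the lock so it may call ping()

    def start(self) -> None:
        self.ping()
        self._thread = threading.Thread(target=self._monitor, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()


fired = threading.Event()
watchdog = MiniWatchdog(timeout=0.3, on_timeout=fired.set)
watchdog.start()
for _ in range(6):          # regular pings keep the timer from expiring
    watchdog.ping()
    time.sleep(0.05)
print(fired.is_set())       # False
print(fired.wait(2.0))      # True: pings stopped, so the timeout fires
watchdog.stop()
```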

Parameters:
  • timeout (float | None) – Seconds of inactivity before triggering timeout. If None, uses config default (30s).

  • on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Callback invoked when timeout is detected. Can be sync or async function.

  • on_alert (OnAlertCallback | None) – Callback for alerting (channel, message, context).

  • name (str | None) – Optional identifier for logging and monitoring.

Examples

Basic usage:

>>> watchdog = Watchdog(timeout=30)
>>> watchdog.timeout
30

With callback:

>>> def alert():
...     print("Watchdog triggered!")
>>> wd = Watchdog(timeout=10, on_timeout=alert, name="worker")
>>> wd.name
'worker'

As context manager:

>>> with Watchdog(timeout=30) as wd:  
...     wd.ping()
...     do_work()

__init__(self, *, timeout: float | None = None, on_timeout: Callable[[], None] | Callable[[], Awaitable[None]] | None = None, on_alert: OnAlertCallback | None = None, name: str | None = None) -> None

Initialize watchdog.

Parameters:
  • timeout (float | None) – Seconds before timeout triggers. Clamped to [1, 3600].

  • on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Callback for timeout events (sync or async).

  • on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).

  • name (str | None) – Optional identifier.

property timeout: float

Timeout duration in seconds.

property name: str | None

Watchdog identifier.

property stats: WatchdogStats

Statistics for this watchdog.

property is_running: bool

Return True if watchdog is actively monitoring.

property is_triggered: bool

Return True if timeout has been triggered.

property seconds_since_ping: float

Return seconds since last ping.

property is_shutdown: bool

Check if shutdown has been requested.

property state_file: Path | None

Return the state file path if monitoring a heartbeat file.

classmethod from_state_file(state_file: str | Path, *, check_interval: float | None = None, max_age: float = 30.0, on_timeout: Callable[[], None] | Callable[[], Awaitable[None]] | None = None, on_alert: OnAlertCallback | None = None, name: str | None = None) -> Self

Create a watchdog that monitors a heartbeat state file.

Instead of requiring periodic ping() calls, this watchdog checks if a heartbeat state file is being updated regularly.
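A file-based liveness check reduces to comparing the state file's age with `max_age`. This sketch assumes the age is measured from the file's modification time; kstlib may instead read a timestamp stored inside the JSON, which this reference does not document. `heartbeat_age`, `is_stale` and `default_check_interval` are hypothetical names.

```python
import os
import tempfile
import time
from pathlib import Path
from typing import Optional


def heartbeat_age(state_file: Path) -> Optional[float]:
    """Seconds since the file was last modified, or None if it does not exist."""
    try:
        mtime = state_file.stat().st_mtime
    except FileNotFoundError:
        return None
    return time.time() - mtime  # wall clock, because st_mtime is wall-clock based


def is_stale(state_file: Path, max_age: float = 30.0) -> bool:
    # A missing file counts as stale: nothing is proving liveness.
    age = heartbeat_age(state_file)
    return age is None or age > max_age


def default_check_interval(max_age: float) -> float:
    # Mirrors the documented default: check twice per max_age window.
    return max_age / 2


with tempfile.TemporaryDirectory() as tmp:
    hb = Path(tmp) / "bot.heartbeat"
    print(is_stale(hb))  # True: file missing
    hb.write_text('{"status": "ok"}')
    print(is_stale(hb))  # False: just written
    stalled = time.time() - 60
    os.utime(hb, (stalled, stalled))  # simulate a writer that stopped a minute ago
    print(is_stale(hb))  # True
```

Because file timestamps are wall-clock values, this check uses `time.time()` rather than the monotonic clock used for in-process pings.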

Parameters:
  • state_file (str | Path) – Path to the heartbeat JSON state file.

  • check_interval (float | None) – Seconds between file checks (defaults to max_age/2).

  • max_age (float) – Maximum age in seconds before triggering timeout (default: 30s).

  • on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Callback for timeout events.

  • on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).

  • name (str | None) – Optional identifier.

Returns:

Configured Watchdog instance.

Return type:

Self

Examples

>>> wd = Watchdog.from_state_file(  
...     "/tmp/bot.heartbeat",
...     max_age=30.0,  # Trigger if no heartbeat for 30 seconds
...     on_timeout=restart_bot,
... )
>>> await wd.astart()  

shutdown(self) -> None

Signal shutdown and stop gracefully.

async ashutdown(self) -> None

Signal shutdown and stop gracefully (async version).

ping(self) -> None

Reset the watchdog timer.

Call this periodically to signal that the monitored code is still alive. It must be called at least once per timeout interval; otherwise the watchdog triggers.

Examples

>>> watchdog = Watchdog(timeout=30)
>>> watchdog.ping()  # Reset timer

async aping(self) -> None

Async version of ping().

Examples

>>> import asyncio
>>> async def example():
...     watchdog = Watchdog(timeout=30)
...     await watchdog.aping()
>>> asyncio.run(example())

start(self) -> None

Start watchdog monitoring in a background thread.

Raises:

RuntimeError – If watchdog is already running.

Examples

>>> watchdog = Watchdog(timeout=30)
>>> watchdog.start()
>>> watchdog.is_running
True
>>> watchdog.stop()

stop(self) -> None

Stop watchdog monitoring.

Safe to call multiple times or when not running.

Examples

>>> watchdog = Watchdog(timeout=30)
>>> watchdog.start()
>>> watchdog.stop()
>>> watchdog.is_running
False

async astart(self) -> None

Start watchdog monitoring asynchronously.

Raises:

RuntimeError – If watchdog is already running.

async astop(self) -> None

Stop watchdog monitoring asynchronously.

Safe to call multiple times or when not running.
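The async variants can be modelled with an `asyncio` task that polls the last ping time. `AsyncWatchdogSketch` below is a hypothetical illustration, not kstlib's code. It shows two documented behaviours, `astart()` raising `RuntimeError` when already running and `astop()` being safe to repeat, and it accepts a sync or async `on_timeout` by awaiting the callback's result only when that result is awaitable. The polling interval is an assumption.

```python
import asyncio
import inspect
import time
from typing import Awaitable, Callable, List, Optional, Union

Callback = Callable[[], Union[None, Awaitable[None]]]


class AsyncWatchdogSketch:
    """Hypothetical asyncio monitor (illustration only, not kstlib code)."""

    def __init__(self, timeout: float, on_timeout: Optional[Callback] = None,
                 check_interval: float = 0.05) -> None:
        self.timeout = timeout
        self.on_timeout = on_timeout
        self.check_interval = check_interval  # assumed polling period
        self.is_triggered = False
        self._last_ping = time.monotonic()
        self._task: Optional[asyncio.Task] = None

    async def aping(self) -> None:
        self._last_ping = time.monotonic()

    async def astart(self) -> None:
        if self._task is not None:
            raise RuntimeError("watchdog is already running")
        self._last_ping = time.monotonic()
        self._task = asyncio.create_task(self._monitor())

    async def astop(self) -> None:
        # Safe to call repeatedly or before astart().
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _monitor(self) -> None:
        while not self.is_triggered:
            await asyncio.sleep(self.check_interval)
            if time.monotonic() - self._last_ping >= self.timeout:
                self.is_triggered = True
                if self.on_timeout is not None:
                    result = self.on_timeout()
                    # Plain functions return None; coroutine functions return an awaitable.
                    if inspect.isawaitable(result):
                        await result


async def main() -> List[str]:
    events: List[str] = []

    async def alert() -> None:
        events.append("async alert")

    wd = AsyncWatchdogSketch(timeout=0.1, on_timeout=alert)
    await wd.astart()
    await asyncio.sleep(0.3)  # no aping() calls, so the timeout fires
    await wd.astop()
    await wd.astop()  # second stop is a no-op
    return events


print(asyncio.run(main()))  # ['async alert']
```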

reset(self) -> None

Reset watchdog state without stopping.

Clears the triggered flag and resets the timer.

__enter__(self) -> Self

Enter context manager, starting watchdog.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None

Exit context manager, stopping watchdog.
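For the synchronous API, a background thread plus a `threading.Event` is one common way to get the documented lifecycle: `start()` refuses to run twice, `stop()` is idempotent, and `__enter__`/`__exit__` wrap the two. `ThreadedWatchdogSketch` and its `check_interval` parameter are illustrative assumptions, not kstlib's implementation.

```python
import threading
import time
from typing import Callable, Optional


class ThreadedWatchdogSketch:
    """Hypothetical thread-based lifecycle (illustration only, not kstlib code)."""

    def __init__(self, timeout: float, on_timeout: Optional[Callable[[], None]] = None,
                 check_interval: float = 0.05) -> None:
        self.timeout = timeout
        self.on_timeout = on_timeout
        self.check_interval = check_interval  # assumed polling period
        self.is_triggered = False
        self._lock = threading.Lock()
        self._last_ping = time.monotonic()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    @property
    def is_running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def ping(self) -> None:
        with self._lock:
            self._last_ping = time.monotonic()

    def start(self) -> None:
        if self.is_running:
            raise RuntimeError("watchdog is already running")
        self._stop_event.clear()
        self.ping()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        # Idempotent: safe when already stopped or never started.
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def _run(self) -> None:
        # Event.wait() returns True as soon as stop() sets the event.
        while not self._stop_event.wait(self.check_interval):
            with self._lock:
                idle = time.monotonic() - self._last_ping
            if idle >= self.timeout and not self.is_triggered:
                self.is_triggered = True
                if self.on_timeout is not None:
                    self.on_timeout()

    def __enter__(self) -> "ThreadedWatchdogSketch":
        self.start()
        return self

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        self.stop()


with ThreadedWatchdogSketch(timeout=1.0) as wd:
    for _ in range(5):
        time.sleep(0.05)  # simulated unit of work
        wd.ping()
print(wd.is_triggered, wd.is_running)  # False False
```

Using `Event.wait()` as the sleep lets `stop()` interrupt the monitor immediately instead of waiting out a full polling interval.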

async __aenter__(self) -> Self

Enter async context manager, starting watchdog.

async __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None

Exit async context manager, stopping watchdog.

__repr__(self) -> str

Return string representation.

exception kstlib.resilience.WatchdogError

Bases: KstlibError, RuntimeError

Base exception for watchdog errors.

class kstlib.resilience.WatchdogStats(pings_total=0, timeouts_triggered=0, last_ping_time=None, start_time=None)

Bases: object

Statistics for watchdog monitoring.

pings_total

Total number of ping calls.

Type:

int

timeouts_triggered

Number of timeout events detected.

Type:

int

last_ping_time

Timestamp of last activity (monotonic).

Type:

float | None

start_time

Timestamp when watchdog started (monotonic).

Type:

float | None

Examples

>>> stats = WatchdogStats()
>>> stats.record_ping()
>>> stats.pings_total
1

pings_total: int = 0

timeouts_triggered: int = 0

last_ping_time: float | None = None

start_time: float | None = None

record_ping(self) -> None

Record a ping event.

record_timeout(self) -> None

Record a timeout event.

record_start(self) -> None

Record watchdog start.

property uptime: float

Return seconds since watchdog started.
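`WatchdogStats` is a plain dataclass with the four fields listed above. The method bodies below are a guess consistent with the field descriptions (monotonic timestamps, simple counters) and are not taken from kstlib. `WatchdogStatsSketch` is a hypothetical name, and returning 0.0 from `uptime` before `record_start()` is an assumption.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class WatchdogStatsSketch:
    """Mirrors the documented fields; method bodies are assumptions."""

    pings_total: int = 0
    timeouts_triggered: int = 0
    last_ping_time: Optional[float] = None
    start_time: Optional[float] = None

    def record_ping(self) -> None:
        self.pings_total += 1
        self.last_ping_time = time.monotonic()

    def record_timeout(self) -> None:
        self.timeouts_triggered += 1

    def record_start(self) -> None:
        self.start_time = time.monotonic()

    @property
    def uptime(self) -> float:
        # Assumption: 0.0 until record_start() has been called.
        if self.start_time is None:
            return 0.0
        return time.monotonic() - self.start_time


stats = WatchdogStatsSketch()
print(stats.uptime)  # 0.0
stats.record_start()
stats.record_ping()
stats.record_ping()
stats.record_timeout()
print(stats.pings_total, stats.timeouts_triggered)  # 2 1
```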

__init__(self, pings_total: int = 0, timeouts_triggered: int = 0, last_ping_time: float | None = None, start_time: float | None = None) -> None

exception kstlib.resilience.WatchdogTimeoutError(message, seconds_inactive)

Bases: WatchdogError

Raised when watchdog detects inactivity timeout.

seconds_inactive

Time since last ping/activity.

__init__(self, message: str, seconds_inactive: float) -> None

Initialize WatchdogTimeoutError.

Parameters:
  • message (str) – Human-readable error message.

  • seconds_inactive (float) – Seconds since last activity.

kstlib.resilience.circuit_breaker(func: Callable[P, R] | None = None, *, max_failures: int | None = None, reset_timeout: float | None = None, half_open_max_calls: int | None = None, excluded_exceptions: tuple[type[Exception], ...] = (), name: str | None = None) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]

Circuit breaker decorator for functions.
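Supporting both `@circuit_breaker` and `@circuit_breaker(...)` relies on a common Python idiom: the decorator takes the function as an optional first positional argument, with keyword-only options after `*`. The sketch below shows that idiom around a deliberately tiny failure counter. `breaker_sketch` and `CircuitOpen` are hypothetical stand-ins, not kstlib code, and `reset_timeout`, half-open trials and `name` are omitted.

```python
import functools
from typing import Any, Callable, Optional, Tuple, Type


class CircuitOpen(RuntimeError):
    """Stand-in for CircuitOpenError in this sketch."""


def breaker_sketch(
    func: Optional[Callable[..., Any]] = None,
    *,
    max_failures: int = 5,
    excluded_exceptions: Tuple[Type[BaseException], ...] = (),
) -> Any:
    """Hypothetical decorator usable as @breaker_sketch or @breaker_sketch(...)."""

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        failures = 0  # consecutive counted failures for this function

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            nonlocal failures
            if failures >= max_failures:
                raise CircuitOpen(f"{fn.__name__}: circuit open")
            try:
                result = fn(*args, **kwargs)
            except excluded_exceptions:
                raise  # excluded errors propagate without being counted
            except Exception:
                failures += 1
                raise
            failures = 0  # a success ends the failure streak
            return result

        return wrapper

    # Bare @breaker_sketch passes the function in; @breaker_sketch(...) does not,
    # so return the decorator and let Python apply it.
    if func is not None:
        return decorate(func)
    return decorate


@breaker_sketch
def health_check() -> str:
    return "ok"


@breaker_sketch(max_failures=2, excluded_exceptions=(ValueError,))
def fetch(kind: str) -> None:
    raise {"io": OSError, "bad": ValueError}[kind]("boom")
```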

Can be used with or without arguments:

Examples

Without arguments (uses config defaults):

>>> @circuit_breaker
... def api_call():  
...     pass

With arguments:

>>> @circuit_breaker(max_failures=3, reset_timeout=30)
... def api_call():  
...     pass

Exclude specific exceptions:

>>> @circuit_breaker(excluded_exceptions=(ValueError,))
... def validate():  
...     pass

kstlib.resilience.rate_limiter(fn: Callable[P, R] | None = None, *, rate: float = 10.0, per: float = 1.0, burst: float | None = None, blocking: bool = True, timeout: float | None = None, name: str | None = None) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]

Rate limit calls to the decorated function.

Can be used with or without arguments:

  • @rate_limiter - Uses defaults (10 requests/second)

  • @rate_limiter(rate=5, per=1.0) - 5 requests per second

Parameters:
  • fn (Callable[P, R] | None) – Function to decorate (when used without parentheses).

  • rate (float) – Maximum calls per period (default 10).

  • per (float) – Period in seconds (default 1.0).

  • burst (float | None) – Initial capacity (default = rate).

  • blocking (bool) – If True, wait until a token is available. If False, raise RateLimitExceededError immediately when the limit is reached.

  • timeout (float | None) – Maximum wait time in seconds.

  • name (str | None) – Name for the rate limiter.
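These parameters map onto a token bucket: `rate / per` tokens are added per second, the bucket holds at most `burst` tokens (assumed here to also be the starting level, per "Initial capacity"), and an empty bucket either waits or raises. The sketch below is a hypothetical illustration, not kstlib's `RateLimiter`. `TokenBucket`, `try_acquire`, `RateLimitExceeded` and the injectable `clock` are assumptions, as is raising once the wait would exceed `timeout`.

```python
import threading
import time
from typing import Callable, Optional


class RateLimitExceeded(RuntimeError):
    """Stand-in for RateLimitExceededError; carries retry_after in seconds."""

    def __init__(self, retry_after: float) -> None:
        super().__init__(f"rate limit exceeded, retry after {retry_after:.3f}s")
        self.retry_after = retry_after


class TokenBucket:
    """Hypothetical token bucket: `rate` tokens per `per` seconds, at most `burst` stored."""

    def __init__(self, rate: float = 10.0, per: float = 1.0, burst: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = rate if burst is None else burst
        self.fill_rate = rate / per  # tokens added per second
        self.tokens = self.capacity  # start full, allowing an initial burst
        self._clock = clock
        self._updated = clock()
        self._lock = threading.Lock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.fill_rate)
        self._updated = now

    def try_acquire(self) -> float:
        """Take a token and return 0.0, or return the seconds until one is available."""
        with self._lock:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return 0.0
            return (1 - self.tokens) / self.fill_rate

    def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> None:
        deadline = None if timeout is None else self._clock() + timeout
        while True:
            wait = self.try_acquire()
            if wait == 0.0:
                return
            # Assumption: give up (rather than wait) when the wait would pass the deadline.
            if not blocking or (deadline is not None and self._clock() + wait > deadline):
                raise RateLimitExceeded(wait)
            time.sleep(wait)


# Deterministic demo with a hand-advanced clock and blocking=False.
now = [0.0]
bucket = TokenBucket(rate=5, per=1.0, clock=lambda: now[0])
for _ in range(5):
    bucket.acquire(blocking=False)  # the initial burst of 5 succeeds
try:
    bucket.acquire(blocking=False)
except RateLimitExceeded as exc:
    print(round(exc.retry_after, 3))  # 0.2: one token every 0.2 s at 5 per second
now[0] = 0.2
bucket.acquire(blocking=False)  # one token has been refilled
```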

Returns:

Decorated function that respects rate limits.

Raises:

RateLimitExceededError – If blocking=False and rate limit exceeded.

Return type:

Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]

Examples

Default rate limiting (10/sec):

>>> @rate_limiter
... def call_api():  
...     pass

Custom rate:

>>> @rate_limiter(rate=100, per=60.0)  # 100 per minute
... def call_api():  
...     pass

Non-blocking mode:

>>> @rate_limiter(rate=5, blocking=False)
... def fast_api():  
...     pass  # Raises RateLimitExceededError if limit hit

kstlib.resilience.watchdog_context(timeout: float | None = None, on_timeout: Callable[[], None] | Callable[[], Awaitable[None]] | None = None, *, raise_on_timeout: bool = False, name: str | None = None) -> Watchdog

Create a watchdog context for monitoring code blocks.

This is a convenience function that creates a Watchdog instance. Use it in a with statement for automatic start and stop.

Parameters:
  • timeout (float | None) – Seconds before timeout triggers.

  • on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Optional callback for timeout events.

  • raise_on_timeout (bool) – If True, raise WatchdogTimeoutError on timeout.

  • name (str | None) – Optional identifier.

Returns:

Watchdog instance for use as context manager.

Return type:

Watchdog

Examples

>>> with watchdog_context(timeout=30) as wd:  
...     for item in items:
...         wd.ping()
...         process(item)

Exceptions

Specialized exceptions raised by the kstlib.resilience module.
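Because these exceptions inherit from both `KstlibError` and `RuntimeError`, existing `except RuntimeError:` handlers also catch them, so more specific clauses must come first. A sketch using local stand-in classes that mirror the documented bases; the real `KstlibError` definition is not shown in this reference, and `classify` is a hypothetical helper.

```python
class KstlibError(Exception):
    """Stand-in for kstlib's base exception (its real definition is not shown here)."""


class CircuitBreakerError(KstlibError, RuntimeError):
    """Mirrors the documented bases: KstlibError, RuntimeError."""


class CircuitOpenError(CircuitBreakerError):
    def __init__(self, message: str, remaining_seconds: float) -> None:
        super().__init__(message)
        self.remaining_seconds = remaining_seconds


def classify(exc: BaseException) -> str:
    """Order handlers from most to least specific; later clauses would also match."""
    try:
        raise exc
    except CircuitOpenError as err:
        return f"circuit open, retry in {err.remaining_seconds:.0f}s"
    except KstlibError:
        return "other kstlib error"
    except RuntimeError:
        return "generic RuntimeError"


err = CircuitOpenError("circuit open", remaining_seconds=12.0)
print(isinstance(err, RuntimeError), isinstance(err, KstlibError))  # True True
print(classify(err))  # circuit open, retry in 12s
```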

exception kstlib.resilience.exceptions.CircuitBreakerError

Bases: KstlibError, RuntimeError

Base exception for circuit breaker errors.

exception kstlib.resilience.exceptions.CircuitOpenError(message, remaining_seconds)

Bases: CircuitBreakerError

Raised when a call is attempted while the circuit is open.

remaining_seconds

Time until the circuit may transition to half-open.

__init__(self, message: str, remaining_seconds: float) -> None

Initialize CircuitOpenError.

Parameters:
  • message (str) – Human-readable error message.

  • remaining_seconds (float) – Seconds until circuit may transition to half-open.
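`remaining_seconds` follows from the circuit breaker state machine: once `max_failures` consecutive failures open the circuit, calls are rejected until `reset_timeout` seconds have passed, after which a trial call is allowed (half-open). A minimal sketch of that bookkeeping; `BreakerState` and its injectable `clock` are hypothetical, and `half_open_max_calls` and exception exclusion are not modelled.

```python
import time
from typing import Callable, Optional


class BreakerState:
    """Hypothetical closed/open/half-open bookkeeping (illustration only)."""

    def __init__(self, max_failures: int = 5, reset_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def remaining_seconds(self) -> float:
        """Seconds until a trial call is allowed (0.0 when not open)."""
        if self.opened_at is None:
            return 0.0
        return max(0.0, self.reset_timeout - (self._clock() - self.opened_at))

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        return "open" if self.remaining_seconds() > 0 else "half-open"

    def record_failure(self) -> None:
        self.failures += 1
        # Only a success clears the count, so a failed half-open trial re-opens at once.
        if self.failures >= self.max_failures:
            self.opened_at = self._clock()

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None


now = [0.0]
cb = BreakerState(max_failures=3, reset_timeout=30.0, clock=lambda: now[0])
for _ in range(3):
    cb.record_failure()
print(cb.state, cb.remaining_seconds())  # open 30.0
now[0] = 12.0
print(cb.state, cb.remaining_seconds())  # open 18.0
now[0] = 30.0
print(cb.state)  # half-open
cb.record_success()
print(cb.state)  # closed
```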

exception kstlib.resilience.exceptions.HeartbeatError

Bases: KstlibError, RuntimeError

Raised when the heartbeat encounters an error.

Examples include a failed state file write or an invalid state file path.
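One way to keep a watchdog from reading a half-written heartbeat file is to write to a temporary file in the same directory, rename it over the target, and convert `OSError` into a domain error. This is a sketch under assumptions: the JSON field names, `write_heartbeat` and `HeartbeatWriteError` are hypothetical, and kstlib's actual file format is not documented here.

```python
import contextlib
import json
import os
import tempfile
import time
from pathlib import Path
from typing import Optional


class HeartbeatWriteError(RuntimeError):
    """Stand-in for HeartbeatError in this sketch."""


def write_heartbeat(state_file: Path, **extra: object) -> None:
    """Write a JSON heartbeat via temp file + rename so readers never see partial data."""
    payload = {"timestamp": time.time(), "pid": os.getpid(), **extra}  # hypothetical fields
    tmp_name: Optional[str] = None
    try:
        state_file.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_name = tempfile.mkstemp(dir=state_file.parent, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh)
        os.replace(tmp_name, state_file)  # atomic rename within one filesystem on POSIX
        tmp_name = None  # nothing left to clean up
    except OSError as exc:
        raise HeartbeatWriteError(f"cannot write heartbeat to {state_file}: {exc}") from exc
    finally:
        if tmp_name is not None:
            with contextlib.suppress(OSError):
                os.unlink(tmp_name)


with tempfile.TemporaryDirectory() as tmp:
    hb = Path(tmp) / "bot.heartbeat"
    write_heartbeat(hb, status="ok")
    print(json.loads(hb.read_text(encoding="utf-8"))["status"])  # ok
```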

exception kstlib.resilience.exceptions.RateLimitError

Bases: KstlibError, RuntimeError

Base exception for rate limiter errors.

exception kstlib.resilience.exceptions.RateLimitExceededError(message, retry_after)

Bases: RateLimitError

Raised when rate limit is exceeded and blocking is disabled.

retry_after

Seconds until a token will be available.

__init__(self, message: str, retry_after: float) -> None

Initialize RateLimitExceededError.

Parameters:
  • message (str) – Human-readable error message.

  • retry_after (float) – Seconds until a token will be available.

exception kstlib.resilience.exceptions.ShutdownError

Bases: KstlibError, RuntimeError

Raised when graceful shutdown encounters an error.

Examples include a failing cleanup callback or an exceeded shutdown timeout.
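The two failure modes named above (a failing callback, an exceeded timeout) can be illustrated with a small prioritized runner. This is a hypothetical sketch, not kstlib's `GracefulShutdown`: it assumes lower priority numbers run first, a single total time budget, and that failures are collected and reported together in one exception (`ShutdownFailed`, a stand-in for `ShutdownError`).

```python
import threading
import time
from typing import Callable, List, Tuple


class ShutdownFailed(RuntimeError):
    """Stand-in for ShutdownError in this sketch."""


def run_cleanup(callbacks: List[Tuple[int, str, Callable[[], object]]], timeout: float = 30.0) -> None:
    """Run (priority, name, fn) callbacks, lowest priority first, within one time budget."""
    deadline = time.monotonic() + timeout
    errors: List[str] = []
    timed_out = False
    # sorted() is stable, so equal priorities keep their registration order.
    for _priority, name, fn in sorted(callbacks, key=lambda item: item[0]):
        if timed_out or time.monotonic() >= deadline:
            errors.append(f"{name}: skipped, shutdown timeout exceeded")
            continue
        failures: List[BaseException] = []

        def target(fn: Callable[[], object] = fn, failures: List[BaseException] = failures) -> None:
            try:
                fn()
            except Exception as exc:  # record and keep shutting down
                failures.append(exc)

        worker = threading.Thread(target=target, name=f"cleanup-{name}", daemon=True)
        worker.start()
        worker.join(max(0.0, deadline - time.monotonic()))
        if worker.is_alive():
            timed_out = True
            errors.append(f"{name}: still running after shutdown timeout")
        elif failures:
            errors.append(f"{name}: {failures[0]!r}")
    if errors:
        raise ShutdownFailed("; ".join(errors))


order: List[str] = []
try:
    run_cleanup(
        [
            (20, "flush-logs", lambda: order.append("logs")),
            (10, "close-db", lambda: order.append("db")),
            (30, "notify", lambda: 1 / 0),
        ],
        timeout=5.0,
    )
except ShutdownFailed as exc:
    print(order)  # ['db', 'logs']
    print(exc)  # notify: ZeroDivisionError('division by zero')
```

A thread that overruns the budget cannot be killed from Python, so the sketch marks workers as daemon threads and skips the remaining callbacks rather than waiting for a hung one.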

exception kstlib.resilience.exceptions.WatchdogError

Bases: KstlibError, RuntimeError

Base exception for watchdog errors.

exception kstlib.resilience.exceptions.WatchdogTimeoutError(message, seconds_inactive)

Bases: WatchdogError

Raised when watchdog detects inactivity timeout.

seconds_inactive

Time since last ping/activity.

__init__(self, message: str, seconds_inactive: float) -> None

Initialize WatchdogTimeoutError.

Parameters:
  • message (str) – Human-readable error message.

  • seconds_inactive (float) – Seconds since last activity.