Resilience Utilities
Public API for fault tolerance patterns: circuit breaker, rate limiter, graceful shutdown, heartbeat monitoring, and watchdog. These components help build robust services that handle failures gracefully and support clean termination.
Tip
Pair this reference with Resilience for the feature guide.
Quick overview
- CircuitBreaker implements the circuit breaker pattern to prevent cascading failures
- RateLimiter provides token bucket rate limiting for request throttling
- GracefulShutdown manages prioritized cleanup callbacks on process termination
- Heartbeat provides file-based liveness signaling for external monitoring
- Watchdog detects thread/process freezes with configurable timeout callbacks
- All components support both sync and async usage patterns
- Configuration follows the standard priority chain: constructor args > kstlib.conf.yml > defaults
Configuration cascade
The module consults the loaded config for default values. A minimal config block:
resilience:
  circuit_breaker:
    max_failures: 5
    reset_timeout: 60
  shutdown:
    default_timeout: 30
  heartbeat:
    interval: 10
  watchdog:
    timeout: 30
Override any of these per instance:
from kstlib.resilience import CircuitBreaker
cb = CircuitBreaker(max_failures=3, reset_timeout=10)
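The cascade can be pictured as a three-step lookup. The sketch below is illustrative only: `resolve_setting`, `LOADED_CONFIG`, and `FALLBACK_DEFAULTS` are hypothetical names (not part of kstlib), and the fallback values are placeholders rather than the library's real defaults.

```python
from typing import Any, Mapping

# Hypothetical stand-in for the "resilience" block of a loaded kstlib.conf.yml.
LOADED_CONFIG: Mapping[str, Mapping[str, Any]] = {
    "circuit_breaker": {"max_failures": 5},
}

# Hypothetical built-in defaults, used only when neither source provides a value.
FALLBACK_DEFAULTS: Mapping[str, Mapping[str, Any]] = {
    "circuit_breaker": {"max_failures": 10, "reset_timeout": 60},
}


def resolve_setting(section: str, key: str, explicit: Any = None) -> Any:
    """Return the explicit argument if given, else the config value, else the default."""
    if explicit is not None:
        return explicit
    configured = LOADED_CONFIG.get(section, {})
    if key in configured:
        return configured[key]
    return FALLBACK_DEFAULTS[section][key]


print(resolve_setting("circuit_breaker", "max_failures", explicit=3))  # 3 (constructor arg wins)
print(resolve_setting("circuit_breaker", "max_failures"))              # 5 (taken from config)
print(resolve_setting("circuit_breaker", "reset_timeout"))             # 60 (falls back to default)
```

Treating `None` as "not provided" matches the `None` defaults shown in the constructor signatures of the module reference.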
Usage patterns
Circuit breaker for external APIs
from kstlib.resilience import CircuitBreaker, CircuitOpenError
cb = CircuitBreaker(max_failures=3, reset_timeout=30)
try:
    result = cb.call(api.fetch, endpoint="/data")
except CircuitOpenError:
    result = fallback_value
Async circuit breaker
result = await cb.acall(async_api.fetch, symbol="BTC/USDT")
Rate limiter for API throttling
from kstlib.resilience import RateLimiter, rate_limiter
# Direct usage
limiter = RateLimiter(rate=10, per=1.0)
limiter.acquire()  # Blocks until token available
call_api()
# As decorator
@rate_limiter(rate=100, per=60.0)  # 100 per minute
def call_service(data: dict) -> dict:
    return api.post(data)
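For readers unfamiliar with the token bucket algorithm, the following is a minimal sketch of the idea, not kstlib's implementation. `TinyBucket` and its injectable `clock` parameter are hypothetical names chosen so the example runs deterministically without sleeping.

```python
import time
from typing import Callable, Optional


class TinyBucket:
    """Illustrative token bucket: refills at rate/per tokens per second."""

    def __init__(self, rate: float, per: float = 1.0, burst: Optional[float] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        if rate <= 0 or per <= 0:
            raise ValueError("rate and per must be positive")
        self._fill_rate = rate / per  # tokens added per second
        self._capacity = float(burst if burst is not None else rate)
        self._tokens = self._capacity  # the bucket starts full
        self._clock = clock
        self._updated = clock()

    def _refill(self) -> None:
        """Add tokens for the elapsed time, capped at the bucket capacity."""
        now = self._clock()
        earned = (now - self._updated) * self._fill_rate
        self._tokens = min(self._capacity, self._tokens + earned)
        self._updated = now

    def try_acquire(self) -> bool:
        """Take one token if available, without waiting."""
        self._refill()
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False

    def time_until_token(self) -> float:
        """Seconds until one full token is available (0.0 if one is ready)."""
        self._refill()
        return max(0.0, (1.0 - self._tokens) / self._fill_rate)


fake_now = [0.0]  # fake clock so the demo does not depend on real time
bucket = TinyBucket(rate=2, per=1.0, clock=lambda: fake_now[0])
results = [bucket.try_acquire() for _ in range(3)]
print(results)                    # [True, True, False]
print(bucket.time_until_token())  # 0.5
fake_now[0] += 0.5                # half a second later one token has been refilled
print(bucket.try_acquire())       # True
```

A blocking acquire can be built on top of this by sleeping for `time_until_token()` seconds and retrying.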
Decorator syntax
from kstlib.resilience import circuit_breaker
@circuit_breaker(max_failures=3)
def call_service(data: dict) -> dict:
    return external_api.post(data)
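The `circuit_breaker` signature in the module reference takes the function as an optional first argument followed by keyword-only settings, which is the usual way to support both the bare `@decorator` form and the `@decorator(...)` form. The sketch below shows that general pattern with a hypothetical `counted` decorator; it is not kstlib code, and the library's own implementation may differ.

```python
import functools
from typing import Any, Callable, Optional


def counted(func: Optional[Callable[..., Any]] = None, *, label: str = "default"):
    """Hypothetical decorator usable as @counted or @counted(label=...)."""

    def decorate(target: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(target)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            wrapper.calls += 1  # count every invocation
            return target(*args, **kwargs)

        wrapper.calls = 0
        wrapper.label = label
        return wrapper

    if func is not None:
        return decorate(func)  # bare form: Python passed the function directly
    return decorate            # parametrized form: return the real decorator


@counted
def ping() -> str:
    return "pong"


@counted(label="api")
def fetch() -> str:
    return "data"


ping()
print(ping.calls, ping.label)    # 1 default
print(fetch.calls, fetch.label)  # 0 api
```

Making the settings keyword-only is what keeps the two forms unambiguous: a positional argument can only ever be the decorated function.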
Graceful shutdown with priorities
from kstlib.resilience import GracefulShutdown
shutdown = GracefulShutdown()
shutdown.register("save", save_state, priority=10)
shutdown.register("close", close_db, priority=20)
shutdown.install() # Handle SIGTERM/SIGINT
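With the registrations above, "save" (priority 10) runs before "close" (priority 20), because lower priority values run first. The ordering and a shared time budget can be sketched as follows. `run_cleanup` is a hypothetical helper, not a kstlib function, and how the library actually enforces its timeout is not shown in this reference.

```python
import time
from typing import Callable, List, Tuple

# Each entry is (name, callback, priority).
Entry = Tuple[str, Callable[[], None], int]


def run_cleanup(callbacks: List[Entry], timeout: float) -> List[str]:
    """Run callbacks lowest priority value first within a total time budget."""
    deadline = time.monotonic() + timeout
    completed: List[str] = []
    for name, callback, _priority in sorted(callbacks, key=lambda entry: entry[2]):
        if time.monotonic() >= deadline:
            break  # budget exhausted: remaining callbacks are skipped
        callback()
        completed.append(name)
    return completed


order = run_cleanup(
    [("close", lambda: None, 20), ("save", lambda: None, 10)],
    timeout=30.0,
)
print(order)  # ['save', 'close']
```

This simple version only checks the budget between callbacks; it cannot interrupt a callback that is already running.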
Heartbeat monitoring
from kstlib.resilience import Heartbeat
heartbeat = Heartbeat(
    state_file="/tmp/app.heartbeat",
    interval=10,
    metadata={"version": "1.0"}
)
heartbeat.start()
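File-based liveness signaling boils down to writing a timestamp and later checking how old it is. The sketch below illustrates that idea using the fields listed for `HeartbeatState` in the module reference (timestamp, pid, hostname). `write_beat` and `looks_alive` are hypothetical helpers, and the exact file layout kstlib writes is an assumption here.

```python
import json
import os
import socket
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def write_beat(state_file: Path) -> None:
    """Write one heartbeat record as JSON (ISO 8601 UTC timestamp, pid, hostname)."""
    state = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "pid": os.getpid(),
        "hostname": socket.gethostname(),
    }
    state_file.write_text(json.dumps(state), encoding="utf-8")


def looks_alive(state_file: Path, max_age_seconds: float = 30.0) -> bool:
    """Return True if the file exists, parses, and its timestamp is recent enough."""
    try:
        state = json.loads(state_file.read_text(encoding="utf-8"))
        last_beat = datetime.fromisoformat(state["timestamp"])
        age = (datetime.now(timezone.utc) - last_beat).total_seconds()
    except (OSError, ValueError, KeyError, TypeError):
        return False  # missing, unreadable, or malformed state counts as not alive
    return age <= max_age_seconds


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "app.heartbeat"
    before = looks_alive(path)  # no file has been written yet
    write_beat(path)
    after = looks_alive(path)   # written a moment ago
print(before, after)  # False True
```

An external monitor only needs read access to the state file, which is why this approach works across processes.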
Watchdog for freeze detection
from kstlib.resilience import Watchdog
def on_freeze():
    print("Thread appears frozen!")
with Watchdog(timeout=30, on_timeout=on_freeze) as wd:
    for item in work_queue:
        wd.ping()  # Must call before timeout
        process(item)
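The ping-or-timeout contract can be sketched with a small class that compares the time since the last ping against the timeout. `TinyWatchdog` is a hypothetical illustration, not kstlib's implementation: the real Watchdog monitors in a background thread, whereas this sketch exposes an explicit `check()` and an injectable `clock` so it runs deterministically.

```python
import time
from typing import Callable, List, Optional


class TinyWatchdog:
    """Illustrative watchdog: fires on_timeout once if no ping arrives in time."""

    def __init__(self, timeout: float,
                 on_timeout: Optional[Callable[[], None]] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout = timeout
        self.on_timeout = on_timeout
        self._clock = clock
        self._last_ping = clock()
        self.triggered = False

    def ping(self) -> None:
        """Record activity by resetting the timer."""
        self._last_ping = self._clock()

    def check(self) -> bool:
        """Return True (firing the callback the first time) once the timeout has elapsed."""
        if not self.triggered and self._clock() - self._last_ping >= self.timeout:
            self.triggered = True
            if self.on_timeout is not None:
                self.on_timeout()
        return self.triggered


fake_now = [0.0]  # fake clock so the demo needs no real waiting
events: List[str] = []
wd = TinyWatchdog(timeout=30.0, on_timeout=lambda: events.append("frozen"),
                  clock=lambda: fake_now[0])
fake_now[0] = 10.0
wd.ping()            # activity at t=10
fake_now[0] = 35.0
print(wd.check())    # False (only 25s since the last ping)
fake_now[0] = 41.0
print(wd.check())    # True (31s without a ping)
print(events)        # ['frozen']
```

A monotonic clock is the natural default because it is unaffected by wall-clock adjustments.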
Module reference
Resilience utilities for fault-tolerant applications.
This module provides core components for building resilient systems:
- Heartbeat: Periodic liveness signaling via state files
- GracefulShutdown: Orderly shutdown with prioritized callbacks
- CircuitBreaker: Protect against cascading failures
- RateLimiter: Token bucket rate limiting for request throttling
- Watchdog: Detect thread/process freezes and hangs
Examples
Heartbeat for process monitoring:
>>> from kstlib.resilience import Heartbeat
>>> with Heartbeat("/tmp/app.heartbeat") as hb:
...     run_application()
>>> Heartbeat.is_alive("/tmp/app.heartbeat")
True
Graceful shutdown with cleanup:
>>> from kstlib.resilience import GracefulShutdown
>>> with GracefulShutdown() as shutdown:
...     shutdown.register("db", close_database, priority=10)
...     shutdown.register("cache", flush_cache, priority=20)
...     run_application()
Circuit breaker for external calls:
>>> from kstlib.resilience import circuit_breaker
>>> @circuit_breaker(max_failures=3, reset_timeout=30)
... def call_external_api():
...     return requests.get("http://api.example.com")
Rate limiting API calls:
>>> from kstlib.resilience import rate_limiter, RateLimiter
>>> @rate_limiter(rate=10, per=1.0)  # 10 requests per second
... def call_api():
...     return requests.get("http://api.example.com")
>>> limiter = RateLimiter(rate=100, per=60.0)  # 100 per minute
>>> limiter.acquire()
True
Watchdog for freeze detection:
>>> from kstlib.resilience import Watchdog
>>> def on_freeze():
...     print("Thread frozen!")
>>> with Watchdog(timeout=30, on_timeout=on_freeze) as wd:
...     while running:
...         wd.ping()
...         do_work()
- class kstlib.resilience.CircuitBreaker(*, max_failures=None, reset_timeout=None, half_open_max_calls=None, excluded_exceptions=(), name=None)[source]
Bases: object
Circuit breaker for protecting against cascading failures.
Implements the circuit breaker pattern to prevent repeated calls to a failing service and allow recovery time.
- Parameters:
max_failures (int | None) – Failures before opening circuit (default from config).
reset_timeout (float | None) – Seconds before attempting recovery (default from config).
half_open_max_calls (int | None) – Calls allowed in half-open state (default from config).
excluded_exceptions (tuple[type[Exception], ...]) – Exceptions that don’t count as failures.
name (str | None) – Optional name for the circuit breaker.
Examples
As a decorator:
>>> @circuit_breaker
... def call_api():
...     return requests.get("http://api.example.com")
With custom settings:
>>> @circuit_breaker(max_failures=3, reset_timeout=30)
... def risky_call():
...     pass
Direct instantiation:
>>> cb = CircuitBreaker(max_failures=5)
>>> cb.state
<CircuitState.CLOSED: 1>
- __init__(self, *, max_failures: 'int | None' = None, reset_timeout: 'float | None' = None, half_open_max_calls: 'int | None' = None, excluded_exceptions: 'tuple[type[Exception], ...]' = (), name: 'str | None' = None) 'None' -> None[source]
Initialize circuit breaker.
- Parameters:
max_failures (int | None) – Failures before opening circuit. Uses config if None.
reset_timeout (float | None) – Seconds before attempting recovery. Uses config if None.
half_open_max_calls (int | None) – Calls allowed in half-open state. Uses config if None.
excluded_exceptions (tuple[type[Exception], ...]) – Exceptions that don’t count as failures.
name (str | None) – Optional name for the circuit breaker.
- property state: CircuitState
Return the current circuit state.
- property stats: CircuitStats
Return circuit breaker statistics.
- property failure_count: int
Return current failure count.
- call(self, func: 'Callable[P, R]', *args: 'P.args', **kwargs: 'P.kwargs') 'R' -> R[source]
Execute a function through the circuit breaker.
- Parameters:
func (Callable[P, R]) – Function to execute.
*args (P.args) – Positional arguments for the function.
**kwargs (P.kwargs) – Keyword arguments for the function.
- Returns:
Function result.
- Raises:
CircuitOpenError – If circuit is open.
- Return type:
R
Examples
>>> cb = CircuitBreaker()
>>> result = cb.call(lambda x: x * 2, 5)
>>> result
10
- async acall(self, func: 'Callable[P, Awaitable[R]]', *args: 'P.args', **kwargs: 'P.kwargs') 'R' -> R[source]
Execute an async function through the circuit breaker.
- Parameters:
func (Callable[P, Awaitable[R]]) – Async function to execute.
*args (P.args) – Positional arguments for the function.
**kwargs (P.kwargs) – Keyword arguments for the function.
- Returns:
Function result.
- Raises:
CircuitOpenError – If circuit is open.
- Return type:
R
Examples
>>> import asyncio
>>> cb = CircuitBreaker()
>>> async def double(x): return x * 2
>>> asyncio.run(cb.acall(double, 5))
10
- reset(self) 'None' -> None[source]
Manually reset the circuit breaker to closed state.
Examples
>>> cb = CircuitBreaker(max_failures=1)
>>> try:
...     cb.call(lambda: 1/0)
... except ZeroDivisionError:
...     pass
>>> cb.state.name
'OPEN'
>>> cb.reset()
>>> cb.state.name
'CLOSED'
- __call__(self, func: 'Callable[P, R]') 'Callable[P, R] | Callable[P, Awaitable[R]]' -> Callable[P, R] | Callable[P, Awaitable[R]][source]
Use circuit breaker as a decorator.
- Parameters:
func (Callable[P, R]) – Function to wrap.
- Returns:
Wrapped function with circuit breaker protection.
- Return type:
Callable[P, R] | Callable[P, Awaitable[R]]
- exception kstlib.resilience.CircuitBreakerError[source]
Bases: KstlibError, RuntimeError
Base exception for circuit breaker errors.
- exception kstlib.resilience.CircuitOpenError(message, remaining_seconds)[source]
Bases: CircuitBreakerError
Raised when a call is attempted while the circuit is open.
- remaining_seconds
Time until the circuit may transition to half-open.
- class kstlib.resilience.CircuitState(value)[source]
Bases: Enum
State of the circuit breaker.
- States:
CLOSED: Normal operation, requests pass through.
OPEN: Circuit tripped, requests fail immediately.
HALF_OPEN: Testing if service recovered.
- CLOSED = 1
- OPEN = 2
- HALF_OPEN = 3
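The three states form a small state machine. The sketch below shows one common way the transitions are wired; it is an illustration under stated assumptions, not kstlib's implementation. `TinyBreaker`, `State`, and the injectable `clock` are hypothetical names, and a plain `RuntimeError` stands in for `CircuitOpenError`.

```python
import enum
import time
from typing import Any, Callable


class State(enum.Enum):
    CLOSED = 1
    OPEN = 2
    HALF_OPEN = 3


class TinyBreaker:
    """Illustrative breaker: opens after max_failures, retries after reset_timeout."""

    def __init__(self, max_failures: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0
        self.state = State.CLOSED

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise RuntimeError("circuit open")  # stand-in for CircuitOpenError
            self.state = State.HALF_OPEN  # timeout elapsed: allow a trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.state is State.HALF_OPEN or self._failures >= self.max_failures:
                self.state = State.OPEN
                self._opened_at = self._clock()
            raise
        self._failures = 0
        self.state = State.CLOSED  # success closes the circuit again
        return result


def failing() -> None:
    raise ValueError("boom")


fake_now = [0.0]  # fake clock so the demo needs no real waiting
breaker = TinyBreaker(max_failures=2, reset_timeout=30.0, clock=lambda: fake_now[0])
for _ in range(2):
    try:
        breaker.call(failing)
    except ValueError:
        pass
print(breaker.state)                # State.OPEN
fake_now[0] = 31.0                  # reset_timeout has elapsed
print(breaker.call(lambda: "ok"))   # ok
print(breaker.state)                # State.CLOSED
```

This sketch permits a single trial call in the half-open state and ignores `excluded_exceptions`; the real class exposes `half_open_max_calls` and `excluded_exceptions` for both.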
- class kstlib.resilience.CircuitStats(total_calls=0, successful_calls=0, failed_calls=0, rejected_calls=0, state_changes=0)[source]
Bases: object
Statistics for circuit breaker monitoring.
- total_calls
Total number of calls attempted.
- Type:
int
- successful_calls
Number of successful calls.
- Type:
int
- failed_calls
Number of failed calls.
- Type:
int
- rejected_calls
Number of calls rejected due to open circuit.
- Type:
int
- state_changes
Number of state transitions.
- Type:
int
Examples
>>> stats = CircuitStats()
>>> stats.record_success()
>>> stats.record_failure()
>>> stats.record_rejection()
>>> (stats.successful_calls, stats.failed_calls, stats.rejected_calls)
(1, 1, 1)
>>> stats.total_calls
3
- total_calls: int = 0
- successful_calls: int = 0
- failed_calls: int = 0
- rejected_calls: int = 0
- state_changes: int = 0
- record_success(self) 'None' -> None[source]
Record a successful call.
- record_failure(self) 'None' -> None[source]
Record a failed call.
- record_rejection(self) 'None' -> None[source]
Record a rejected call (circuit open).
- record_state_change(self) 'None' -> None[source]
Record a state transition.
- __init__(self, total_calls: 'int' = 0, successful_calls: 'int' = 0, failed_calls: 'int' = 0, rejected_calls: 'int' = 0, state_changes: 'int' = 0) None -> None
- class kstlib.resilience.CleanupCallback(name, callback, priority=100, timeout=None, is_async=False)[source]
Bases: object
Registered cleanup callback with metadata.
- name
Unique identifier for the callback.
- Type:
str
- callback
The cleanup function (sync or async).
- Type:
collections.abc.Callable[[], None] | collections.abc.Callable[[], collections.abc.Coroutine[Any, Any, None]]
- priority
Execution order (lower runs first, default 100).
- Type:
int
- timeout
Per-callback timeout in seconds (None = use global).
- Type:
float | None
- is_async
Whether the callback is async.
- Type:
bool
Examples
>>> cb = CleanupCallback(
...     name="db_close",
...     callback=lambda: None,
...     priority=50,
...     timeout=5.0,
...     is_async=False,
... )
>>> (cb.name, cb.priority, cb.timeout)
('db_close', 50, 5.0)
- name: str
- priority: int
- is_async: bool
- __init__(self, name: 'str', callback: 'Callback', priority: 'int' = 100, timeout: 'float | None' = None, is_async: 'bool' = False) None -> None
- class kstlib.resilience.GracefulShutdown(*, timeout=None, signals=None, force_exit_code=1)[source]
Bases: object
Graceful shutdown handler with prioritized cleanup callbacks.
Manages orderly shutdown on SIGTERM/SIGINT with timeout enforcement. Callbacks execute in priority order (lower = first).
- Parameters:
timeout (float | None) – Total timeout for all callbacks (default from config).
signals (tuple[signal.Signals, ...] | None) – Signals to handle (default: SIGTERM, SIGINT).
force_exit_code (int) – Exit code when timeout exceeded (default: 1).
Examples
Register callbacks with priority ordering:
>>> shutdown = GracefulShutdown(timeout=30)
>>> shutdown.register("cache", lambda: None, priority=20)
>>> shutdown.register("db", lambda: None, priority=10)
>>> [cb.name for cb in shutdown._get_sorted_callbacks()]
['db', 'cache']
Context manager usage (with signals):
>>> with GracefulShutdown() as shutdown:
...     shutdown.register("cleanup", close_resources)
...     run_application()
- __init__(self, *, timeout: 'float | None' = None, signals: 'tuple[signal.Signals, ...] | None' = None, force_exit_code: 'int' = 1) 'None' -> None[source]
Initialize graceful shutdown handler.
- property timeout: float
Return the total shutdown timeout in seconds.
- property is_shutting_down: bool
Return True if shutdown is in progress.
- property is_installed: bool
Return True if signal handlers are installed.
- register(self, name: 'str', callback: 'Callback', *, priority: 'int' = 100, timeout: 'float | None' = None) 'None' -> None[source]
Register a cleanup callback.
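- Parameters:
name (str) – Unique identifier for the callback.
callback (Callback) – The cleanup function (sync or async).
priority (int) – Execution order (lower runs first, default 100).
timeout (float | None) – Per-callback timeout in seconds (None = use global).
- Raises:
ShutdownError – If name already registered or shutdown in progress.
Examples
>>> shutdown = GracefulShutdown()
>>> shutdown.register("db", lambda: print("closing db"), priority=10)
>>> "db" in [cb.name for cb in shutdown._callbacks.values()]
True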
- unregister(self, name: 'str') 'bool' -> bool[source]
Unregister a cleanup callback.
- Parameters:
name (str) – Identifier of callback to remove.
- Returns:
True if callback was removed, False if not found.
- Return type:
bool
Examples
>>> shutdown = GracefulShutdown()
>>> shutdown.register("test", lambda: None)
>>> shutdown.unregister("test")
True
>>> shutdown.unregister("nonexistent")
False
- install(self) 'None' -> None[source]
Install signal handlers.
- Raises:
ShutdownError – If handlers already installed.
- uninstall(self) 'None' -> None[source]
Restore original signal handlers.
- trigger(self) 'None' -> None[source]
Trigger shutdown programmatically.
Useful for testing or triggering shutdown from code. Runs callbacks synchronously in priority order.
- async atrigger(self) 'None' -> None[source]
Trigger shutdown programmatically (async version).
Runs callbacks asynchronously in priority order.
- wait(self, timeout: 'float | None' = None) 'bool' -> bool[source]
Wait for shutdown signal.
- async await_shutdown(self, timeout: 'float | None' = None) 'bool' -> bool[source]
Wait for shutdown signal (async version).
- __enter__(self) 'Self' -> Self[source]
Enter sync context manager.
- __exit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'types.TracebackType | None') 'None' -> None[source]
Exit sync context manager.
- async __aenter__(self) 'Self' -> Self[source]
Enter async context manager.
- async __aexit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'types.TracebackType | None') 'None' -> None[source]
Exit async context manager.
- class kstlib.resilience.Heartbeat(state_file=None, *, interval=None, on_missed_beat=None, on_alert=None, target=None, on_target_dead=None, on_beat=None, metadata=None)[source]
Bases: object
Periodic signal to indicate the process is alive.
Writes timestamp to a JSON state file at configurable intervals. Supports both sync and async context managers.
- Parameters:
state_file (str | Path | None) – Path to the heartbeat state file. If None, no file is written (useful when using on_beat callback for state management).
interval (float | None) – Seconds between heartbeats (default from config or 10s).
on_missed_beat (Callable[[Exception], None] | None) – Callback invoked when a beat write fails.
on_alert (OnAlertCallback | None) – Callback for alerting (channel, message, context).
target (HeartbeatTarget | None) – Optional object with is_dead property to monitor.
on_target_dead (Callable[[], Awaitable[None] | None] | None) – Callback invoked when target is detected as dead.
on_beat (Callable[[], Awaitable[None] | None] | None) – Callback invoked after each successful beat. Can be sync or async. Use this to delegate state writing to an external component.
metadata (dict[str, Any] | None) – Optional dict included in each heartbeat.
Examples
Sync context manager:
>>> with Heartbeat("/tmp/bot.heartbeat") as hb:
...     do_work()
Async context manager:
>>> async with Heartbeat("/tmp/bot.heartbeat") as hb:
...     await do_async_work()
Check if a process is alive:
>>> Heartbeat.is_alive("/tmp/bot.heartbeat", max_age_seconds=30)
True
Monitor a WebSocket:
>>> hb = Heartbeat(
...     "/tmp/bot.heartbeat",
...     target=ws_manager,
...     on_target_dead=lambda: restart_ws(),
... )
- __init__(self, state_file: 'str | Path | None' = None, *, interval: 'float | None' = None, on_missed_beat: 'Callable[[Exception], None] | None' = None, on_alert: 'OnAlertCallback | None' = None, target: 'HeartbeatTarget | None' = None, on_target_dead: 'Callable[[], Awaitable[None] | None] | None' = None, on_beat: 'Callable[[], Awaitable[None] | None] | None' = None, metadata: 'dict[str, Any] | None' = None) 'None' -> None[source]
Initialize heartbeat.
- Parameters:
state_file (Path | str | None) – Path to the heartbeat state file. If None, no file is written.
interval (float | None) – Seconds between heartbeats. Uses config default if None.
on_missed_beat (Callable[[Exception], None] | None) – Callback invoked when a beat write fails.
on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).
target (HeartbeatTarget | None) – Optional object with is_dead property to monitor.
on_target_dead (Callable[[], Awaitable[None] | None] | None) – Callback invoked when target is detected as dead.
on_beat (Callable[[], Awaitable[None] | None] | None) – Callback invoked after each successful beat.
metadata (dict[str, Any] | None) – Optional dict included in each heartbeat.
- property interval: float
Return the heartbeat interval in seconds.
- property is_shutdown: bool
Check if shutdown has been requested.
- property target: HeartbeatTarget | None
Return the monitored target, if any.
- shutdown(self) 'None' -> None[source]
Signal shutdown and stop gracefully.
Sets the shutdown flag which can be checked by external code to know that we’re shutting down intentionally.
- async ashutdown(self) 'None' -> None[source]
Signal shutdown and stop gracefully (async version).
- start(self) 'None' -> None[source]
Start the heartbeat background thread.
- Raises:
HeartbeatError – If heartbeat is already running.
- stop(self) 'None' -> None[source]
Stop the heartbeat and clean up.
Safe to call multiple times or if not started.
- beat(self) 'None' -> None[source]
Write a heartbeat immediately (manual trigger).
If state_file is configured, writes to file. If on_beat callback is configured, it will be invoked by the loop (not here).
- Raises:
HeartbeatError – If state file is configured and cannot be written.
- async astart(self) 'None' -> None[source]
Start the heartbeat using asyncio (async version).
- Raises:
HeartbeatError – If heartbeat is already running.
- async astop(self) 'None' -> None[source]
Stop the heartbeat (async version).
Safe to call multiple times or if not started.
- static read_state(state_file: 'str | Path') 'HeartbeatState | None' -> HeartbeatState | None[source]
Read and parse an existing heartbeat state file.
- Parameters:
state_file (str | Path) – Path to the heartbeat state file.
- Returns:
HeartbeatState if file exists and is valid, None otherwise.
- Return type:
HeartbeatState | None
Examples
>>> state = Heartbeat.read_state("/tmp/bot.heartbeat")
>>> if state:
...     print(f"Last beat: {state.timestamp}")
- static is_alive(state_file: 'str | Path', max_age_seconds: 'float' = 30.0) 'bool' -> bool[source]
Check if a process is alive based on its heartbeat.
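- Parameters:
state_file (str | Path) – Path to the heartbeat state file.
max_age_seconds (float) – Maximum heartbeat age in seconds for the process to count as alive (default: 30.0).
- Returns:
True if heartbeat exists and is recent enough.
- Return type:
bool
Examples
>>> Heartbeat.is_alive("/tmp/bot.heartbeat", max_age_seconds=30)
True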
- __enter__(self) 'Self' -> Self[source]
Enter sync context manager.
- __exit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'types.TracebackType | None') 'None' -> None[source]
Exit sync context manager.
- async __aenter__(self) 'Self' -> Self[source]
Enter async context manager.
- async __aexit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'types.TracebackType | None') 'None' -> None[source]
Exit async context manager.
- exception kstlib.resilience.HeartbeatError[source]
Bases: KstlibError, RuntimeError
Raised when the heartbeat encounters an error.
Examples include state file write failure or invalid state file path.
- class kstlib.resilience.HeartbeatState(timestamp, pid, hostname, metadata=<factory>)[source]
Bases: object
Represents the state written to the heartbeat file.
- timestamp
Last heartbeat time (ISO 8601 UTC).
- Type:
str
- pid
Process ID.
- Type:
int
- hostname
Machine hostname.
- Type:
str
Examples
>>> state = HeartbeatState(
...     timestamp="2026-01-12T10:00:00+00:00",
...     pid=1234,
...     hostname="myhost",
... )
>>> state.pid
1234
- timestamp: str
- pid: int
- hostname: str
- to_dict(self) 'dict[str, Any]' -> dict[str, Any][source]
Serialize to JSON-compatible dictionary.
- classmethod from_dict(data: 'dict[str, Any]') 'HeartbeatState' -> HeartbeatState[source]
Deserialize from dictionary.
- __init__(self, timestamp: 'str', pid: 'int', hostname: 'str', metadata: 'dict[str, Any]' = <factory>) None -> None
- exception kstlib.resilience.RateLimitError[source]
Bases: KstlibError, RuntimeError
Base exception for rate limiter errors.
- exception kstlib.resilience.RateLimitExceededError(message, retry_after)[source]
Bases: RateLimitError
Raised when rate limit is exceeded and blocking is disabled.
- retry_after
Seconds until a token will be available.
- class kstlib.resilience.RateLimiter(rate, per=1.0, *, burst=None, name=None)[source]
Bases: object
Token bucket rate limiter for controlling request throughput.
Implements the token bucket algorithm where tokens are added at a fixed rate and each request consumes one token. Allows bursts up to the bucket capacity.
- Parameters:
rate (float) – Maximum tokens per period.
per (float) – Period duration in seconds (default: 1.0).
burst (float | None) – Bucket capacity. If None, the bucket holds rate tokens, as the first example below shows.
name (str | None) – Optional name for the rate limiter.
Examples
Basic usage - 10 requests per second:
>>> limiter = RateLimiter(rate=10, per=1.0)
>>> int(limiter.tokens)  # Starts full
10
With custom burst capacity:
>>> limiter = RateLimiter(rate=10, per=1.0, burst=5)
>>> int(limiter.tokens)  # Starts with 5 tokens
5
Rate limiting API calls:
>>> limiter = RateLimiter(rate=100, per=60.0)  # 100 per minute
>>> for _ in range(5):
...     if limiter.try_acquire():
...         pass  # call_api()
>>> limiter.stats.total_acquired
5
- __init__(self, rate: 'float', per: 'float' = 1.0, *, burst: 'float | None' = None, name: 'str | None' = None) 'None' -> None[source]
Initialize rate limiter.
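- Parameters:
rate (float) – Maximum tokens per period.
per (float) – Period duration in seconds.
burst (float | None) – Bucket capacity.
name (str | None) – Optional name for the rate limiter.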
- Raises:
ValueError – If rate or per is not positive.
- property rate: float
Maximum tokens per period.
- property per: float
Period duration in seconds.
- property tokens: float
Current available tokens (after refill).
- property stats: RateLimiterStats
Statistics for this rate limiter.
- time_until_token(self) 'float' -> float[source]
Calculate time until at least 1 token will be available.
- Returns:
Seconds until a token is available. Returns 0.0 if token available now.
- Return type:
float
Examples
>>> limiter = RateLimiter(rate=10, per=1.0)
>>> limiter.time_until_token()  # Tokens available
0.0
- acquire(self, *, blocking: 'bool' = True, timeout: 'float | None' = None) 'bool' -> bool[source]
Acquire a token from the bucket.
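- Parameters:
blocking (bool) – If True, wait until a token is available; if False, return immediately (default: True).
timeout (float | None) – Maximum time to wait in seconds.
- Returns:
True if token was acquired, False if non-blocking and no token.
- Raises:
RateLimitExceededError – If timeout exceeded while waiting.
- Return type:
bool
Examples
>>> limiter = RateLimiter(rate=10, per=1.0)
>>> limiter.acquire()  # Blocks if needed
True
>>> limiter.acquire(blocking=False)  # Returns immediately
True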
- try_acquire(self) 'bool' -> bool[source]
Try to acquire a token without blocking.
- Returns:
True if token was acquired, False otherwise.
- Return type:
bool
Examples
>>> limiter = RateLimiter(rate=2, per=1.0)
>>> limiter.try_acquire()
True
>>> limiter.try_acquire()
True
>>> limiter.try_acquire()  # No tokens left
False
- async acquire_async(self, *, timeout: 'float | None' = None) 'bool' -> bool[source]
Acquire a token asynchronously.
- Parameters:
timeout (float | None) – Maximum time to wait in seconds.
- Returns:
True when token is acquired.
- Raises:
RateLimitExceededError – If timeout exceeded.
- Return type:
bool
Examples
>>> import asyncio
>>> limiter = RateLimiter(rate=10, per=1.0)
>>> asyncio.run(limiter.acquire_async())
True
- reset(self) 'None' -> None[source]
Reset the rate limiter to full capacity.
Examples
>>> limiter = RateLimiter(rate=5, per=1.0)
>>> for _ in range(5):
...     limiter.try_acquire()
True
True
True
True
True
>>> limiter.try_acquire()
False
>>> limiter.reset()
>>> limiter.try_acquire()
True
- __enter__(self) 'Self' -> Self[source]
Enter context manager, acquiring a token.
- __exit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]
Exit context manager.
- async __aenter__(self) 'Self' -> Self[source]
Enter async context manager, acquiring a token.
- async __aexit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]
Exit async context manager.
- __repr__(self) 'str' -> str[source]
Return string representation.
- class kstlib.resilience.RateLimiterStats(total_acquired=0, total_rejected=0, total_waited=0.0)[source]
Bases: object
Statistics for rate limiter monitoring.
- total_acquired
Total number of tokens successfully acquired.
- Type:
int
- total_rejected
Total number of acquire attempts that were rejected.
- Type:
int
- total_waited
Total time spent waiting for tokens (seconds).
- Type:
float
Examples
>>> stats = RateLimiterStats()
>>> stats.record_acquired()
>>> stats.record_rejected()
>>> stats.record_wait(0.5)
>>> (stats.total_acquired, stats.total_rejected)
(1, 1)
>>> stats.total_waited
0.5
- total_acquired: int = 0
- total_rejected: int = 0
- total_waited: float = 0.0
- record_acquired(self) 'None' -> None[source]
Record a successful token acquisition.
- record_rejected(self) 'None' -> None[source]
Record a rejected acquisition attempt.
- record_wait(self, seconds: 'float') 'None' -> None[source]
Record time spent waiting for a token.
- __init__(self, total_acquired: 'int' = 0, total_rejected: 'int' = 0, total_waited: 'float' = 0.0) None -> None
- exception kstlib.resilience.ShutdownError[source]
Bases: KstlibError, RuntimeError
Raised when graceful shutdown encounters an error.
Examples include cleanup callback failure or timeout exceeded.
- class kstlib.resilience.Watchdog(*, timeout=None, on_timeout=None, on_alert=None, name=None)[source]
Bases: object
Monitor thread/process health and detect freezes or hangs.
Implements a watchdog timer that must be periodically “pinged” to prevent timeout. If no ping is received within the timeout period, the on_timeout callback is invoked.
- Parameters:
timeout (float | None) – Seconds of inactivity before triggering timeout. If None, uses config default (30s).
on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Callback invoked when timeout is detected. Can be sync or async function.
name (str | None) – Optional identifier for logging and monitoring.
Examples
Basic usage:
>>> watchdog = Watchdog(timeout=30)
>>> watchdog.timeout
30
With callback:
>>> def alert():
...     print("Watchdog triggered!")
>>> wd = Watchdog(timeout=10, on_timeout=alert, name="worker")
>>> wd.name
'worker'
As context manager:
>>> with Watchdog(timeout=30) as wd:
...     wd.ping()
...     do_work()
- __init__(self, *, timeout: 'float | None' = None, on_timeout: 'Callable[[], None] | Callable[[], Awaitable[None]] | None' = None, on_alert: 'OnAlertCallback | None' = None, name: 'str | None' = None) 'None' -> None[source]
Initialize watchdog.
- Parameters:
timeout (float | None) – Seconds before timeout triggers. Clamped to [1, 3600].
on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Callback for timeout events (sync or async).
on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).
name (str | None) – Optional identifier.
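Clamping to [1, 3600] means out-of-range values are pulled to the nearest bound rather than rejected. A minimal sketch of that arithmetic follows; `clamp_timeout` is a hypothetical helper for illustration, and how kstlib handles the clamp internally (for example whether it logs a warning) is not shown in this reference.

```python
def clamp_timeout(value: float, low: float = 1.0, high: float = 3600.0) -> float:
    """Pull value into the closed interval [low, high]."""
    return max(low, min(high, value))


print(clamp_timeout(0.2))     # 1.0
print(clamp_timeout(30.0))    # 30.0
print(clamp_timeout(7200.0))  # 3600.0
```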
- property timeout: float
Timeout duration in seconds.
- property stats: WatchdogStats
Statistics for this watchdog.
- property is_running: bool
Return True if watchdog is actively monitoring.
- property is_triggered: bool
Return True if timeout has been triggered.
- property seconds_since_ping: float
Return seconds since last ping.
- property is_shutdown: bool
Check if shutdown has been requested.
- classmethod from_state_file(state_file: 'str | Path', *, check_interval: 'float | None' = None, max_age: 'float' = 30.0, on_timeout: 'Callable[[], None] | Callable[[], Awaitable[None]] | None' = None, on_alert: 'OnAlertCallback | None' = None, name: 'str | None' = None) 'Self' -> Self[source]
Create a watchdog that monitors a heartbeat state file.
Instead of requiring periodic ping() calls, this watchdog checks if a heartbeat state file is being updated regularly.
- Parameters:
state_file (str | Path) – Path to the heartbeat JSON state file.
check_interval (float | None) – Seconds between file checks (defaults to max_age/2).
max_age (float) – Maximum age in seconds before triggering timeout (default: 30s).
on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None) – Callback for timeout events.
on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).
name (str | None) – Optional identifier.
- Returns:
Configured Watchdog instance.
- Return type:
Self
Examples
>>> wd = Watchdog.from_state_file(
...     "/tmp/bot.heartbeat",
...     max_age=30.0,  # Trigger if no heartbeat for 30 seconds
...     on_timeout=restart_bot,
... )
>>> await wd.astart()
- shutdown(self) 'None' -> None[source]
Signal shutdown and stop gracefully.
- async ashutdown(self) 'None' -> None[source]
Signal shutdown and stop gracefully (async version).
- ping(self) 'None' -> None[source]
Reset the watchdog timer.
Call this periodically to indicate the monitored code is still alive. Must be called more frequently than the timeout interval.
Examples
>>> watchdog = Watchdog(timeout=30)
>>> watchdog.ping()  # Reset timer
- async aping(self) 'None' -> None[source]
Async version of ping().
Examples
>>> import asyncio
>>> async def example():
...     watchdog = Watchdog(timeout=30)
...     await watchdog.aping()
>>> asyncio.run(example())
- start(self) 'None' -> None[source]
Start watchdog monitoring in a background thread.
- Raises:
RuntimeError – If watchdog is already running.
Examples
>>> watchdog = Watchdog(timeout=30)
>>> watchdog.start()
>>> watchdog.is_running
True
>>> watchdog.stop()
- stop(self) 'None' -> None[source]
Stop watchdog monitoring.
Safe to call multiple times or when not running.
Examples
>>> watchdog = Watchdog(timeout=30)
>>> watchdog.start()
>>> watchdog.stop()
>>> watchdog.is_running
False
- async astart(self) 'None' -> None[source]
Start watchdog monitoring asynchronously.
- Raises:
RuntimeError – If watchdog is already running.
- async astop(self) 'None' -> None[source]
Stop watchdog monitoring asynchronously.
Safe to call multiple times or when not running.
- reset(self) 'None' -> None[source]
Reset watchdog state without stopping.
Clears triggered flag and resets timer.
- __enter__(self) 'Self' -> Self[source]
Enter context manager, starting watchdog.
- __exit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]
Exit context manager, stopping watchdog.
- async __aenter__(self) 'Self' -> Self[source]
Enter async context manager, starting watchdog.
- async __aexit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'object') 'None' -> None[source]
Exit async context manager, stopping watchdog.
- __repr__(self) 'str' -> str[source]
Return string representation.
- exception kstlib.resilience.WatchdogError[source]
Bases: KstlibError, RuntimeError
Base exception for watchdog errors.
- class kstlib.resilience.WatchdogStats(pings_total=0, timeouts_triggered=0, last_ping_time=None, start_time=None)
Bases: object
Statistics for watchdog monitoring.
- pings_total
Total number of ping calls.
- Type:
int
- timeouts_triggered
Number of timeout events detected.
- Type:
int
- last_ping_time
Timestamp of last activity (monotonic).
- Type:
float | None
- start_time
Timestamp when watchdog started (monotonic).
- Type:
float | None
Examples
>>> stats = WatchdogStats()
>>> stats.record_ping()
>>> stats.pings_total
1
- pings_total: int = 0
- timeouts_triggered: int = 0
- record_ping(self) -> None
Record a ping event.
- record_timeout(self) -> None
Record a timeout event.
- record_start(self) -> None
Record watchdog start.
- property uptime: float
Return seconds since watchdog started.
- __init__(self, pings_total: int = 0, timeouts_triggered: int = 0, last_ping_time: float | None = None, start_time: float | None = None) -> None
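Because last_ping_time and start_time are monotonic timestamps, they are only meaningful as differences and cannot be compared with wall-clock time. The sketch below shows one way such a statistics container could behave. StatsSketch is a hypothetical name, and two details are assumptions: that record_ping also refreshes last_ping_time, and that uptime reports 0.0 before record_start has been called.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class StatsSketch:
    """Hypothetical mirror of the documented WatchdogStats fields."""

    pings_total: int = 0
    timeouts_triggered: int = 0
    last_ping_time: Optional[float] = None
    start_time: Optional[float] = None

    def record_ping(self) -> None:
        self.pings_total += 1
        # Assumption: a ping also refreshes the last-activity timestamp.
        self.last_ping_time = time.monotonic()

    def record_timeout(self) -> None:
        self.timeouts_triggered += 1

    def record_start(self) -> None:
        self.start_time = time.monotonic()

    @property
    def uptime(self) -> float:
        # Assumption: report 0.0 before record_start() has been called.
        if self.start_time is None:
            return 0.0
        return time.monotonic() - self.start_time
```

time.monotonic() never goes backwards when the system clock is adjusted, which is why it suits interval measurements such as uptime.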
- exception kstlib.resilience.WatchdogTimeoutError(message, seconds_inactive)
Bases: WatchdogError
Raised when watchdog detects inactivity timeout.
- seconds_inactive
Time since last ping/activity.
- kstlib.resilience.circuit_breaker(func: Callable[P, R] | None = None, *, max_failures: int | None = None, reset_timeout: float | None = None, half_open_max_calls: int | None = None, excluded_exceptions: tuple[type[Exception], ...] = (), name: str | None = None) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]
Circuit breaker decorator for functions.
Can be used with or without arguments.
Examples
Without arguments (uses config defaults):
>>> @circuit_breaker
... def api_call():
...     pass
With arguments:
>>> @circuit_breaker(max_failures=3, reset_timeout=30)
... def api_call():
...     pass
Exclude specific exceptions:
>>> @circuit_breaker(excluded_exceptions=(ValueError,))
... def validate():
...     pass
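The signature's optional first positional argument followed by keyword-only options is what allows both the bare and the parenthesised decorator forms. The following is a minimal sketch of that pattern combined with a simplified breaker, not kstlib's implementation. breaker_sketch and OpenCircuit are hypothetical names, the hard-coded defaults (5 failures, 60 seconds) simply mirror the sample config shown earlier on this page, and the half-open handling is reduced to a single trial call.

```python
import functools
import time
from typing import Any, Callable, Optional, Tuple, Type


class OpenCircuit(RuntimeError):
    """Hypothetical stand-in for CircuitOpenError."""


def breaker_sketch(
    func: Optional[Callable[..., Any]] = None,
    *,
    max_failures: int = 5,
    reset_timeout: float = 60.0,
    excluded_exceptions: Tuple[Type[Exception], ...] = (),
):
    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        state = {"failures": 0, "opened_at": None}

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            opened_at = state["opened_at"]
            if opened_at is not None:
                if time.monotonic() - opened_at < reset_timeout:
                    raise OpenCircuit("circuit is open")
                # Timeout elapsed: allow one trial call (simplified half-open).
                state["opened_at"] = None
                state["failures"] = max_failures - 1
            try:
                result = fn(*args, **kwargs)
            except excluded_exceptions:
                raise  # excluded errors do not count as failures
            except Exception:
                state["failures"] += 1
                if state["failures"] >= max_failures:
                    state["opened_at"] = time.monotonic()
                raise
            state["failures"] = 0  # any success closes the circuit again
            return result

        return wrapper

    # Bare @breaker_sketch passes the function directly; the parenthesised
    # form passes None and expects a decorator back.
    return decorate(func) if func is not None else decorate
```

Making the options keyword-only removes any ambiguity between "first argument is the decorated function" and "first argument is a setting".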
- kstlib.resilience.rate_limiter(fn: Callable[P, R] | None = None, *, rate: float = 10.0, per: float = 1.0, burst: float | None = None, blocking: bool = True, timeout: float | None = None, name: str | None = None) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]
Rate limit calls to the decorated function.
Can be used with or without arguments:
@rate_limiter - Uses defaults (10 requests/second)
@rate_limiter(rate=5, per=1.0) - 5 requests per second
- Parameters:
fn (Callable[P, R] | None) – Function to decorate (when used without parentheses).
rate (float) – Maximum calls per period (default 10).
per (float) – Period in seconds (default 1.0).
burst (float | None) – Initial capacity (default = rate).
blocking (bool) – If True, wait for token. If False, raise on limit.
timeout (float | None) – Maximum wait time in seconds.
name (str | None) – Name for the rate limiter.
- Returns:
Decorated function that respects rate limits.
- Raises:
RateLimitExceededError – If blocking=False and rate limit exceeded.
- Return type:
Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]
Examples
Default rate limiting (10/sec):
>>> @rate_limiter
... def call_api():
...     pass
Custom rate:
>>> @rate_limiter(rate=100, per=60.0)  # 100 per minute
... def call_api():
...     pass
Non-blocking mode:
>>> @rate_limiter(rate=5, blocking=False)
... def fast_api():
...     pass  # Raises RateLimitExceededError if limit hit
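The rate, per, burst, and blocking parameters map naturally onto a token bucket: tokens refill at rate / per per second up to a capacity of burst (or rate when burst is not given), and each call consumes one token. The sketch below illustrates that arithmetic under stated assumptions. TokenBucket and LimitExceeded are hypothetical names, the timeout parameter is omitted, and the class is not thread-safe. It is not kstlib's implementation.

```python
import time
from typing import Optional


class LimitExceeded(RuntimeError):
    """Hypothetical stand-in for RateLimitExceededError."""

    def __init__(self, message: str, retry_after: float) -> None:
        super().__init__(message)
        self.retry_after = retry_after


class TokenBucket:
    """Single-threaded token bucket sketch."""

    def __init__(self, rate: float = 10.0, per: float = 1.0,
                 burst: Optional[float] = None) -> None:
        self.refill_per_sec = rate / per
        self.capacity = rate if burst is None else burst
        self.tokens = self.capacity  # start with a full bucket
        self._updated = time.monotonic()

    def _refill(self) -> None:
        # Add tokens for the elapsed time, capped at the bucket capacity.
        now = time.monotonic()
        elapsed = now - self._updated
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_per_sec)
        self._updated = now

    def acquire(self, blocking: bool = True) -> None:
        while True:
            self._refill()
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            # Seconds until the missing fraction of a token has refilled.
            wait = (1.0 - self.tokens) / self.refill_per_sec
            if not blocking:
                raise LimitExceeded("rate limit exceeded", retry_after=wait)
            time.sleep(wait)
```

With rate=100 and per=60.0 the bucket refills at roughly 1.67 tokens per second, so a caller that has drained it waits about 0.6 seconds for the next token.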
- kstlib.resilience.watchdog_context(timeout: float | None = None, on_timeout: Callable[[], None] | Callable[[], Awaitable[None]] | None = None, *, raise_on_timeout: bool = False, name: str | None = None) -> Watchdog
Create a watchdog context for monitoring code blocks.
This is a convenience function that creates a Watchdog instance. Use it in a with statement for automatic start/stop.
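- Parameters:
timeout (float | None)
on_timeout (Callable[[], None] | Callable[[], Awaitable[None]] | None)
raise_on_timeout (bool)
name (str | None)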
- Returns:
Watchdog instance for use as context manager.
- Return type:
Watchdog
Examples
>>> with watchdog_context(timeout=30) as wd:
...     for item in items:
...         wd.ping()
...         process(item)
Exceptions¶
Specialized exceptions raised by the kstlib.resilience module.
- exception kstlib.resilience.exceptions.CircuitBreakerError
Bases: KstlibError, RuntimeError
Base exception for circuit breaker errors.
- exception kstlib.resilience.exceptions.CircuitOpenError(message, remaining_seconds)
Bases: CircuitBreakerError
Raised when a call is attempted while the circuit is open.
- remaining_seconds
Time until the circuit may transition to half-open.
- exception kstlib.resilience.exceptions.HeartbeatError
Bases: KstlibError, RuntimeError
Raised when the heartbeat encounters an error.
Examples include state file write failure or invalid state file path.
- exception kstlib.resilience.exceptions.RateLimitError
Bases: KstlibError, RuntimeError
Base exception for rate limiter errors.
- exception kstlib.resilience.exceptions.RateLimitExceededError(message, retry_after)
Bases: RateLimitError
Raised when rate limit is exceeded and blocking is disabled.
- retry_after
Seconds until a token will be available.
- exception kstlib.resilience.exceptions.ShutdownError
Bases: KstlibError, RuntimeError
Raised when graceful shutdown encounters an error.
Examples include cleanup callback failure or timeout exceeded.
- exception kstlib.resilience.exceptions.WatchdogError
Bases: KstlibError, RuntimeError
Base exception for watchdog errors.
- exception kstlib.resilience.exceptions.WatchdogTimeoutError(message, seconds_inactive)
Bases: WatchdogError
Raised when watchdog detects inactivity timeout.
- seconds_inactive
Time since last ping/activity.
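Several of these exceptions carry a timing attribute (retry_after, remaining_seconds, seconds_inactive) that a caller can act on instead of guessing a delay. The sketch below shows the idea for retry_after, using local stand-in classes so that it runs without kstlib installed. The stand-ins copy the documented names and bases, but deriving KstlibError from Exception is an assumption, and call_with_retry is a hypothetical helper that is not part of the library.

```python
import time


class KstlibError(Exception):
    """Stand-in; the real base class of KstlibError is an assumption."""


class RateLimitError(KstlibError, RuntimeError):
    """Stand-in mirroring the documented bases."""


class RateLimitExceededError(RateLimitError):
    """Stand-in carrying the documented retry_after attribute."""

    def __init__(self, message: str, retry_after: float) -> None:
        super().__init__(message)
        self.retry_after = retry_after


def call_with_retry(func, attempts: int = 3, sleep=time.sleep):
    """Hypothetical helper: wait for the delay the exception suggests."""
    for attempt in range(attempts):
        try:
            return func()
        except RateLimitExceededError as exc:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(exc.retry_after)
```

Because every base exception listed here also derives from RuntimeError, existing handlers written as except RuntimeError continue to catch them.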