WebSocket Subsystem

Async WebSocket client with proactive connection control and auto-reconnection.

Tip

Pair this reference with the WebSocket feature guide.

Quick Overview

  • WebSocketManager provides proactive control over connection lifecycle.

  • ConnectionState tracks the connection state machine.

  • DisconnectReason categorizes proactive vs reactive disconnections.

  • ReconnectStrategy defines reconnection behavior.

  • WebSocketStats tracks connection and message statistics.


Core Components

WebSocketManager

class kstlib.websocket.manager.WebSocketManager(url, *, should_disconnect=None, should_reconnect=None, on_connect=None, on_disconnect=None, on_disconnect_alert=None, on_message=None, on_alert=None, ping_interval=None, ping_timeout=None, connection_timeout=None, reconnect_strategy=ReconnectStrategy.EXPONENTIAL_BACKOFF, reconnect_delay=None, max_reconnect_delay=None, max_reconnect_attempts=None, auto_reconnect=True, stable_connection_time=None, server_unavailable_delay=None, disconnect_check_interval=None, reconnect_check_interval=None, disconnect_margin=None, disconnect_alert_interval=None, queue_size=None, config=None)

Bases: object

Async WebSocket manager with proactive connection control.

This manager provides both reactive (auto-reconnect on failure) and proactive (user-controlled disconnect/reconnect) connection management.

The proactive control feature is the key differentiator: instead of only reacting to disconnections, you control when to disconnect and when to reconnect. This matters for trading, where a connection cut in the middle of an operation must be avoided.

url

WebSocket server URL.

state

Current connection state.

stats

Connection and message statistics.

Examples

Basic streaming:

>>> async def main():  
...     async with WebSocketManager("wss://example.com/ws") as ws:
...         async for msg in ws.stream():
...             print(msg)

Proactive control:

>>> async def trading():  
...     ws = WebSocketManager(
...         url="wss://stream.binance.com/ws",
...         should_disconnect=lambda: not is_critical_time(),
...         should_reconnect=lambda: is_approaching_candle(),
...     )
...     async with ws:
...         await ws.subscribe("btcusdt@kline_4h")
...         async for msg in ws.stream():
...             process(msg)
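
A should_disconnect callback can be derived from any condition the application tracks. The sketch below is an illustration only, assuming a connection-age limit: make_age_limit_callback, the limit and margin values, and the injected clock are hypothetical and not part of kstlib.

```python
import time
from typing import Callable


def make_age_limit_callback(
    connected_at: float,
    platform_limit: float,
    margin: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[], bool]:
    """Build a zero-argument callback (hypothetical helper).

    The callback returns True once the connection age is within
    `margin` seconds of `platform_limit`.
    """

    def should_disconnect() -> bool:
        age = clock() - connected_at
        return age >= platform_limit - margin

    return should_disconnect
```

The returned function takes no arguments and returns a bool, which matches the documented Callable[[], bool] shape of should_disconnect.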

__init__(self, url: str, *, should_disconnect: ShouldDisconnectCallback | None = None, should_reconnect: ShouldReconnectCallback | None = None, on_connect: OnConnectCallback | None = None, on_disconnect: OnDisconnectCallback | None = None, on_disconnect_alert: DisconnectAlertCallback | None = None, on_message: OnMessageCallback | None = None, on_alert: OnAlertCallback | None = None, ping_interval: float | None = None, ping_timeout: float | None = None, connection_timeout: float | None = None, reconnect_strategy: ReconnectStrategy = ReconnectStrategy.EXPONENTIAL_BACKOFF, reconnect_delay: float | None = None, max_reconnect_delay: float | None = None, max_reconnect_attempts: int | None = None, auto_reconnect: bool = True, stable_connection_time: float | None = None, server_unavailable_delay: float | None = None, disconnect_check_interval: float | None = None, reconnect_check_interval: float | None = None, disconnect_margin: float | None = None, disconnect_alert_interval: float | None = None, queue_size: int | None = None, config: Mapping[str, Any] | None = None) -> None

Initialize WebSocket manager.

Parameters:
  • url (str) – WebSocket server URL (wss:// or ws://).

  • should_disconnect (Callable[[], bool] | None) – Callback returning True when disconnect is desired.

  • should_reconnect (Callable[[], bool | float] | None) – Callback returning True or delay (seconds) for reconnect.

  • on_connect (Callable[[], Awaitable[None] | None] | None) – Callback invoked after successful connection.

  • on_disconnect (Callable[[DisconnectReason], Awaitable[None] | None] | None) – Callback invoked after disconnection with reason.

  • on_disconnect_alert (Callable[[DisconnectReason, int], Awaitable[None] | None] | None) – Throttled callback for disconnect alerts. Receives (reason, count) where count is the number of disconnects since the last alert. Fires at most once per disconnect_alert_interval seconds.

  • on_message (Callable[[Any], Awaitable[None] | None] | None) – Callback invoked for each received message.

  • on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).

  • ping_interval (float | None) – Seconds between ping frames.

  • ping_timeout (float | None) – Seconds to wait for pong response.

  • connection_timeout (float | None) – Timeout for initial connection.

  • reconnect_strategy (ReconnectStrategy) – Strategy for reconnection delays.

  • reconnect_delay (float | None) – Initial delay between reconnect attempts.

  • max_reconnect_delay (float | None) – Maximum delay for exponential backoff.

  • max_reconnect_attempts (int | None) – Maximum consecutive reconnection attempts.

  • auto_reconnect (bool) – Whether to auto-reconnect on disconnection.

  • stable_connection_time (float | None) – Seconds of stable connection required before _reconnect_count is reset to 0. Protects against flapping servers that accept the handshake then close.

  • server_unavailable_delay (float | None) – Seconds to wait before any reconnect attempt after receiving close code 1013 (Try Again Later).

  • disconnect_check_interval (float | None) – Seconds between should_disconnect checks.

  • reconnect_check_interval (float | None) – Seconds between should_reconnect checks.

  • disconnect_margin (float | None) – Seconds before platform limit to disconnect.

  • disconnect_alert_interval (float | None) – Minimum seconds between calls to on_disconnect_alert. Aggregated count of skipped disconnects is passed to the callback.

  • queue_size (int | None) – Maximum messages in queue (0 = unlimited).

  • config (Mapping[str, Any] | None) – Optional config mapping for limits resolution.

Raises:

ImportError – If websockets package is not installed.
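
The throttling described for on_disconnect_alert and disconnect_alert_interval can be pictured with a small stand-alone helper. This is a sketch under assumptions, not the library's implementation: AlertThrottle and its clock parameter are hypothetical, and reasons are plain strings here rather than DisconnectReason members.

```python
import time
from typing import Callable, Optional


class AlertThrottle:
    """Hypothetical helper: forward at most one alert per `interval` seconds."""

    def __init__(
        self,
        interval: float,
        callback: Callable[[str, int], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._interval = interval
        self._callback = callback
        self._clock = clock
        self._last_alert: Optional[float] = None
        self._pending = 0  # disconnects seen since the last alert

    def record(self, reason: str) -> bool:
        """Count one disconnect; fire the callback if the interval has elapsed."""
        self._pending += 1
        now = self._clock()
        if self._last_alert is not None and now - self._last_alert < self._interval:
            return False  # throttled: keep counting, do not alert
        self._callback(reason, self._pending)
        self._last_alert = now
        self._pending = 0
        return True
```

In this sketch the callback receives the most recent reason together with the number of disconnects accumulated since the previous alert, mirroring the documented (reason, count) arguments.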

property url: str

WebSocket server URL.

property state: ConnectionState

Current connection state.

property stats: WebSocketStats

Connection and message statistics.

property is_connected: bool

Check if currently connected.

property subscriptions: frozenset[str]

Current active subscriptions.

property connection_duration: float

Seconds since last successful connection, or 0 if not connected.

async __aenter__(self) -> Self

Enter async context and connect.

async __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit async context and close connection.

async connect(self) -> None

Establish WebSocket connection.

Raises:

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws")
...     await ws.connect()
...     try:
...         async for msg in ws.stream():
...             print(msg)
...     finally:
...         await ws.close()

async request_disconnect(self, *, reconnect_after: float | None = None, reason: DisconnectReason = DisconnectReason.USER_REQUESTED) -> None

Request a controlled disconnection.

This is a proactive method that allows the user to disconnect at a time of their choosing rather than waiting for a forced cut.

Parameters:
  • reconnect_after (float | None) – Optional delay before auto-reconnect (seconds).

  • reason (DisconnectReason) – Reason for the disconnection.

Examples

>>> async def main():  
...     async with WebSocketManager("wss://example.com/ws") as ws:
...         # Disconnect now, reconnect in 5 minutes
...         await ws.request_disconnect(reconnect_after=300)

async schedule_reconnect(self, delay: float) -> None

Schedule a reconnection after a delay.

Use this to programmatically reconnect after a proactive disconnect.

Parameters:

delay (float) – Seconds to wait before reconnecting.

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws")
...     await ws.connect()
...     await ws.request_disconnect()
...     # Reconnect in 5 minutes
...     await ws.schedule_reconnect(300)

async wait_for_reconnect_window(self, should_reconnect: ShouldReconnectCallback | None = None, timeout: float | None = None) -> bool

Wait until the reconnection condition is met.

Parameters:
  • should_reconnect (Callable[[], bool | float] | None) – Custom callback (overrides instance callback).

  • timeout (float | None) – Maximum time to wait (seconds).

Returns:

True if reconnect condition was met, False if timed out.

Return type:

bool

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws")
...     await ws.connect()
...     await ws.request_disconnect()
...     # Wait for trading window
...     if await ws.wait_for_reconnect_window(
...         should_reconnect=lambda: is_trading_time(),
...         timeout=3600,
...     ):
...         await ws.connect()
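
Because should_reconnect may return either a bool or a number of seconds, a polling loop has to tell the two apart. The sketch below shows one possible interpretation and is not kstlib's implementation: wait_for_window is a hypothetical function, and the reading of a numeric result as "check again after that many seconds" is an assumption.

```python
import asyncio
from typing import Callable, Optional, Union


async def wait_for_window(
    should_reconnect: Callable[[], Union[bool, float]],
    check_interval: float = 1.0,
    timeout: Optional[float] = None,
) -> bool:
    """Poll `should_reconnect` until it returns True or `timeout` elapses.

    Assumed semantics: True means "reconnect now", False means "check again
    after check_interval", a number means "check again after that many seconds".
    """
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    while True:
        result = should_reconnect()
        if result is True:
            return True
        # bool is a subclass of int, so handle False before treating the
        # result as a delay in seconds.
        delay = check_interval if result is False else float(result)
        if deadline is not None:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False  # timed out before the condition was met
            delay = min(delay, remaining)
        await asyncio.sleep(delay)
```

The return value follows the documented contract: True when the condition was met, False on timeout.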

async force_close(self) -> None

Emergency close without reconnection (terminal state).

This is for emergency situations where you need to stop immediately. The WebSocket instance becomes unusable after this call.

Key difference from kill() and shutdown():

  • kill(): Reactive. Server kicked us. State=DISCONNECTED. CAN reconnect.

  • shutdown(): Proactive graceful. User wants to stop. State=CLOSED. CANNOT reconnect.

  • force_close(): Emergency stop. State=CLOSED. CANNOT reconnect.

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws")
...     await ws.connect()
...     await ws.force_close()
...     # Cannot reconnect after force_close
...     assert ws.state == ConnectionState.CLOSED

async kill(self) -> None

Simulate external disconnection (reactive, we are the victim).

This simulates a scenario where the server (e.g., Binance) forcefully disconnects us. The WebSocket “suffers” this disconnection.

Key difference from force_close() and shutdown():

  • kill(): Reactive. Server kicked us. State=DISCONNECTED. CAN reconnect.

  • shutdown(): Proactive graceful. User wants to stop. State=CLOSED. CANNOT reconnect.

  • force_close(): Emergency stop. State=CLOSED. CANNOT reconnect.

Use this to test heartbeat/watchdog recovery mechanisms.

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws", auto_reconnect=False)
...     await ws.connect()
...     await ws.kill()  # Simulates Binance kicking us
...     assert ws.state == ConnectionState.DISCONNECTED
...     # Heartbeat detects is_dead=True and can restart

async shutdown(self) -> None

Graceful intentional shutdown (proactive, user-initiated).

This is for clean shutdown scenarios like CTRL+C or service stop. The WebSocket proactively decides to close.

Key difference from kill() and force_close():

  • kill(): Reactive. Server kicked us. State=DISCONNECTED. CAN reconnect.

  • shutdown(): Proactive graceful. User wants to stop. State=CLOSED. CANNOT reconnect.

  • force_close(): Emergency stop. State=CLOSED. CANNOT reconnect.

Sets the shutdown event so external code (heartbeat, watchdog) knows we’re stopping intentionally and should not try to restart us.

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws")
...     await ws.connect()
...     # In SIGINT handler:
...     await ws.shutdown()
...     assert ws.is_shutdown  # Heartbeat knows not to restart
property is_shutdown: bool

Check if shutdown has been requested.

property is_dead: bool

Check if connection is dead (not connected and not reconnecting).

Useful for heartbeat monitoring to detect if the WebSocket needs restart.

Returns:

True if connection is in a dead state requiring manual intervention.

Examples

>>> async def main():  
...     ws = WebSocketManager("wss://example.com/ws")
...     if ws.is_dead:
...         await ws.connect()  # Restart
property is_recoverable: bool

Check if the connection is recoverable via reconnect.

True when the connection is dead but NOT intentionally shut down. Watchdog consumers should use this property (rather than is_dead) to decide whether to restart the connection: intentional shutdowns via shutdown() or force_close() should NOT be restarted; only accidental disconnects (a graceful close() at end of scope or a reactive kill()) should be.

Returns:

True if is_dead is True AND is_shutdown is False.

Examples

>>> async def watchdog_loop(ws):  
...     while True:
...         await asyncio.sleep(5)
...         if ws.is_recoverable:
...             await ws.connect()  # Restart accidental disconnect
...         elif ws.is_shutdown:
...             break  # Intentional shutdown, exit watchdog
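
The relationship between is_dead, is_shutdown, and is_recoverable can be checked in isolation. The sketch below uses a stand-in object rather than a real manager; HealthFlags, watchdog_action, and the returned labels are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass


@dataclass
class HealthFlags:
    """Stand-in for the two manager properties a watchdog reads."""

    is_dead: bool
    is_shutdown: bool

    @property
    def is_recoverable(self) -> bool:
        # Documented rule: dead AND not intentionally shut down.
        return self.is_dead and not self.is_shutdown


def watchdog_action(flags: HealthFlags) -> str:
    """Map the flags to a watchdog decision (labels are illustrative)."""
    if flags.is_recoverable:
        return "restart"
    if flags.is_shutdown:
        return "exit"
    return "wait"
```

A dead connection that was shut down intentionally maps to "exit" rather than "restart", which is the case that checking is_dead alone would get wrong.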

async close(self) -> None

Gracefully close the connection (non-terminal, reconnect possible).

Sets state to DISCONNECTED (non-terminal) so reconnection remains possible via explicit connect() or the auto-reconnect mechanism. Preserves the _auto_reconnect flag. Background tasks are cancelled and the underlying WebSocket is closed cleanly with code 1000.

Idempotent: returns immediately if state is already CLOSED or DISCONNECTED (no double close, no exception on repeated calls).

Key difference from force_close() and shutdown():

  • close(): graceful end-of-scope (e.g. async with exit). State=DISCONNECTED non-terminal. CAN reconnect. Does NOT mark is_shutdown.

  • force_close(): emergency intentional stop. State=CLOSED terminal. CANNOT reconnect. Marks is_shutdown=True.

  • shutdown(): intentional shutdown (SIGINT-like). State=CLOSED terminal. CANNOT reconnect. Marks is_shutdown=True.

Examples

>>> async def main():  
...     async with WebSocketManager("wss://example.com/ws") as ws:
...         async for msg in ws.stream():
...             ...
...     # async with exited: close() ran gracefully, ws.state == DISCONNECTED
...     await ws.connect()  # Reconnect possible

async send(self, data: Any) -> None

Send a message to the WebSocket server.

Parameters:

data (Any) – Data to send (dict/list will be JSON-encoded).

Raises:

WebSocketClosedError – If connection is not active.

Examples

>>> async def main():  
...     async with WebSocketManager("wss://example.com/ws") as ws:
...         await ws.send({"type": "ping"})
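
The note that dict and list payloads are JSON-encoded can be illustrated with a small encoder. This is a sketch, assuming strings are passed through unchanged: encode_outgoing is a hypothetical function, and rejecting other types with TypeError is a choice made for this example only.

```python
import json
from typing import Any


def encode_outgoing(data: Any) -> str:
    """Hypothetical encoder: JSON-encode dicts and lists, pass strings through."""
    if isinstance(data, (dict, list)):
        return json.dumps(data)
    if isinstance(data, str):
        return data
    # Assumption for this sketch only: other types are rejected.
    raise TypeError(f"unsupported payload type: {type(data).__name__}")
```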

async receive(self, timeout: float | None = None) -> Any

Receive a single message from the queue.

Parameters:

timeout (float | None) – Maximum time to wait (seconds).

Returns:

The received message.

Raises:
Return type:

Any

Examples

>>> async def main():  
...     async with WebSocketManager("wss://example.com/ws") as ws:
...         msg = await ws.receive(timeout=10)

async stream(self) -> AsyncIterator[Any]

Iterate over received messages.

This is the main method for consuming WebSocket messages. It handles reconnection transparently.

Yields:

Received messages (parsed JSON or raw string).

Examples

>>> async def main():  
...     async with WebSocketManager("wss://example.com/ws") as ws:
...         async for msg in ws.stream():
...             print(msg)
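
Since stream() yields either parsed JSON or a raw string, consumer code usually needs to handle both shapes. The sketch below shows one way such a fallback could work; decode_incoming is a hypothetical function and not the decoding routine used by kstlib.

```python
import json
from typing import Any


def decode_incoming(raw: str) -> Any:
    """Return parsed JSON when `raw` is valid JSON, else the raw string."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw
```

With this approach a consumer can branch on isinstance(msg, dict) to separate structured events from plain-text frames.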

async subscribe(self, *channels: str) -> None

Subscribe to one or more channels.

Subscriptions are automatically restored after reconnection.

Parameters:

channels (str) – Channel names to subscribe to.

Examples

>>> async def main():  
...     async with WebSocketManager("wss://stream.binance.com/ws") as ws:
...         await ws.subscribe("btcusdt@trade", "ethusdt@trade")
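
Restoring subscriptions after a reconnect implies that the client remembers which channels are active. The following is a minimal sketch of that bookkeeping, not the library's code: SubscriptionRegistry, its send callable, and the restore method are hypothetical.

```python
from typing import Callable, FrozenSet, Set


class SubscriptionRegistry:
    """Hypothetical tracker that replays subscriptions after a reconnect."""

    def __init__(self, send: Callable[[str, str], None]) -> None:
        self._send = send  # called as send(action, channel)
        self._channels: Set[str] = set()

    @property
    def subscriptions(self) -> FrozenSet[str]:
        return frozenset(self._channels)

    def subscribe(self, *channels: str) -> None:
        for channel in channels:
            if channel not in self._channels:
                self._channels.add(channel)
                self._send("subscribe", channel)

    def unsubscribe(self, *channels: str) -> None:
        for channel in channels:
            if channel in self._channels:
                self._channels.discard(channel)
                self._send("unsubscribe", channel)

    def restore(self) -> None:
        """Re-send every remembered subscription, e.g. after reconnecting."""
        for channel in sorted(self._channels):
            self._send("subscribe", channel)
```

Exposing the channels as a frozenset, as the subscriptions property does, keeps callers from mutating the registry's internal state.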

async unsubscribe(self, *channels: str) -> None

Unsubscribe from one or more channels.

Parameters:

channels (str) – Channel names to unsubscribe from.

Examples

>>> async def main():  
...     async with WebSocketManager("wss://stream.binance.com/ws") as ws:
...         await ws.unsubscribe("btcusdt@trade")

async wait_connected(self, timeout: float | None = None) -> bool

Wait until connected.

Parameters:

timeout (float | None) – Maximum time to wait (seconds).

Returns:

True if connected, False if timed out.

Return type:

bool

async wait_disconnected(self, timeout: float | None = None) -> bool

Wait until disconnected.

Parameters:

timeout (float | None) – Maximum time to wait (seconds).

Returns:

True if disconnected, False if timed out.

Return type:

bool


Models

ConnectionState

class kstlib.websocket.models.ConnectionState(value)

Bases: Enum

WebSocket connection state machine.

State transitions:

  • DISCONNECTED -> CONNECTING -> CONNECTED

  • CONNECTED -> RECONNECTING -> CONNECTED (on success)

  • CONNECTED -> RECONNECTING -> DISCONNECTED (on failure)

  • CONNECTED -> CLOSING -> CLOSED

  • Any state -> CLOSED (on force_close)

DISCONNECTED

Initial state, not connected.

CONNECTING

Connection attempt in progress.

CONNECTED

WebSocket connection is active.

RECONNECTING

Attempting to restore lost connection.

CLOSING

Graceful shutdown in progress.

CLOSED

Terminal state, cannot reconnect.

DISCONNECTED = 1
CONNECTING = 2
CONNECTED = 3
RECONNECTING = 4
CLOSING = 5
CLOSED = 6

can_connect(self) -> bool

Check if a connection attempt is allowed from this state.

Returns:

True if connect() can be called from this state.

Return type:

bool

Examples

>>> ConnectionState.DISCONNECTED.can_connect()
True
>>> ConnectionState.CONNECTED.can_connect()
False

can_send(self) -> bool

Check if sending messages is allowed from this state.

Returns:

True if send() can be called from this state.

Return type:

bool

Examples

>>> ConnectionState.CONNECTED.can_send()
True
>>> ConnectionState.DISCONNECTED.can_send()
False

is_terminal(self) -> bool

Check if this is a terminal state.

Returns:

True if no further state transitions are possible.

Return type:

bool

Examples

>>> ConnectionState.CLOSED.is_terminal()
True
>>> ConnectionState.DISCONNECTED.is_terminal()
False
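
The transitions listed under "State transitions" can be encoded as a lookup table. The sketch below covers only those listed transitions and is not kstlib's state machine: State is a local stand-in for ConnectionState, and ALLOWED and can_transition are hypothetical. The real manager may permit more, for example close() is documented as ending in DISCONNECTED.

```python
from enum import Enum


class State(Enum):
    """Stand-in mirroring the documented ConnectionState members."""

    DISCONNECTED = 1
    CONNECTING = 2
    CONNECTED = 3
    RECONNECTING = 4
    CLOSING = 5
    CLOSED = 6


# Only the transitions listed in the state-machine description.
ALLOWED = {
    State.DISCONNECTED: {State.CONNECTING},
    State.CONNECTING: {State.CONNECTED},
    State.CONNECTED: {State.RECONNECTING, State.CLOSING},
    State.RECONNECTING: {State.CONNECTED, State.DISCONNECTED},
    State.CLOSING: {State.CLOSED},
    State.CLOSED: set(),
}


def can_transition(current: State, target: State) -> bool:
    """True if `current -> target` is listed, or is a force-close to CLOSED."""
    if target is State.CLOSED and current is not State.CLOSED:
        return True  # "Any state -> CLOSED (on force_close)"
    return target in ALLOWED[current]
```

CLOSED maps to an empty set, which is what makes it terminal in this table.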

DisconnectReason

class kstlib.websocket.models.DisconnectReason(value)

Bases: Enum

Reason for WebSocket disconnection.

Disconnections are categorized as either proactive (user-controlled) or reactive (forced by external factors). This distinction is key for the proactive connection control feature.

Proactive reasons (user-controlled):

  • USER_REQUESTED: Manual disconnect via request_disconnect()

  • SCHEDULED: Disconnect triggered by schedule_reconnect()

  • CALLBACK_TRIGGERED: should_disconnect() callback returned True

  • CONNECTION_LIMIT: Preemptive disconnect before platform limit

Reactive reasons (forced):

  • SERVER_CLOSED: Server initiated the close

  • NETWORK_ERROR: Network connectivity issue

  • PING_TIMEOUT: No pong response within timeout

  • PROTOCOL_ERROR: WebSocket protocol violation

USER_REQUESTED = 1
SCHEDULED = 2
CALLBACK_TRIGGERED = 3
CONNECTION_LIMIT = 4
SERVER_CLOSED = 5
NETWORK_ERROR = 6
PING_TIMEOUT = 7
PROTOCOL_ERROR = 8
KILLED = 9
property is_proactive: bool

Check if this is a proactive (user-controlled) disconnection.

Returns:

True if the disconnection was initiated by the user/application.

Examples

>>> DisconnectReason.USER_REQUESTED.is_proactive
True
>>> DisconnectReason.NETWORK_ERROR.is_proactive
False
property is_reactive: bool

Check if this is a reactive (forced) disconnection.

Returns:

True if the disconnection was forced by external factors.

Examples

>>> DisconnectReason.SERVER_CLOSED.is_reactive
True
>>> DisconnectReason.USER_REQUESTED.is_reactive
False

ReconnectStrategy

class kstlib.websocket.models.ReconnectStrategy(value)

Bases: Enum

Reconnection strategy after disconnection.

IMMEDIATE

Reconnect immediately without delay.

FIXED_DELAY

Wait a fixed delay before each attempt.

EXPONENTIAL_BACKOFF

Exponentially increasing delays.

CALLBACK_CONTROLLED

Reconnection timing controlled by callback.

IMMEDIATE = 1
FIXED_DELAY = 2
EXPONENTIAL_BACKOFF = 3
CALLBACK_CONTROLLED = 4
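
How a strategy could translate into a concrete delay is sketched below. This is an illustration under stated assumptions, not the library's algorithm: Strategy is a local stand-in for ReconnectStrategy, next_delay is a hypothetical function, and the doubling formula for exponential backoff is assumed rather than taken from kstlib.

```python
from enum import Enum
from typing import Optional


class Strategy(Enum):
    """Stand-in mirroring the documented ReconnectStrategy members."""

    IMMEDIATE = 1
    FIXED_DELAY = 2
    EXPONENTIAL_BACKOFF = 3
    CALLBACK_CONTROLLED = 4


def next_delay(
    strategy: Strategy,
    attempt: int,
    reconnect_delay: float,
    max_reconnect_delay: float,
) -> Optional[float]:
    """Delay in seconds before reconnect attempt number `attempt` (0-based).

    Returns None for CALLBACK_CONTROLLED, where a callback decides the timing.
    The doubling formula is an assumption, not taken from kstlib.
    """
    if strategy is Strategy.IMMEDIATE:
        return 0.0
    if strategy is Strategy.FIXED_DELAY:
        return reconnect_delay
    if strategy is Strategy.EXPONENTIAL_BACKOFF:
        return min(reconnect_delay * (2 ** attempt), max_reconnect_delay)
    return None
```

With reconnect_delay=1.0 and max_reconnect_delay=60.0, the backoff branch yields 1, 2, 4, 8, 16, 32 and then stays at 60 seconds.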

WebSocketStats

class kstlib.websocket.models.WebSocketStats(connects=0, disconnects=0, proactive_disconnects=0, reactive_disconnects=0, messages_received=0, messages_sent=0, bytes_received=0, bytes_sent=0, last_connect_time=0.0, last_disconnect_time=0.0, last_message_time=0.0, _start_time=<factory>)

Bases: object

WebSocket connection and message statistics.

Tracks both connection lifecycle events and message throughput. The key distinction is between proactive and reactive disconnections, which is central to the proactive control feature.

connects

Total successful connection count.

Type:

int

disconnects

Total disconnection count.

Type:

int

proactive_disconnects

Disconnections initiated by user/application.

Type:

int

reactive_disconnects

Disconnections forced by external factors.

Type:

int

messages_received

Total messages received.

Type:

int

messages_sent

Total messages sent.

Type:

int

bytes_received

Total bytes received.

Type:

int

bytes_sent

Total bytes sent.

Type:

int

last_connect_time

Unix timestamp of last successful connection.

Type:

float

last_disconnect_time

Unix timestamp of last disconnection.

Type:

float

last_message_time

Unix timestamp of last message (sent or received).

Type:

float

Examples

>>> stats = WebSocketStats()
>>> stats.record_connect()
>>> stats.connects
1
>>> stats.record_disconnect(proactive=True)
>>> stats.proactive_disconnects
1
connects: int = 0
disconnects: int = 0
proactive_disconnects: int = 0
reactive_disconnects: int = 0
messages_received: int = 0
messages_sent: int = 0
bytes_received: int = 0
bytes_sent: int = 0
last_connect_time: float = 0.0
last_disconnect_time: float = 0.0
last_message_time: float = 0.0

record_connect(self) -> None

Record a successful connection.

Examples

>>> stats = WebSocketStats()
>>> stats.record_connect()
>>> stats.connects
1

record_disconnect(self, *, proactive: bool = False) -> None

Record a disconnection.

Parameters:

proactive (bool) – True if this was a user-initiated disconnection.

Examples

>>> stats = WebSocketStats()
>>> stats.record_disconnect(proactive=True)
>>> stats.proactive_disconnects
1
>>> stats.record_disconnect(proactive=False)
>>> stats.reactive_disconnects
1

record_message_received(self, size: int = 0) -> None

Record a received message.

Parameters:

size (int) – Size of the message in bytes.

Examples

>>> stats = WebSocketStats()
>>> stats.record_message_received(100)
>>> stats.messages_received
1
>>> stats.bytes_received
100

record_message_sent(self, size: int = 0) -> None

Record a sent message.

Parameters:

size (int) – Size of the message in bytes.

Examples

>>> stats = WebSocketStats()
>>> stats.record_message_sent(50)
>>> stats.messages_sent
1
>>> stats.bytes_sent
50
property uptime: float

Time since stats object was created, in seconds.

Returns:

Elapsed time in seconds.

Examples

>>> import time
>>> stats = WebSocketStats()
>>> time.sleep(0.05)
>>> stats.uptime > 0
True
property connection_time: float

Time since last connection, in seconds.

Returns zero if never connected.

Returns:

Elapsed time since last connect, or 0 if never connected.

Examples

>>> stats = WebSocketStats()
>>> stats.connection_time
0.0
>>> stats.record_connect()
>>> stats.connection_time >= 0
True

reset(self) -> None

Reset all statistics to zero.

Examples

>>> stats = WebSocketStats()
>>> stats.record_connect()
>>> stats.record_message_sent(100)
>>> stats.reset()
>>> stats.connects
0
>>> stats.messages_sent
0
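
The counters lend themselves to derived metrics such as the share of proactive disconnects. The sketch below reads only the documented field names from any object that carries them: summarize is a hypothetical helper, and SimpleNamespace stands in for a real WebSocketStats instance.

```python
from types import SimpleNamespace
from typing import Any, Dict


def summarize(stats: Any) -> Dict[str, float]:
    """Derive ratios from the documented counter fields (hypothetical helper)."""
    disconnects = stats.disconnects
    received = stats.messages_received
    return {
        # Share of disconnects the application chose itself.
        "proactive_ratio": stats.proactive_disconnects / disconnects if disconnects else 0.0,
        # Mean size of a received message, in bytes.
        "avg_bytes_received": stats.bytes_received / received if received else 0.0,
    }


# Stand-in object carrying the same attribute names as WebSocketStats.
example = SimpleNamespace(
    disconnects=4, proactive_disconnects=3, messages_received=10, bytes_received=2500
)
print(summarize(example))  # {'proactive_ratio': 0.75, 'avg_bytes_received': 250.0}
```

A low proactive ratio suggests that most disconnects are being forced on the client rather than chosen by it.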

__init__(self, connects: int = 0, disconnects: int = 0, proactive_disconnects: int = 0, reactive_disconnects: int = 0, messages_received: int = 0, messages_sent: int = 0, bytes_received: int = 0, bytes_sent: int = 0, last_connect_time: float = 0.0, last_disconnect_time: float = 0.0, last_message_time: float = 0.0, _start_time: float = <factory>) -> None

Configuration Limits

WebSocketLimits

class kstlib.limits.WebSocketLimits(ping_interval, ping_timeout, connection_timeout, reconnect_delay, max_reconnect_delay, max_reconnect_attempts, queue_size, disconnect_check_interval, reconnect_check_interval, disconnect_margin, stable_connection_time, server_unavailable_delay, disconnect_alert_interval)

Bases: object

Resolved WebSocket configuration limits.

Includes settings for connection management, reconnection behavior, and proactive control features.

ping_interval

Seconds between ping frames.

Type:

float

ping_timeout

Seconds to wait for pong response.

Type:

float

connection_timeout

Timeout for initial connection.

Type:

float

reconnect_delay

Initial delay between reconnect attempts.

Type:

float

max_reconnect_delay

Maximum delay for exponential backoff.

Type:

float

max_reconnect_attempts

Maximum consecutive reconnection attempts.

Type:

int

queue_size

Maximum messages in queue (0 = unlimited).

Type:

int

disconnect_check_interval

Seconds between should_disconnect checks.

Type:

float

reconnect_check_interval

Seconds between should_reconnect checks.

Type:

float

disconnect_margin

Seconds before platform limit to disconnect.

Type:

float

stable_connection_time

Seconds of stable connection before resetting reconnect counter.

Type:

float

server_unavailable_delay

Seconds to wait on server code 1013 before reconnect.

Type:

float

disconnect_alert_interval

Seconds between throttled disconnect alerts.

Type:

float

ping_interval: float
ping_timeout: float
connection_timeout: float
reconnect_delay: float
max_reconnect_delay: float
max_reconnect_attempts: int
queue_size: int
disconnect_check_interval: float
reconnect_check_interval: float
disconnect_margin: float
stable_connection_time: float
server_unavailable_delay: float
disconnect_alert_interval: float

__init__(self, ping_interval: float, ping_timeout: float, connection_timeout: float, reconnect_delay: float, max_reconnect_delay: float, max_reconnect_attempts: int, queue_size: int, disconnect_check_interval: float, reconnect_check_interval: float, disconnect_margin: float, stable_connection_time: float, server_unavailable_delay: float, disconnect_alert_interval: float) -> None

get_websocket_limits

kstlib.limits.get_websocket_limits(config: Mapping[str, Any] | None = None) -> WebSocketLimits

Resolve WebSocket limits from config with hard limit enforcement.

Parameters:

config (Mapping[str, Any] | None) – Optional config mapping. If None, loads from get_config().

Returns:

WebSocketLimits with resolved values clamped to hard bounds.

Return type:

WebSocketLimits

Examples

>>> limits = get_websocket_limits()
>>> limits.ping_interval
20.0
>>> limits.max_reconnect_attempts
10
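
Clamping resolved values to hard bounds can be pictured as follows. This is a sketch only: HARD_BOUNDS and resolve_limit are hypothetical, the minimum and maximum values are invented for illustration, and the flat config mapping does not reflect kstlib's actual config layout.

```python
from typing import Mapping, Optional, Tuple

# Hypothetical hard bounds: (minimum, maximum, default) per setting.
HARD_BOUNDS: Mapping[str, Tuple[float, float, float]] = {
    "ping_interval": (5.0, 60.0, 20.0),
    "ping_timeout": (5.0, 30.0, 10.0),
}


def resolve_limit(name: str, config: Optional[Mapping[str, float]] = None) -> float:
    """Take the configured value (or the default) and clamp it to hard bounds."""
    low, high, default = HARD_BOUNDS[name]
    value = default if config is None else config.get(name, default)
    return max(low, min(high, float(value)))
```

Out-of-range settings are pulled back to the nearest bound instead of raising, so a misconfigured value cannot disable keepalive pings or flood the server with them.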