WebSocket Subsystem¶
Async WebSocket client with proactive connection control and auto-reconnection.
Tip
Pair this reference with WebSocket for the feature guide.
Quick Overview¶
WebSocketManagerprovides proactive control over connection lifecycle.ConnectionStatetracks the connection state machine.DisconnectReasoncategorizes proactive vs reactive disconnections.ReconnectStrategydefines reconnection behavior.WebSocketStatstracks connection and message statistics.
Core Components¶
WebSocketManager¶
- class kstlib.websocket.manager.WebSocketManager(url, *, should_disconnect=None, should_reconnect=None, on_connect=None, on_disconnect=None, on_disconnect_alert=None, on_message=None, on_alert=None, ping_interval=None, ping_timeout=None, connection_timeout=None, reconnect_strategy=ReconnectStrategy.EXPONENTIAL_BACKOFF, reconnect_delay=None, max_reconnect_delay=None, max_reconnect_attempts=None, auto_reconnect=True, stable_connection_time=None, server_unavailable_delay=None, disconnect_check_interval=None, reconnect_check_interval=None, disconnect_margin=None, disconnect_alert_interval=None, queue_size=None, config=None)[source]
Bases:
objectAsync WebSocket manager with proactive connection control.
This manager provides both reactive (auto-reconnect on failure) and proactive (user-controlled disconnect/reconnect) connection management.
The proactive control feature is the key differentiator: instead of just reacting to disconnections, you can control WHEN to disconnect and reconnect. This is essential for trading where you want to avoid mid-operation cuts.
- url
WebSocket server URL.
- state
Current connection state.
- stats
Connection and message statistics.
Examples
Basic streaming:
>>> async def main(): ... async with WebSocketManager("wss://example.com/ws") as ws: ... async for msg in ws.stream(): ... print(msg)
Proactive control:
>>> async def trading(): ... ws = WebSocketManager( ... url="wss://stream.binance.com/ws", ... should_disconnect=lambda: not is_critical_time(), ... should_reconnect=lambda: is_approaching_candle(), ... ) ... async with ws: ... await ws.subscribe("btcusdt@kline_4h") ... async for msg in ws.stream(): ... process(msg)
- __init__(self, url: 'str', *, should_disconnect: 'ShouldDisconnectCallback | None' = None, should_reconnect: 'ShouldReconnectCallback | None' = None, on_connect: 'OnConnectCallback | None' = None, on_disconnect: 'OnDisconnectCallback | None' = None, on_disconnect_alert: 'DisconnectAlertCallback | None' = None, on_message: 'OnMessageCallback | None' = None, on_alert: 'OnAlertCallback | None' = None, ping_interval: 'float | None' = None, ping_timeout: 'float | None' = None, connection_timeout: 'float | None' = None, reconnect_strategy: 'ReconnectStrategy' = <ReconnectStrategy.EXPONENTIAL_BACKOFF: 3>, reconnect_delay: 'float | None' = None, max_reconnect_delay: 'float | None' = None, max_reconnect_attempts: 'int | None' = None, auto_reconnect: 'bool' = True, stable_connection_time: 'float | None' = None, server_unavailable_delay: 'float | None' = None, disconnect_check_interval: 'float | None' = None, reconnect_check_interval: 'float | None' = None, disconnect_margin: 'float | None' = None, disconnect_alert_interval: 'float | None' = None, queue_size: 'int | None' = None, config: 'Mapping[str, Any] | None' = None) 'None' -> None[source]
Initialize WebSocket manager.
- Parameters:
url (str) – WebSocket server URL (wss:// or ws://).
should_disconnect (Callable[[], bool] | None) – Callback returning True when disconnect is desired.
should_reconnect (Callable[[], bool | float] | None) – Callback returning True or delay (seconds) for reconnect.
on_connect (Callable[[], Awaitable[None] | None] | None) – Callback invoked after successful connection.
on_disconnect (Callable[[DisconnectReason], Awaitable[None] | None] | None) – Callback invoked after disconnection with reason.
on_disconnect_alert (Callable[[DisconnectReason, int], Awaitable[None] | None] | None) – Throttled callback for disconnect alerts. Receives (reason, count) where count is the number of disconnects since the last alert. Fires at most once per
disconnect_alert_intervalseconds.on_message (Callable[[Any], Awaitable[None] | None] | None) – Callback invoked for each received message.
on_alert (Callable[[str, str, Mapping[str, Any]], Awaitable[None] | None] | None) – Callback for alerting (channel, message, context).
ping_interval (float | None) – Seconds between ping frames.
ping_timeout (float | None) – Seconds to wait for pong response.
connection_timeout (float | None) – Timeout for initial connection.
reconnect_strategy (ReconnectStrategy) – Strategy for reconnection delays.
reconnect_delay (float | None) – Initial delay between reconnect attempts.
max_reconnect_delay (float | None) – Maximum delay for exponential backoff.
max_reconnect_attempts (int | None) – Maximum consecutive reconnection attempts.
auto_reconnect (bool) – Whether to auto-reconnect on disconnection.
stable_connection_time (float | None) – Seconds of stable connection required before
_reconnect_countis reset to 0. Protects against flapping servers that accept the handshake then close.server_unavailable_delay (float | None) – Seconds to wait before any reconnect attempt after receiving close code 1013 (Try Again Later).
disconnect_check_interval (float | None) – Seconds between should_disconnect checks.
reconnect_check_interval (float | None) – Seconds between should_reconnect checks.
disconnect_margin (float | None) – Seconds before platform limit to disconnect.
disconnect_alert_interval (float | None) – Minimum seconds between calls to
on_disconnect_alert. Aggregated count of skipped disconnects is passed to the callback.queue_size (int | None) – Maximum messages in queue (0 = unlimited).
config (Mapping[str, Any] | None) – Optional config mapping for limits resolution.
- Raises:
ImportError – If websockets package is not installed.
- property url: str
WebSocket server URL.
- property state: ConnectionState
Current connection state.
- property stats: WebSocketStats
Connection and message statistics.
- property is_connected: bool
Check if currently connected.
- property connection_duration: float
Seconds since last successful connection, or 0 if not connected.
- async __aenter__(self) 'Self' -> Self[source]
Enter async context and connect.
- async __aexit__(self, exc_type: 'type[BaseException] | None', exc_val: 'BaseException | None', exc_tb: 'types.TracebackType | None') 'None' -> None[source]
Exit async context and close connection.
- async connect(self) 'None' -> None[source]
Establish WebSocket connection.
- Raises:
WebSocketConnectionError – If connection fails after retries.
WebSocketTimeoutError – If connection times out.
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws") ... await ws.connect() ... try: ... async for msg in ws.stream(): ... print(msg) ... finally: ... await ws.close()
- async request_disconnect(self, *, reconnect_after: 'float | None' = None, reason: 'DisconnectReason' = <DisconnectReason.USER_REQUESTED: 1>) 'None' -> None[source]
Request a controlled disconnection.
This is a proactive method that allows the user to disconnect at a time of their choosing rather than waiting for a forced cut.
- Parameters:
reconnect_after (float | None) – Optional delay before auto-reconnect (seconds).
reason (DisconnectReason) – Reason for the disconnection.
Examples
>>> async def main(): ... async with WebSocketManager("wss://example.com/ws") as ws: ... # Disconnect now, reconnect in 5 minutes ... await ws.request_disconnect(reconnect_after=300)
- async schedule_reconnect(self, delay: 'float') 'None' -> None[source]
Schedule a reconnection after a delay.
Use this to programmatically reconnect after a proactive disconnect.
- Parameters:
delay (float) – Seconds to wait before reconnecting.
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws") ... await ws.connect() ... await ws.request_disconnect() ... # Reconnect in 5 minutes ... await ws.schedule_reconnect(300)
- async wait_for_reconnect_window(self, should_reconnect: 'ShouldReconnectCallback | None' = None, timeout: 'float | None' = None) 'bool' -> bool[source]
Wait until the reconnection condition is met.
- Parameters:
- Returns:
True if reconnect condition was met, False if timed out.
- Return type:
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws") ... await ws.connect() ... await ws.request_disconnect() ... # Wait for trading window ... if await ws.wait_for_reconnect_window( ... should_reconnect=lambda: is_trading_time(), ... timeout=3600, ... ): ... await ws.connect()
- async force_close(self) 'None' -> None[source]
Emergency close without reconnection (terminal state).
This is for emergency situations where you need to stop immediately. The WebSocket instance becomes unusable after this call.
Key difference from kill() and shutdown(): - kill(): Reactive. Server kicked us. State=DISCONNECTED. CAN reconnect. - shutdown(): Proactive graceful. User wants to stop. State=CLOSED. CANNOT reconnect. - force_close(): Emergency stop. State=CLOSED. CANNOT reconnect.
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws") ... await ws.connect() ... await ws.force_close() ... # Cannot reconnect after force_close ... assert ws.state == ConnectionState.CLOSED
- async kill(self) 'None' -> None[source]
Simulate external disconnection (reactive, we are the victim).
This simulates a scenario where the server (e.g., Binance) forcefully disconnects us. The WebSocket “suffers” this disconnection.
Key difference from force_close() and shutdown(): - kill(): Reactive. Server kicked us. State=DISCONNECTED. CAN reconnect. - shutdown(): Proactive graceful. User wants to stop. State=CLOSED. CANNOT reconnect. - force_close(): Emergency stop. State=CLOSED. CANNOT reconnect.
Use this to test heartbeat/watchdog recovery mechanisms.
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws", auto_reconnect=False) ... await ws.connect() ... await ws.kill() # Simulates Binance kicking us ... assert ws.state == ConnectionState.DISCONNECTED ... # Heartbeat detects is_dead=True and can restart
- async shutdown(self) 'None' -> None[source]
Graceful intentional shutdown (proactive, user-initiated).
This is for clean shutdown scenarios like CTRL+C or service stop. The WebSocket proactively decides to close.
Key difference from kill() and force_close(): - kill(): Reactive. Server kicked us. State=DISCONNECTED. CAN reconnect. - shutdown(): Proactive graceful. User wants to stop. State=CLOSED. CANNOT reconnect. - force_close(): Emergency stop. State=CLOSED. CANNOT reconnect.
Sets the shutdown event so external code (heartbeat, watchdog) knows we’re stopping intentionally and should not try to restart us.
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws") ... await ws.connect() ... # In SIGINT handler: ... await ws.shutdown() ... assert ws.is_shutdown # Heartbeat knows not to restart
- property is_shutdown: bool
Check if shutdown has been requested.
- property is_dead: bool
Check if connection is dead (not connected and not reconnecting).
Useful for heartbeat monitoring to detect if the WebSocket needs restart.
- Returns:
True if connection is in a dead state requiring manual intervention.
Examples
>>> async def main(): ... ws = WebSocketManager("wss://example.com/ws") ... if ws.is_dead: ... await ws.connect() # Restart
- property is_recoverable: bool
Check if the connection is recoverable via reconnect.
True when the connection is dead but NOT intentionally shutdown. Watchdog consumers should use this property (rather than
is_dead) to decide whether to restart the connection : intentional shutdowns viashutdown()orforce_close()should NOT be restarted, only accidental disconnects (gracefulclose()end-of-scope or reactivekill()).- Returns:
True if
is_deadis True ANDis_shutdownis False.
Examples
>>> async def watchdog_loop(ws): ... while True: ... await asyncio.sleep(5) ... if ws.is_recoverable: ... await ws.connect() # Restart accidental disconnect ... elif ws.is_shutdown: ... break # Intentional shutdown, exit watchdog
- async close(self) 'None' -> None[source]
Gracefully close the connection (non-terminal, reconnect possible).
Sets state to DISCONNECTED (non-terminal) so reconnection remains possible via explicit
connect()or the auto-reconnect mechanism. Preserves the_auto_reconnectflag. Background tasks are cancelled and the underlying WebSocket is closed cleanly with code 1000.Idempotent : returns immediately if state is already CLOSED or DISCONNECTED (no double close, no exception on repeated calls).
Key difference from
force_close()andshutdown():close(): graceful end-of-scope (e.g.async withexit). State=DISCONNECTED non-terminal. CAN reconnect. Does NOT markis_shutdown.force_close(): emergency intentional stop. State=CLOSED terminal. CANNOT reconnect. Marksis_shutdown=True.shutdown(): intentional shutdown (SIGINT-like). State=CLOSED terminal. CANNOT reconnect. Marksis_shutdown=True.
Examples
>>> async def main(): ... async with WebSocketManager("wss://example.com/ws") as ws: ... async for msg in ws.stream(): ... ... ... # async with exited : close() graceful, ws.state == DISCONNECTED ... await ws.connect() # Reconnect possible
- async send(self, data: 'Any') 'None' -> None[source]
Send a message to the WebSocket server.
- Parameters:
data (Any) – Data to send (dict/list will be JSON-encoded).
- Raises:
WebSocketClosedError – If connection is not active.
Examples
>>> async def main(): ... async with WebSocketManager("wss://example.com/ws") as ws: ... await ws.send({"type": "ping"})
- async receive(self, timeout: 'float | None' = None) 'Any' -> Any[source]
Receive a single message from the queue.
- Parameters:
timeout (float | None) – Maximum time to wait (seconds).
- Returns:
The received message.
- Raises:
WebSocketTimeoutError – If timeout expires.
WebSocketClosedError – If connection is closed.
- Return type:
Examples
>>> async def main(): ... async with WebSocketManager("wss://example.com/ws") as ws: ... msg = await ws.receive(timeout=10)
- async stream(self) 'AsyncIterator[Any]' -> AsyncIterator[Any][source]
Iterate over received messages.
This is the main method for consuming WebSocket messages. It handles reconnection transparently.
- Yields:
Received messages (parsed JSON or raw string).
Examples
>>> async def main(): ... async with WebSocketManager("wss://example.com/ws") as ws: ... async for msg in ws.stream(): ... print(msg)
- async subscribe(self, *channels: 'str') 'None' -> None[source]
Subscribe to one or more channels.
Subscriptions are automatically restored after reconnection.
- Parameters:
channels (str) – Channel names to subscribe to.
Examples
>>> async def main(): ... async with WebSocketManager("wss://stream.binance.com/ws") as ws: ... await ws.subscribe("btcusdt@trade", "ethusdt@trade")
- async unsubscribe(self, *channels: 'str') 'None' -> None[source]
Unsubscribe from one or more channels.
- Parameters:
channels (str) – Channel names to unsubscribe from.
Examples
>>> async def main(): ... async with WebSocketManager("wss://stream.binance.com/ws") as ws: ... await ws.unsubscribe("btcusdt@trade")
- async wait_connected(self, timeout: 'float | None' = None) 'bool' -> bool[source]
Wait until connected.
Models¶
ConnectionState¶
- class kstlib.websocket.models.ConnectionState(value)[source]
Bases:
EnumWebSocket connection state machine.
- State transitions:
DISCONNECTED -> CONNECTING -> CONNECTED CONNECTED -> RECONNECTING -> CONNECTED (on success) CONNECTED -> RECONNECTING -> DISCONNECTED (on failure) CONNECTED -> CLOSING -> CLOSED Any state -> CLOSED (on force_close)
- DISCONNECTED
Initial state, not connected.
- CONNECTING
Connection attempt in progress.
- CONNECTED
WebSocket connection is active.
- RECONNECTING
Attempting to restore lost connection.
- CLOSING
Graceful shutdown in progress.
- CLOSED
Terminal state, cannot reconnect.
- DISCONNECTED = 1
- CONNECTING = 2
- CONNECTED = 3
- RECONNECTING = 4
- CLOSING = 5
- CLOSED = 6
- can_connect(self) 'bool' -> bool[source]
Check if a connection attempt is allowed from this state.
- Returns:
True if connect() can be called from this state.
- Return type:
Examples
>>> ConnectionState.DISCONNECTED.can_connect() True >>> ConnectionState.CONNECTED.can_connect() False
- can_send(self) 'bool' -> bool[source]
Check if sending messages is allowed from this state.
- Returns:
True if send() can be called from this state.
- Return type:
Examples
>>> ConnectionState.CONNECTED.can_send() True >>> ConnectionState.DISCONNECTED.can_send() False
DisconnectReason¶
- class kstlib.websocket.models.DisconnectReason(value)[source]
Bases:
EnumReason for WebSocket disconnection.
Disconnections are categorized as either proactive (user-controlled) or reactive (forced by external factors). This distinction is key for the proactive connection control feature.
- Proactive reasons (user-controlled):
USER_REQUESTED: Manual disconnect via request_disconnect() SCHEDULED: Disconnect triggered by schedule_reconnect() CALLBACK_TRIGGERED: should_disconnect() callback returned True CONNECTION_LIMIT: Preemptive disconnect before platform limit
- Reactive reasons (forced):
SERVER_CLOSED: Server initiated the close NETWORK_ERROR: Network connectivity issue PING_TIMEOUT: No pong response within timeout PROTOCOL_ERROR: WebSocket protocol violation
- USER_REQUESTED = 1
- SCHEDULED = 2
- CALLBACK_TRIGGERED = 3
- CONNECTION_LIMIT = 4
- SERVER_CLOSED = 5
- NETWORK_ERROR = 6
- PING_TIMEOUT = 7
- PROTOCOL_ERROR = 8
- KILLED = 9
- property is_proactive: bool
Check if this is a proactive (user-controlled) disconnection.
- Returns:
True if the disconnection was initiated by the user/application.
Examples
>>> DisconnectReason.USER_REQUESTED.is_proactive True >>> DisconnectReason.NETWORK_ERROR.is_proactive False
- property is_reactive: bool
Check if this is a reactive (forced) disconnection.
- Returns:
True if the disconnection was forced by external factors.
Examples
>>> DisconnectReason.SERVER_CLOSED.is_reactive True >>> DisconnectReason.USER_REQUESTED.is_reactive False
ReconnectStrategy¶
- class kstlib.websocket.models.ReconnectStrategy(value)[source]
Bases:
EnumReconnection strategy after disconnection.
- IMMEDIATE
Reconnect immediately without delay.
- FIXED_DELAY
Wait a fixed delay before each attempt.
- EXPONENTIAL_BACKOFF
Exponentially increasing delays.
- CALLBACK_CONTROLLED
Reconnection timing controlled by callback.
- IMMEDIATE = 1
- FIXED_DELAY = 2
- EXPONENTIAL_BACKOFF = 3
- CALLBACK_CONTROLLED = 4
WebSocketStats¶
- class kstlib.websocket.models.WebSocketStats(connects=0, disconnects=0, proactive_disconnects=0, reactive_disconnects=0, messages_received=0, messages_sent=0, bytes_received=0, bytes_sent=0, last_connect_time=0.0, last_disconnect_time=0.0, last_message_time=0.0, _start_time=<factory>)[source]
Bases:
objectWebSocket connection and message statistics.
Tracks both connection lifecycle events and message throughput. The key distinction is between proactive and reactive disconnections, which is central to the proactive control feature.
- connects
Total successful connection count.
- Type:
- disconnects
Total disconnection count.
- Type:
- proactive_disconnects
Disconnections initiated by user/application.
- Type:
- reactive_disconnects
Disconnections forced by external factors.
- Type:
- messages_received
Total messages received.
- Type:
- messages_sent
Total messages sent.
- Type:
- bytes_received
Total bytes received.
- Type:
- bytes_sent
Total bytes sent.
- Type:
- last_connect_time
Unix timestamp of last successful connection.
- Type:
- last_disconnect_time
Unix timestamp of last disconnection.
- Type:
- last_message_time
Unix timestamp of last message (sent or received).
- Type:
Examples
>>> stats = WebSocketStats() >>> stats.record_connect() >>> stats.connects 1 >>> stats.record_disconnect(proactive=True) >>> stats.proactive_disconnects 1
- connects: int = 0
- disconnects: int = 0
- proactive_disconnects: int = 0
- reactive_disconnects: int = 0
- messages_received: int = 0
- messages_sent: int = 0
- bytes_received: int = 0
- bytes_sent: int = 0
- last_connect_time: float = 0.0
- last_disconnect_time: float = 0.0
- last_message_time: float = 0.0
- record_connect(self) 'None' -> None[source]
Record a successful connection.
Examples
>>> stats = WebSocketStats() >>> stats.record_connect() >>> stats.connects 1
- record_disconnect(self, *, proactive: 'bool' = False) 'None' -> None[source]
Record a disconnection.
- Parameters:
proactive (bool) – True if this was a user-initiated disconnection.
Examples
>>> stats = WebSocketStats() >>> stats.record_disconnect(proactive=True) >>> stats.proactive_disconnects 1 >>> stats.record_disconnect(proactive=False) >>> stats.reactive_disconnects 1
- record_message_received(self, size: 'int' = 0) 'None' -> None[source]
Record a received message.
- Parameters:
size (int) – Size of the message in bytes.
Examples
>>> stats = WebSocketStats() >>> stats.record_message_received(100) >>> stats.messages_received 1 >>> stats.bytes_received 100
- record_message_sent(self, size: 'int' = 0) 'None' -> None[source]
Record a sent message.
- Parameters:
size (int) – Size of the message in bytes.
Examples
>>> stats = WebSocketStats() >>> stats.record_message_sent(50) >>> stats.messages_sent 1 >>> stats.bytes_sent 50
- property uptime: float
Time since stats object was created, in seconds.
- Returns:
Elapsed time in seconds.
Examples
>>> import time >>> stats = WebSocketStats() >>> time.sleep(0.05) >>> stats.uptime > 0 True
- property connection_time: float
Time since last connection, in seconds.
Returns zero if never connected.
- Returns:
Elapsed time since last connect, or 0 if never connected.
Examples
>>> stats = WebSocketStats() >>> stats.connection_time 0.0 >>> stats.record_connect() >>> stats.connection_time > 0 or stats.connection_time == 0.0 True
- reset(self) 'None' -> None[source]
Reset all statistics to zero.
Examples
>>> stats = WebSocketStats() >>> stats.record_connect() >>> stats.record_message_sent(100) >>> stats.reset() >>> stats.connects 0 >>> stats.messages_sent 0
- __init__(self, connects: 'int' = 0, disconnects: 'int' = 0, proactive_disconnects: 'int' = 0, reactive_disconnects: 'int' = 0, messages_received: 'int' = 0, messages_sent: 'int' = 0, bytes_received: 'int' = 0, bytes_sent: 'int' = 0, last_connect_time: 'float' = 0.0, last_disconnect_time: 'float' = 0.0, last_message_time: 'float' = 0.0, _start_time: 'float' = <factory>) None -> None
Configuration Limits¶
WebSocketLimits¶
- class kstlib.limits.WebSocketLimits(ping_interval, ping_timeout, connection_timeout, reconnect_delay, max_reconnect_delay, max_reconnect_attempts, queue_size, disconnect_check_interval, reconnect_check_interval, disconnect_margin, stable_connection_time, server_unavailable_delay, disconnect_alert_interval)[source]
Bases:
objectResolved WebSocket configuration limits.
Includes settings for connection management, reconnection behavior, and proactive control features.
- ping_interval
Seconds between ping frames.
- Type:
- ping_timeout
Seconds to wait for pong response.
- Type:
- connection_timeout
Timeout for initial connection.
- Type:
- reconnect_delay
Initial delay between reconnect attempts.
- Type:
- max_reconnect_delay
Maximum delay for exponential backoff.
- Type:
- max_reconnect_attempts
Maximum consecutive reconnection attempts.
- Type:
- queue_size
Maximum messages in queue (0 = unlimited).
- Type:
- disconnect_check_interval
Seconds between should_disconnect checks.
- Type:
- reconnect_check_interval
Seconds between should_reconnect checks.
- Type:
- disconnect_margin
Seconds before platform limit to disconnect.
- Type:
- stable_connection_time
Seconds of stable connection before resetting reconnect counter.
- Type:
- server_unavailable_delay
Seconds to wait on server code 1013 before reconnect.
- Type:
- disconnect_alert_interval
Seconds between throttled disconnect alerts.
- Type:
- ping_interval: float
- ping_timeout: float
- connection_timeout: float
- reconnect_delay: float
- max_reconnect_delay: float
- max_reconnect_attempts: int
- queue_size: int
- disconnect_check_interval: float
- reconnect_check_interval: float
- disconnect_margin: float
- stable_connection_time: float
- server_unavailable_delay: float
- disconnect_alert_interval: float
- __init__(self, ping_interval: 'float', ping_timeout: 'float', connection_timeout: 'float', reconnect_delay: 'float', max_reconnect_delay: 'float', max_reconnect_attempts: 'int', queue_size: 'int', disconnect_check_interval: 'float', reconnect_check_interval: 'float', disconnect_margin: 'float', stable_connection_time: 'float', server_unavailable_delay: 'float', disconnect_alert_interval: 'float') None -> None
get_websocket_limits¶
- kstlib.limits.get_websocket_limits(config: 'Mapping[str, Any] | None' = None) 'WebSocketLimits' -> WebSocketLimits[source]
Resolve WebSocket limits from config with hard limit enforcement.
- Parameters:
config (Mapping[str, Any] | None) – Optional config mapping. If None, loads from get_config().
- Returns:
WebSocketLimits with resolved values clamped to hard bounds.
- Return type:
WebSocketLimits
Examples
>>> limits = get_websocket_limits() >>> limits.ping_interval 20.0 >>> limits.max_reconnect_attempts 10