Source code for kstlib.websocket.models
"""WebSocket data models and enumerations.
This module provides the core data structures for the WebSocket manager:
- **ConnectionState**: State machine for connection lifecycle
- **DisconnectReason**: Categorizes disconnection causes (proactive vs reactive)
- **ReconnectStrategy**: Available reconnection strategies
- **WebSocketStats**: Connection and message statistics
Examples:
>>> from kstlib.websocket.models import ConnectionState, DisconnectReason
>>> state = ConnectionState.CONNECTED
>>> reason = DisconnectReason.USER_REQUESTED
>>> reason.is_proactive
True
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from enum import Enum, auto
__all__ = [
"ConnectionState",
"DisconnectReason",
"ReconnectStrategy",
"WebSocketStats",
]
[docs]
class ConnectionState(Enum):
"""WebSocket connection state machine.
State transitions:
DISCONNECTED -> CONNECTING -> CONNECTED
CONNECTED -> RECONNECTING -> CONNECTED (on success)
CONNECTED -> RECONNECTING -> DISCONNECTED (on failure)
CONNECTED -> CLOSING -> CLOSED
Any state -> CLOSED (on force_close)
Attributes:
DISCONNECTED: Initial state, not connected.
CONNECTING: Connection attempt in progress.
CONNECTED: WebSocket connection is active.
RECONNECTING: Attempting to restore lost connection.
CLOSING: Graceful shutdown in progress.
CLOSED: Terminal state, cannot reconnect.
"""
DISCONNECTED = auto()
CONNECTING = auto()
CONNECTED = auto()
RECONNECTING = auto()
CLOSING = auto()
CLOSED = auto()
[docs]
def can_connect(self) -> bool:
"""Check if a connection attempt is allowed from this state.
Returns:
True if connect() can be called from this state.
Examples:
>>> ConnectionState.DISCONNECTED.can_connect()
True
>>> ConnectionState.CONNECTED.can_connect()
False
"""
return self in (ConnectionState.DISCONNECTED, ConnectionState.RECONNECTING)
[docs]
def can_send(self) -> bool:
"""Check if sending messages is allowed from this state.
Returns:
True if send() can be called from this state.
Examples:
>>> ConnectionState.CONNECTED.can_send()
True
>>> ConnectionState.DISCONNECTED.can_send()
False
"""
return self == ConnectionState.CONNECTED
[docs]
def is_terminal(self) -> bool:
"""Check if this is a terminal state.
Returns:
True if no further state transitions are possible.
Examples:
>>> ConnectionState.CLOSED.is_terminal()
True
>>> ConnectionState.DISCONNECTED.is_terminal()
False
"""
return self == ConnectionState.CLOSED
[docs]
class DisconnectReason(Enum):
"""Reason for WebSocket disconnection.
Disconnections are categorized as either proactive (user-controlled)
or reactive (forced by external factors). This distinction is key
for the proactive connection control feature.
Proactive reasons (user-controlled):
USER_REQUESTED: Manual disconnect via request_disconnect()
SCHEDULED: Disconnect triggered by schedule_reconnect()
CALLBACK_TRIGGERED: should_disconnect() callback returned True
CONNECTION_LIMIT: Preemptive disconnect before platform limit
Reactive reasons (forced):
SERVER_CLOSED: Server initiated the close
NETWORK_ERROR: Network connectivity issue
PING_TIMEOUT: No pong response within timeout
PROTOCOL_ERROR: WebSocket protocol violation
"""
# Proactive (user-controlled) disconnections
USER_REQUESTED = auto()
SCHEDULED = auto()
CALLBACK_TRIGGERED = auto()
CONNECTION_LIMIT = auto()
# Reactive (forced) disconnections
SERVER_CLOSED = auto()
NETWORK_ERROR = auto()
PING_TIMEOUT = auto()
PROTOCOL_ERROR = auto()
KILLED = auto() # Simulated external kill (e.g., Binance forced disconnect)
@property
def is_proactive(self) -> bool:
"""Check if this is a proactive (user-controlled) disconnection.
Returns:
True if the disconnection was initiated by the user/application.
Examples:
>>> DisconnectReason.USER_REQUESTED.is_proactive
True
>>> DisconnectReason.NETWORK_ERROR.is_proactive
False
"""
return self in (
DisconnectReason.USER_REQUESTED,
DisconnectReason.SCHEDULED,
DisconnectReason.CALLBACK_TRIGGERED,
DisconnectReason.CONNECTION_LIMIT,
)
@property
def is_reactive(self) -> bool:
"""Check if this is a reactive (forced) disconnection.
Returns:
True if the disconnection was forced by external factors.
Examples:
>>> DisconnectReason.SERVER_CLOSED.is_reactive
True
>>> DisconnectReason.USER_REQUESTED.is_reactive
False
"""
return not self.is_proactive
[docs]
class ReconnectStrategy(Enum):
"""Reconnection strategy after disconnection.
Attributes:
IMMEDIATE: Reconnect immediately without delay.
FIXED_DELAY: Wait a fixed delay before each attempt.
EXPONENTIAL_BACKOFF: Exponentially increasing delays.
CALLBACK_CONTROLLED: Reconnection timing controlled by callback.
"""
IMMEDIATE = auto()
FIXED_DELAY = auto()
EXPONENTIAL_BACKOFF = auto()
CALLBACK_CONTROLLED = auto()
[docs]
@dataclass
class WebSocketStats:
"""WebSocket connection and message statistics.
Tracks both connection lifecycle events and message throughput.
The key distinction is between proactive and reactive disconnections,
which is central to the proactive control feature.
Attributes:
connects: Total successful connection count.
disconnects: Total disconnection count.
proactive_disconnects: Disconnections initiated by user/application.
reactive_disconnects: Disconnections forced by external factors.
messages_received: Total messages received.
messages_sent: Total messages sent.
bytes_received: Total bytes received.
bytes_sent: Total bytes sent.
last_connect_time: Unix timestamp of last successful connection.
last_disconnect_time: Unix timestamp of last disconnection.
last_message_time: Unix timestamp of last message (sent or received).
Examples:
>>> stats = WebSocketStats()
>>> stats.record_connect()
>>> stats.connects
1
>>> stats.record_disconnect(proactive=True)
>>> stats.proactive_disconnects
1
"""
connects: int = 0
disconnects: int = 0
proactive_disconnects: int = 0
reactive_disconnects: int = 0
messages_received: int = 0
messages_sent: int = 0
bytes_received: int = 0
bytes_sent: int = 0
last_connect_time: float = 0.0
last_disconnect_time: float = 0.0
last_message_time: float = 0.0
_start_time: float = field(default_factory=time.monotonic)
[docs]
def record_connect(self) -> None:
"""Record a successful connection.
Examples:
>>> stats = WebSocketStats()
>>> stats.record_connect()
>>> stats.connects
1
"""
self.connects += 1
self.last_connect_time = time.time()
[docs]
def record_disconnect(self, *, proactive: bool = False) -> None:
"""Record a disconnection.
Args:
proactive: True if this was a user-initiated disconnection.
Examples:
>>> stats = WebSocketStats()
>>> stats.record_disconnect(proactive=True)
>>> stats.proactive_disconnects
1
>>> stats.record_disconnect(proactive=False)
>>> stats.reactive_disconnects
1
"""
self.disconnects += 1
self.last_disconnect_time = time.time()
if proactive:
self.proactive_disconnects += 1
else:
self.reactive_disconnects += 1
[docs]
def record_message_received(self, size: int = 0) -> None:
"""Record a received message.
Args:
size: Size of the message in bytes.
Examples:
>>> stats = WebSocketStats()
>>> stats.record_message_received(100)
>>> stats.messages_received
1
>>> stats.bytes_received
100
"""
self.messages_received += 1
self.bytes_received += size
self.last_message_time = time.time()
[docs]
def record_message_sent(self, size: int = 0) -> None:
"""Record a sent message.
Args:
size: Size of the message in bytes.
Examples:
>>> stats = WebSocketStats()
>>> stats.record_message_sent(50)
>>> stats.messages_sent
1
>>> stats.bytes_sent
50
"""
self.messages_sent += 1
self.bytes_sent += size
self.last_message_time = time.time()
@property
def uptime(self) -> float:
"""Time since stats object was created, in seconds.
Returns:
Elapsed time in seconds.
Examples:
>>> import time
>>> stats = WebSocketStats()
>>> time.sleep(0.05)
>>> stats.uptime > 0
True
"""
return time.monotonic() - self._start_time
@property
def connection_time(self) -> float:
"""Time since last connection, in seconds.
Returns zero if never connected.
Returns:
Elapsed time since last connect, or 0 if never connected.
Examples:
>>> stats = WebSocketStats()
>>> stats.connection_time
0.0
>>> stats.record_connect()
>>> stats.connection_time > 0 or stats.connection_time == 0.0
True
"""
if self.last_connect_time == 0.0:
return 0.0
return time.time() - self.last_connect_time
[docs]
def reset(self) -> None:
"""Reset all statistics to zero.
Examples:
>>> stats = WebSocketStats()
>>> stats.record_connect()
>>> stats.record_message_sent(100)
>>> stats.reset()
>>> stats.connects
0
>>> stats.messages_sent
0
"""
self.connects = 0
self.disconnects = 0
self.proactive_disconnects = 0
self.reactive_disconnects = 0
self.messages_received = 0
self.messages_sent = 0
self.bytes_received = 0
self.bytes_sent = 0
self.last_connect_time = 0.0
self.last_disconnect_time = 0.0
self.last_message_time = 0.0
self._start_time = time.monotonic()