WebSocket

Async WebSocket client with proactive connection control for trading applications.

TL;DR

import asyncio
from kstlib.websocket import WebSocketManager

async def main():
    async with WebSocketManager("wss://stream.example.com/ws") as ws:
        async for message in ws.stream():
            print(message)

asyncio.run(main())

Key Features

  • Proactive Control: User-controlled disconnect/reconnect timing

  • Auto-Reconnection: Configurable reconnection strategies

  • Subscription Management: Auto-resubscribe on reconnection

  • Statistics Tracking: Proactive vs reactive disconnect metrics

  • Config-Driven: Integrates with kstlib.conf.yml

The Problem

Traditional WebSocket clients are reactive: they only handle disconnections after they occur. For trading applications, this is problematic:

  • Binance disconnects WebSockets every 24 hours

  • Disconnections during critical moments (order placement, candle close) cause data loss

  • No control over WHEN disconnections happen

The Solution: Proactive Control

WebSocketManager lets you control WHEN to disconnect and reconnect:

def next_candle_in() -> float:
    """Seconds until next 4H candle."""
    ...

ws = WebSocketManager(
    url="wss://stream.binance.com/ws/btcusdt@kline_4h",
    # Disconnect when > 30s until next candle (safe window)
    should_disconnect=lambda: next_candle_in() > 30,
    # Reconnect when < 60s until next candle (prepare for data)
    should_reconnect=lambda: next_candle_in() < 60,
    disconnect_check_interval=5.0,
)

This ensures you’re always connected during critical moments and disconnected during safe windows.

Quick Start

Basic Usage

from kstlib.websocket import WebSocketManager

async with WebSocketManager("wss://example.com/ws") as ws:
    async for message in ws.stream():
        print(message)

With Auto-Reconnection

from kstlib.websocket import WebSocketManager, ReconnectStrategy

ws = WebSocketManager(
    url="wss://example.com/ws",
    reconnect_strategy=ReconnectStrategy.EXPONENTIAL_BACKOFF,
    max_reconnect_attempts=5,
    reconnect_delay=1.0,
    reconnect_delay_max=60.0,
)

With Subscriptions

ws = WebSocketManager(
    url="wss://stream.binance.com/ws",
    subscriptions=[
        {"method": "SUBSCRIBE", "params": ["btcusdt@kline_1m"]},
        {"method": "SUBSCRIBE", "params": ["ethusdt@kline_1m"]},
    ],
)

# Subscriptions are automatically restored on reconnection

Manual Control

ws = WebSocketManager(url="wss://example.com/ws")

await ws.connect()

# Request graceful disconnect (waits for safe moment)
ws.request_disconnect()

# Force immediate disconnect
await ws.disconnect()

# Schedule reconnection
ws.schedule_reconnect(delay=30.0)

# Check state
print(ws.state)  # ConnectionState.CONNECTED
print(ws.stats)  # WebSocketStats(...)

Connection States

from kstlib.websocket import ConnectionState

ConnectionState.DISCONNECTED  # Initial state
ConnectionState.CONNECTING    # Connection in progress
ConnectionState.CONNECTED     # Active connection
ConnectionState.RECONNECTING  # Restoring lost connection
ConnectionState.CLOSING       # Graceful shutdown
ConnectionState.CLOSED        # Terminal state

State transitions:

DISCONNECTED -> CONNECTING -> CONNECTED
CONNECTED -> RECONNECTING -> CONNECTED (success)
CONNECTED -> RECONNECTING -> DISCONNECTED (failure)
CONNECTED -> CLOSING -> CLOSED

Lifecycle Methods

The library exposes 4 methods to control connection lifecycle, each with distinct semantics. Pick based on your intent:

Method

Use case

State final

is_dead

is_shutdown

is_recoverable

Reconnect via connect()

close()

Graceful end-of-scope (e.g., async with exit, voluntary break)

DISCONNECTED

True

False

True

YES

force_close()

Emergency intentional stop (e.g., critical error caught, immediate halt)

CLOSED

True

True

False

NO (warning + no-op)

shutdown()

Intentional shutdown SIGINT-like (e.g., service stop, CTRL+C handler)

CLOSED

True

True

False

NO (warning + no-op)

kill()

Simulate external server disconnect (test heartbeat/watchdog recovery)

DISCONNECTED

True

False

True

YES (or via auto_reconnect)

When to use each

  • close() : default for long-running consumers using async with. Reconnect remains possible if you want to resume later. Idempotent (safe to call multiple times).

  • force_close() : when something went catastrophically wrong and you want the connection to be irrevocably terminated. Marks is_shutdown=True so external watchdog consumers (using is_recoverable) will NOT restart.

  • shutdown() : when the application is shutting down intentionally (SIGINT, service stop). Equivalent to force_close() plus an explicit shutdown event marker. Watchdog consumers will NOT restart.

  • kill() : test-only simulation of a server-side disconnect. Used to verify reconnection logic. Not for production code.

Watchdog consumer pattern

External watchdog loops should use is_recoverable (not is_dead alone) to distinguish accidental disconnects from intentional shutdowns:

async def watchdog_loop(ws):
    while True:
        await asyncio.sleep(5)
        if ws.is_recoverable:
            await ws.connect()  # Restart accidental disconnect
        elif ws.is_shutdown:
            break  # Intentional shutdown, exit watchdog

Disconnect Reasons

Track why disconnections happened:

from kstlib.websocket import DisconnectReason

# Proactive (user-controlled)
DisconnectReason.USER_REQUESTED      # Manual disconnect
DisconnectReason.SCHEDULED           # Scheduled reconnect
DisconnectReason.CALLBACK_TRIGGERED  # should_disconnect() returned True
DisconnectReason.CONNECTION_LIMIT    # Preemptive (before platform limit)

# Reactive (forced)
DisconnectReason.SERVER_CLOSED   # Server closed connection
DisconnectReason.NETWORK_ERROR   # Network issue
DisconnectReason.PING_TIMEOUT    # No pong response
DisconnectReason.PROTOCOL_ERROR  # Protocol violation

# Check type
reason.is_proactive  # True for user-controlled
reason.is_reactive   # True for forced

Statistics

Monitor connection health:

stats = ws.stats

print(f"Connects: {stats.connects}")
print(f"Disconnects: {stats.disconnects}")
print(f"  Proactive: {stats.proactive_disconnects}")
print(f"  Reactive: {stats.reactive_disconnects}")
print(f"Messages: {stats.messages_received} rx, {stats.messages_sent} tx")
print(f"Bytes: {stats.bytes_received} rx, {stats.bytes_sent} tx")
print(f"Uptime: {stats.uptime:.1f}s")
print(f"Connection time: {stats.connection_time:.1f}s")

Configuration

Settings from kstlib.conf.yml:

websocket:
  # Connection settings
  connect_timeout: 10.0
  ping_interval: 30.0
  ping_timeout: 10.0

  # Reconnection settings
  reconnect_delay: 1.0
  reconnect_delay_max: 60.0
  max_reconnect_attempts: 10

  # Queue settings
  queue_size: 1000

  # Proactive control
  disconnect_check_interval: 5.0

Error Handling

from kstlib.websocket import (
    WebSocketError,
    WebSocketConnectionError,
    WebSocketClosedError,
    WebSocketTimeoutError,
)

try:
    async with WebSocketManager(url) as ws:
        async for message in ws.stream():
            process(message)
except WebSocketConnectionError as e:
    log.error(f"Connection failed: {e.url}, attempts: {e.attempts}")
except WebSocketClosedError as e:
    log.warning(f"Closed: {e.code} - {e.reason}")
except WebSocketTimeoutError as e:
    log.warning(f"Timeout: {e.operation}")
except WebSocketError as e:
    log.error(f"WebSocket error: {e}")

Trading Example

Complete example for Binance kline streaming:

import asyncio
from datetime import datetime
from kstlib.websocket import WebSocketManager, ReconnectStrategy

def seconds_until_candle_close(interval_minutes: int = 240) -> float:
    """Calculate seconds until next candle close."""
    now = datetime.utcnow()
    minutes = now.hour * 60 + now.minute
    next_close = ((minutes // interval_minutes) + 1) * interval_minutes
    seconds_to_close = (next_close - minutes) * 60 - now.second
    return max(0, seconds_to_close)

async def main():
    ws = WebSocketManager(
        url="wss://stream.binance.com:9443/ws/btcusdt@kline_4h",
        reconnect_strategy=ReconnectStrategy.EXPONENTIAL_BACKOFF,
        # Disconnect when > 5 min until candle close
        should_disconnect=lambda: seconds_until_candle_close() > 300,
        # Reconnect when < 2 min until candle close
        should_reconnect=lambda: seconds_until_candle_close() < 120,
        disconnect_check_interval=10.0,
    )

    async with ws:
        async for message in ws.stream():
            kline = message.get("k", {})
            if kline.get("x"):  # Candle closed
                print(f"Candle closed: {kline['c']}")

asyncio.run(main())

See Also