"""Time-based trigger for periodic operations.
This module provides a TimeTrigger class for detecting time boundaries
and scheduling periodic operations based on modulo intervals.
Typical use cases:
- Restart WebSocket connections at market boundaries (4h, 8h candles)
- Execute periodic tasks at fixed intervals aligned to clock time
- Coordinate operations with exchange candlestick closes
Examples:
Basic boundary detection:
>>> from kstlib.helpers import TimeTrigger
>>> trigger = TimeTrigger("4h")
>>> trigger.time_until_next() # doctest: +SKIP
3542.5
With callback for async operations:
>>> import asyncio
>>> async def restart_ws(): # doctest: +SKIP
... print("Restarting WebSocket...")
>>> trigger = TimeTrigger("8h")
>>> await trigger.wait_for_boundary() # doctest: +SKIP
>>> await restart_ws() # doctest: +SKIP
"""
from __future__ import annotations
import asyncio
import re
import threading
from dataclasses import dataclass
from typing import TYPE_CHECKING
import pendulum
from typing_extensions import Self
from kstlib.helpers.exceptions import InvalidModuloError
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
__all__ = ["TimeTrigger", "TimeTriggerStats"]
# Regex for parsing modulo strings like "30m", "4h", "1d"
MODULO_PATTERN = re.compile(r"^(\d+)\s*(s|m|h|d)$", re.IGNORECASE)
# Unit multipliers to seconds
UNIT_SECONDS = {
"s": 1,
"m": 60,
"h": 3600,
"d": 86400,
}
# Hard limits for modulo (deep defense)
HARD_MIN_MODULO_SECONDS = 60 # Minimum 1 minute
HARD_MAX_MODULO_SECONDS = 86400 * 7 # Maximum 1 week
def _parse_modulo(modulo: str) -> int:
"""Parse modulo string to seconds.
Args:
modulo: Duration string like "30m", "4h", "1d".
Returns:
Duration in seconds.
Raises:
InvalidModuloError: If format is invalid or out of bounds.
"""
match = MODULO_PATTERN.match(modulo.strip())
if not match:
msg = (
f"Invalid modulo format: '{modulo}'. "
"Expected format: <number><unit> where unit is s, m, h, or d. "
"Examples: '30m', '4h', '1d'"
)
raise InvalidModuloError(msg)
value = int(match.group(1))
unit = match.group(2).lower()
seconds = value * UNIT_SECONDS[unit]
if seconds < HARD_MIN_MODULO_SECONDS:
msg = f"Modulo too small: {seconds}s < {HARD_MIN_MODULO_SECONDS}s minimum"
raise InvalidModuloError(msg)
if seconds > HARD_MAX_MODULO_SECONDS:
msg = f"Modulo too large: {seconds}s > {HARD_MAX_MODULO_SECONDS}s maximum"
raise InvalidModuloError(msg)
return seconds
@dataclass
class TimeTriggerStats:
"""Statistics for TimeTrigger operations.
Attributes:
triggers_fired: Number of times boundary was triggered.
callbacks_invoked: Number of callback invocations.
last_trigger_at: ISO timestamp of last trigger.
"""
triggers_fired: int = 0
callbacks_invoked: int = 0
last_trigger_at: str | None = None
def record_trigger(self) -> None:
"""Record a trigger event."""
self.triggers_fired += 1
self.last_trigger_at = pendulum.now("UTC").to_iso8601_string()
def record_callback(self) -> None:
"""Record a callback invocation."""
self.callbacks_invoked += 1
[docs]
class TimeTrigger:
"""Time-based trigger for detecting modulo boundaries.
Detects when current time aligns with periodic intervals (boundaries).
Useful for coordinating operations with market candle closes or
scheduling periodic restarts.
Attributes:
modulo: Original modulo string (e.g., "4h").
modulo_seconds: Modulo duration in seconds.
stats: Trigger statistics.
Args:
modulo: Duration string for the interval (e.g., "30m", "4h", "8h", "1d").
timezone: Timezone for calculations (default: UTC).
Raises:
InvalidModuloError: If modulo format is invalid or out of bounds.
Examples:
Create a 4-hour trigger:
>>> trigger = TimeTrigger("4h")
>>> trigger.modulo_seconds
14400
Check boundary status:
>>> trigger.is_at_boundary() # doctest: +SKIP
False
>>> trigger.time_until_next() # doctest: +SKIP
1234.5
Create with different timezone:
>>> trigger = TimeTrigger("1d", timezone="Europe/Paris")
>>> trigger.timezone
'Europe/Paris'
"""
[docs]
def __init__(
self,
modulo: str,
*,
timezone: str = "UTC",
) -> None:
"""Initialize TimeTrigger.
Args:
modulo: Duration string (e.g., "30m", "4h", "1d").
timezone: Timezone for boundary calculations.
"""
self._modulo_str = modulo
self._modulo_seconds = _parse_modulo(modulo)
self._timezone = timezone
self._stats = TimeTriggerStats()
# Async loop state
self._running = False
self._stop_event = threading.Event()
self._async_task: asyncio.Task[None] | None = None
@property
def modulo(self) -> str:
"""Return the original modulo string."""
return self._modulo_str
@property
def modulo_seconds(self) -> int:
"""Return the modulo duration in seconds."""
return self._modulo_seconds
@property
def timezone(self) -> str:
"""Return the timezone used for calculations."""
return self._timezone
@property
def stats(self) -> TimeTriggerStats:
"""Return trigger statistics."""
return self._stats
def _get_current_timestamp(self) -> float:
"""Get current Unix timestamp."""
return pendulum.now(self._timezone).timestamp()
def _seconds_into_period(self) -> float:
"""Get seconds elapsed since last boundary."""
return self._get_current_timestamp() % self._modulo_seconds
[docs]
def time_until_next(self) -> float:
"""Calculate seconds until next boundary.
Returns:
Seconds remaining until the next modulo boundary.
Examples:
>>> trigger = TimeTrigger("4h")
>>> remaining = trigger.time_until_next() # doctest: +SKIP
>>> 0 <= remaining <= 14400 # doctest: +SKIP
True
"""
elapsed = self._seconds_into_period()
if elapsed == 0:
return 0.0
return self._modulo_seconds - elapsed
[docs]
def is_at_boundary(self, margin: float = 1.0) -> bool:
"""Check if current time is at a boundary.
A boundary is when the timestamp is divisible by the modulo.
The margin allows for slight timing imprecision.
Args:
margin: Tolerance in seconds around the boundary (default: 1.0).
Returns:
True if within margin seconds of a boundary.
Examples:
>>> trigger = TimeTrigger("4h")
>>> trigger.is_at_boundary() # doctest: +SKIP
True # If time is 00:00:00, 04:00:00, etc.
>>> trigger.is_at_boundary(margin=5.0) # doctest: +SKIP
True # If time is within 5 seconds of boundary
"""
elapsed = self._seconds_into_period()
# Check if we're near 0 (just passed) or near modulo (about to hit)
return elapsed <= margin or (self._modulo_seconds - elapsed) <= margin
[docs]
def should_trigger(self, margin: float = 30.0) -> bool:
"""Check if trigger should fire (boundary approaching).
Use this to prepare for an upcoming boundary (e.g., start shutdown
sequence before the boundary hits).
Args:
margin: Seconds before boundary to trigger (default: 30.0).
Returns:
True if boundary is within margin seconds.
Examples:
>>> trigger = TimeTrigger("4h")
>>> if trigger.should_trigger(margin=60): # doctest: +SKIP
... print("Boundary in less than 60 seconds!")
"""
remaining = self.time_until_next()
return remaining <= margin
[docs]
def next_boundary(self) -> pendulum.DateTime:
"""Get the datetime of the next boundary.
Returns:
Pendulum DateTime of the next boundary.
Examples:
>>> trigger = TimeTrigger("4h")
>>> next_time = trigger.next_boundary() # doctest: +SKIP
>>> print(next_time.to_iso8601_string()) # doctest: +SKIP
'2024-01-15T08:00:00+00:00'
"""
now = pendulum.now(self._timezone)
seconds_until = self.time_until_next()
return now.add(seconds=seconds_until)
[docs]
def previous_boundary(self) -> pendulum.DateTime:
"""Get the datetime of the previous boundary.
Returns:
Pendulum DateTime of the previous boundary.
Examples:
>>> trigger = TimeTrigger("4h")
>>> prev_time = trigger.previous_boundary() # doctest: +SKIP
>>> print(prev_time.to_iso8601_string()) # doctest: +SKIP
'2024-01-15T04:00:00+00:00'
"""
now = pendulum.now(self._timezone)
elapsed = self._seconds_into_period()
return now.subtract(seconds=elapsed)
[docs]
async def wait_for_boundary(self, margin: float = 0.0) -> None:
"""Wait until the next boundary (async).
Sleeps until the next boundary minus the margin.
Args:
margin: Seconds before boundary to wake up (default: 0.0).
Examples:
>>> import asyncio
>>> trigger = TimeTrigger("30m")
>>> await trigger.wait_for_boundary() # doctest: +SKIP
>>> print("Boundary reached!") # doctest: +SKIP
"""
remaining = self.time_until_next() - margin
if remaining > 0:
await asyncio.sleep(remaining)
self._stats.record_trigger()
[docs]
async def run_on_boundary(
self,
callback: Callable[[], None] | Callable[[], Awaitable[None]],
*,
margin: float = 0.0,
run_immediately: bool = False,
) -> None:
"""Run callback at each boundary (async loop).
Continuously waits for boundaries and invokes the callback.
Call stop() to terminate the loop.
Args:
callback: Function to call at each boundary (sync or async).
margin: Seconds before boundary to invoke callback.
run_immediately: If True, run callback immediately before first wait.
Examples:
>>> import asyncio
>>> async def restart(): # doctest: +SKIP
... print("Restarting...")
>>> trigger = TimeTrigger("4h")
>>> task = asyncio.create_task( # doctest: +SKIP
... trigger.run_on_boundary(restart, margin=30)
... )
>>> # Later: trigger.stop()
"""
self._running = True
self._stop_event.clear()
if run_immediately:
await self._invoke_callback(callback)
while self._running:
await self.wait_for_boundary(margin=margin)
# Re-check after await since stop() may have been called concurrently
if self._stop_event.is_set():
break
await self._invoke_callback(callback)
async def _invoke_callback(
self,
callback: Callable[[], None] | Callable[[], Awaitable[None]],
) -> None:
"""Invoke callback (sync or async)."""
self._stats.record_callback()
result = callback()
if asyncio.iscoroutine(result):
await result
[docs]
def stop(self) -> None:
"""Stop the boundary loop."""
self._running = False
self._stop_event.set()
if self._async_task is not None and not self._async_task.done():
self._async_task.cancel()
[docs]
def __repr__(self) -> str:
"""Return string representation."""
return f"TimeTrigger(modulo={self._modulo_str!r}, timezone={self._timezone!r})"
[docs]
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return self
[docs]
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: object,
) -> None:
"""Async context manager exit."""
self.stop()