Source code for kstlib.ui.spinner

"""Animated spinner utilities for CLI feedback during long operations."""

from __future__ import annotations

import functools
import io
import sys
import threading
import time
from collections import deque
from enum import Enum
from typing import IO, TYPE_CHECKING, Any, ParamSpec, TypeVar

if TYPE_CHECKING:
    from collections.abc import Callable

from rich.console import Console
from rich.text import Text
from typing_extensions import Self

from kstlib.config import ConfigNotLoadedError, get_config
from kstlib.ui.exceptions import SpinnerError

if TYPE_CHECKING:
    import types

    from rich.style import Style

P = ParamSpec("P")
R = TypeVar("R")


class SpinnerStyle(Enum):
    """Built-in frame sequences for the character spinner.

    The value of every member is a tuple of one-character frames. The
    animation shows them in order and wraps around after the last one.
    """

    # Single-code-point glyph sets are spelled as one string and split.
    BRAILLE = tuple("⣾⣽⣻⢿⡿⣟⣯⣷")
    DOTS = tuple("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏")
    LINE = tuple("|/-\\")
    ARROW = tuple("←↖↑↗→↘↓↙")
    BLOCKS = tuple("▁▂▃▄▅▆▇█▇▆▅▄▃▂")
    CIRCLE = tuple("◐◓◑◒")
    SQUARE = tuple("◰◳◲◱")
    # Emoji sets are kept as explicit tuples.
    MOON = ("🌑", "🌒", "🌓", "🌔", "🌕", "🌖", "🌗", "🌘")
    CLOCK = (
        "🕐",
        "🕑",
        "🕒",
        "🕓",
        "🕔",
        "🕕",
        "🕖",
        "🕗",
        "🕘",
        "🕙",
        "🕚",
        "🕛",
    )
class SpinnerPosition(Enum):
    """Side of the message on which the spinner glyph is drawn."""

    BEFORE = "before"  # glyph first, then the message
    AFTER = "after"  # message first, then the glyph
class SpinnerAnimationType(Enum):
    """Animation families a spinner can render."""

    SPIN = "spin"  # cycle through the frames of a SpinnerStyle
    BOUNCE = "bounce"  # a marker moving back and forth inside a bar
    COLOR_WAVE = "color_wave"  # colors shifting across the message text
# Default configuration
DEFAULT_SPINNER_CONFIG: dict[str, Any] = {
    "defaults": {
        "style": "BRAILLE",
        "position": "before",
        "animation_type": "spin",
        "interval": 0.08,
        "spinner_style": "cyan",
        "text_style": None,
        "done_character": "✓",
        "done_style": "green",
        "fail_character": "✗",
        "fail_style": "red",
    },
    "presets": {
        "minimal": {
            "style": "LINE",
            "spinner_style": "dim white",
            "interval": 0.1,
        },
        "fancy": {
            "style": "BRAILLE",
            "spinner_style": "bold cyan",
            "interval": 0.06,
        },
        "blocks": {
            "style": "BLOCKS",
            "spinner_style": "blue",
            "interval": 0.05,
        },
        "bounce": {
            "animation_type": "bounce",
            "interval": 0.08,
            "spinner_style": "yellow",
        },
        "color_wave": {
            "animation_type": "color_wave",
            "interval": 0.1,
        },
    },
}

# Color wave palette
COLOR_WAVE_COLORS = [
    "bright_blue",
    "cyan",
    "bright_cyan",
    "white",
    "bright_cyan",
    "cyan",
]

# Bounce bar width
BOUNCE_WIDTH = 20
BOUNCE_CHAR = "="
BOUNCE_BRACKET_LEFT = "["
BOUNCE_BRACKET_RIGHT = "]"


def _merge_spinner_config(spinners_config: dict[str, Any]) -> dict[str, Any]:
    """Overlay user spinner settings on a private copy of the defaults.

    Args:
        spinners_config: Mapping that may contain ``"defaults"`` and/or
            ``"presets"`` sections. A missing or ``None`` section is treated
            as empty.

    Returns:
        A new ``{"defaults": ..., "presets": ...}`` dict. Every preset is
        copied before being updated, so ``DEFAULT_SPINNER_CONFIG`` is never
        modified.
    """
    merged: dict[str, Any] = {
        "defaults": dict(DEFAULT_SPINNER_CONFIG["defaults"]),
        # Copy each preset: a shallow copy of the outer dict would still share
        # the inner dicts with the module-level defaults.
        "presets": {name: dict(values) for name, values in DEFAULT_SPINNER_CONFIG["presets"].items()},
    }
    merged["defaults"].update(spinners_config.get("defaults") or {})
    for preset_name, preset_vals in (spinners_config.get("presets") or {}).items():
        # Known presets are extended, unknown ones are created.
        merged["presets"].setdefault(preset_name, {}).update(preset_vals or {})
    return merged


def _load_spinner_config() -> dict[str, Any]:
    """Load spinner configuration from kstlib.conf.yml with fallback to defaults.

    Returns:
        The user's ``ui.spinners`` section merged over the built-in defaults,
        or ``DEFAULT_SPINNER_CONFIG`` itself when no configuration is loaded
        or the section is absent/empty.
    """
    try:
        config = get_config().to_dict()
    except ConfigNotLoadedError:
        return DEFAULT_SPINNER_CONFIG

    # ``or {}`` also covers keys that are present but empty (None).
    spinners_config = (config.get("ui") or {}).get("spinners") or {}
    if not spinners_config:
        return DEFAULT_SPINNER_CONFIG
    return _merge_spinner_config(spinners_config)
class Spinner:
    """Animated spinner for CLI feedback during long operations.

    Supports multiple animation styles including character spinners,
    bouncing bars, and color wave effects. Can be used as a context
    manager or controlled manually.

    Args:
        message: Text to display alongside the spinner.
        style: Spinner animation style (SpinnerStyle enum or string name).
        position: Where to place spinner relative to text (before/after).
        animation_type: Type of animation (spin/bounce/color_wave).
        interval: Seconds between animation frames.
        spinner_style: Rich style for the spinner character.
        text_style: Rich style for the message text.
        done_character: Character shown in place of the spinner on success.
        done_style: Rich style for ``done_character``.
        fail_character: Character shown in place of the spinner on failure.
        fail_style: Rich style for ``fail_character``.
        console: Optional Rich console instance.
        file: Output stream (defaults to sys.stderr).

    Examples:
        Create a spinner with default settings:

        >>> spinner = Spinner("Loading...")
        >>> spinner.message
        'Loading...'

        Create with custom style:

        >>> spinner = Spinner("Working", style=SpinnerStyle.DOTS)
        >>> spinner = Spinner("Building", style="BLOCKS", interval=0.1)

        Using as a context manager (terminal I/O):

        >>> with Spinner("Processing...") as s:  # doctest: +SKIP
        ...     do_long_operation()
        ...     s.update("Almost done...")

        Manual control:

        >>> spinner = Spinner("Working...")  # doctest: +SKIP
        >>> spinner.start()  # doctest: +SKIP
        >>> spinner.stop(success=True)  # doctest: +SKIP
    """

    def __init__(
        self,
        message: str = "",
        *,
        style: SpinnerStyle | str = SpinnerStyle.BRAILLE,
        position: SpinnerPosition | str = SpinnerPosition.BEFORE,
        animation_type: SpinnerAnimationType | str = SpinnerAnimationType.SPIN,
        interval: float = 0.08,
        spinner_style: str | Style | None = "cyan",
        text_style: str | Style | None = None,
        done_character: str = "✓",
        done_style: str | Style | None = "green",
        fail_character: str = "✗",
        fail_style: str | Style | None = "red",
        console: Console | None = None,
        file: IO[str] | None = None,
    ) -> None:
        """Initialize spinner with configuration.

        Raises:
            SpinnerError: If ``style``, ``position`` or ``animation_type`` is
                a string that does not name a known member.
        """
        self._message = message
        self._style = self._resolve_style(style)
        self._position = self._resolve_position(position)
        self._animation_type = self._resolve_animation_type(animation_type)
        self._interval = interval
        self._spinner_style = spinner_style
        self._text_style = text_style
        self._done_character = done_character
        self._done_style = done_style
        self._fail_character = fail_character
        self._fail_style = fail_style
        self._file = file or sys.stderr
        self._console = console or Console(file=self._file, force_terminal=True)

        # Animation state; ``_lock`` guards ``_message``.
        self._running = False
        self._thread: threading.Thread | None = None
        self._lock = threading.Lock()
        self._frame_index = 0
        self._bounce_position = 0
        self._bounce_direction = 1
        self._color_offset = 0

    @classmethod
    def from_preset(
        cls,
        preset: str,
        message: str = "",
        *,
        console: Console | None = None,
        **overrides: Any,
    ) -> Spinner:
        """Create a spinner from a named preset.

        Values are resolved in this order (later wins): configured defaults,
        the preset, then ``overrides``.

        Args:
            preset: Name of the preset (e.g., "minimal", "fancy", "bounce").
            message: Text to display alongside the spinner.
            console: Optional Rich console instance.
            **overrides: Additional parameters to override preset values.
                Any keyword accepted by ``__init__`` other than ``message``
                and ``console`` is honoured, including ``file``; other keys
                are ignored.

        Returns:
            Configured Spinner instance.

        Raises:
            SpinnerError: If preset name is not found.

        Examples:
            Create from a built-in preset:

            >>> spinner = Spinner.from_preset("minimal", "Loading...")
            >>> spinner = Spinner.from_preset("fancy", "Processing data")

            Override preset values:

            >>> spinner = Spinner.from_preset("bounce", "Building", interval=0.05)

            Invalid preset raises error:

            >>> Spinner.from_preset("nonexistent")  # doctest: +ELLIPSIS
            Traceback (most recent call last):
                ...
            kstlib.ui.exceptions.SpinnerError: Unknown preset 'nonexistent'. ...
        """
        config = _load_spinner_config()
        presets = config.get("presets", {})
        if preset not in presets:
            available = ", ".join(presets.keys())
            raise SpinnerError(f"Unknown preset '{preset}'. Available: {available}")

        # Building a new dict leaves the loaded configuration untouched.
        merged = {**config.get("defaults", {}), **presets[preset], **overrides}
        return cls(
            message,
            style=merged.get("style", SpinnerStyle.BRAILLE),
            position=merged.get("position", SpinnerPosition.BEFORE),
            animation_type=merged.get("animation_type", SpinnerAnimationType.SPIN),
            interval=merged.get("interval", 0.08),
            spinner_style=merged.get("spinner_style"),
            text_style=merged.get("text_style"),
            done_character=merged.get("done_character", "✓"),
            done_style=merged.get("done_style", "green"),
            fail_character=merged.get("fail_character", "✗"),
            fail_style=merged.get("fail_style", "red"),
            console=console,
            # Previously dropped: a ``file=...`` override never reached __init__.
            file=merged.get("file"),
        )

    def start(self) -> None:
        """Start the spinner animation in a background thread.

        Does nothing if the spinner is already running.
        """
        if self._running:
            return
        self._running = True
        self._hide_cursor()
        self._thread = threading.Thread(target=self._animate, daemon=True)
        self._thread.start()

    def stop(self, *, success: bool = True, final_message: str | None = None) -> None:
        """Stop the spinner animation.

        Does nothing if the spinner is not running.

        Args:
            success: If True, show done character; if False, show fail character.
            final_message: Optional message to display after stopping.
        """
        if not self._running:
            return
        self._running = False
        if self._thread is not None:
            # Bounded wait so a slow frame cannot block the caller forever.
            self._thread.join(timeout=1.0)
            self._thread = None
        self._clear_line()
        self._show_cursor()
        self._render_final(success=success, final_message=final_message)

    def update(self, message: str) -> None:
        """Update the spinner message while running.

        Args:
            message: New message to display.
        """
        with self._lock:
            self._message = message

    def log(self, message: str, style: str | None = None) -> None:
        """Print a message above the spinner without disrupting animation.

        Use this to display logs, progress info, or any output while the
        spinner continues running on the bottom line.

        Args:
            message: Text to print above the spinner.
            style: Optional Rich style for the message.
        """
        with self._lock:
            # Clear spinner line
            self._file.write("\r\033[K")
            self._file.flush()
            # Print the log message
            if style:
                self._console.print(f"[{style}]{message}[/{style}]")
            else:
                self._console.print(message)
            # Spinner will redraw on next frame

    @property
    def message(self) -> str:
        """Current spinner message."""
        with self._lock:
            return self._message

    def __enter__(self) -> Self:
        """Start spinner when entering context."""
        self.start()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> None:
        """Stop spinner when exiting context.

        The final marker reflects failure when the block raised.
        """
        success = exc_type is None
        self.stop(success=success)

    # ------------------------------------------------------------------
    # Animation loop
    # ------------------------------------------------------------------

    def _animate(self) -> None:
        """Run the main animation loop in the background thread."""
        while self._running:
            self._render_frame()
            time.sleep(self._interval)

    def _render_frame(self) -> None:
        """Render a single animation frame."""
        self._move_to_line_start()
        if self._animation_type == SpinnerAnimationType.SPIN:
            self._render_spin_frame()
        elif self._animation_type == SpinnerAnimationType.BOUNCE:
            self._render_bounce_frame()
        elif self._animation_type == SpinnerAnimationType.COLOR_WAVE:
            self._render_color_wave_frame()
        # The frame only overwrites as many columns as it is wide. Erase the
        # rest of the line so a message shortened via update() does not leave
        # the tail of the previous, longer frame on screen.
        self._file.write("\033[K")
        self._file.flush()

    def _render_spin_frame(self) -> None:
        """Render classic spinner animation frame."""
        frames = self._style.value
        frame_char = frames[self._frame_index % len(frames)]
        self._frame_index += 1

        spinner_text = Text(frame_char, style=self._spinner_style or "")
        with self._lock:
            message = self._message
        message_text = self._styled_message(message)

        if self._position == SpinnerPosition.BEFORE:
            output = Text.assemble(spinner_text, " ", message_text)
        else:
            output = Text.assemble(message_text, " ", spinner_text)
        self._console.print(output, end="")

    def _render_bounce_frame(self) -> None:
        """Render bouncing bar animation frame."""
        # Build the bar: [=         ] with = bouncing
        bar_inner = [" "] * BOUNCE_WIDTH
        bar_inner[self._bounce_position] = BOUNCE_CHAR

        # Update bounce position, reversing at either end of the bar.
        self._bounce_position += self._bounce_direction
        if self._bounce_position >= BOUNCE_WIDTH - 1:
            self._bounce_direction = -1
        elif self._bounce_position <= 0:
            self._bounce_direction = 1

        bar = BOUNCE_BRACKET_LEFT + "".join(bar_inner) + BOUNCE_BRACKET_RIGHT
        bar_text = Text(bar, style=self._spinner_style or "")
        with self._lock:
            message = self._message
        message_text = self._styled_message(message)

        if self._position == SpinnerPosition.BEFORE:
            output = Text.assemble(bar_text, " ", message_text)
        else:
            output = Text.assemble(message_text, " ", bar_text)
        self._console.print(output, end="")

    def _render_color_wave_frame(self) -> None:
        """Render color wave animation through text."""
        with self._lock:
            message = self._message
        if not message:
            return

        output = Text()
        for i, char in enumerate(message):
            color_index = (i + self._color_offset) % len(COLOR_WAVE_COLORS)
            output.append(char, style=COLOR_WAVE_COLORS[color_index])
        self._color_offset += 1
        self._console.print(output, end="")

    def _render_final(self, *, success: bool, final_message: str | None) -> None:
        """Render final state after stopping."""
        if self._animation_type == SpinnerAnimationType.COLOR_WAVE:
            # For color wave, just print the message normally
            with self._lock:
                message = final_message or self._message
            if message:
                style = self._done_style if success else self._fail_style
                self._console.print(Text(message, style=style or ""))
            return

        char = self._done_character if success else self._fail_character
        char_style = self._done_style if success else self._fail_style
        final_char = Text(char, style=char_style or "")
        with self._lock:
            message = final_message or self._message
        message_text = self._styled_message(message)

        if self._position == SpinnerPosition.BEFORE:
            output = Text.assemble(final_char, " ", message_text)
        else:
            output = Text.assemble(message_text, " ", final_char)
        self._console.print(output)

    def _styled_message(self, message: str) -> Text:
        """Create a styled Text object for the message."""
        if self._text_style:
            return Text(message, style=self._text_style)
        return Text(message)

    def _move_to_line_start(self) -> None:
        """Move cursor to beginning of line without clearing."""
        self._file.write("\r")
        self._file.flush()

    def _clear_line(self) -> None:
        """Clear the current console line."""
        # Write ANSI sequences directly to file descriptor (Rich doesn't pass them through)
        self._file.write("\r\033[K")
        self._file.flush()

    def _hide_cursor(self) -> None:
        """Hide terminal cursor to reduce flickering."""
        self._file.write("\033[?25l")
        self._file.flush()

    def _show_cursor(self) -> None:
        """Show terminal cursor."""
        self._file.write("\033[?25h")
        self._file.flush()

    # ------------------------------------------------------------------
    # Resolution helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_style(style: SpinnerStyle | str) -> SpinnerStyle:
        """Convert string to SpinnerStyle enum if needed.

        Raises:
            SpinnerError: If the (case-insensitive) name is not a member.
        """
        if isinstance(style, SpinnerStyle):
            return style
        try:
            return SpinnerStyle[style.upper()]
        except KeyError as exc:
            available = ", ".join(s.name for s in SpinnerStyle)
            raise SpinnerError(f"Unknown spinner style '{style}'. Available: {available}") from exc

    @staticmethod
    def _resolve_position(position: SpinnerPosition | str) -> SpinnerPosition:
        """Convert string to SpinnerPosition enum if needed.

        Raises:
            SpinnerError: If the (case-insensitive) value is not a member.
        """
        if isinstance(position, SpinnerPosition):
            return position
        try:
            return SpinnerPosition(position.lower())
        except ValueError as exc:
            raise SpinnerError(f"Invalid position '{position}'. Use 'before' or 'after'.") from exc

    @staticmethod
    def _resolve_animation_type(animation_type: SpinnerAnimationType | str) -> SpinnerAnimationType:
        """Convert string to SpinnerAnimationType enum if needed.

        Raises:
            SpinnerError: If the (case-insensitive) value is not a member.
        """
        if isinstance(animation_type, SpinnerAnimationType):
            return animation_type
        try:
            return SpinnerAnimationType(animation_type.lower())
        except ValueError as exc:
            available = ", ".join(t.value for t in SpinnerAnimationType)
            raise SpinnerError(f"Invalid animation type '{animation_type}'. Available: {available}") from exc
# ==============================================================================
# Decorator for capturing prints
# ==============================================================================


class _PrintCapture(io.StringIO):
    """File-like object that forwards written text to ``spinner.log()``.

    Text is buffered by line. ``print("a", "b")`` calls ``write`` once per
    argument, separator and line terminator, so forwarding every ``write``
    on its own would split one printed line into several log entries.
    Complete lines are forwarded as soon as their newline arrives; empty
    lines are dropped. Text not yet terminated by a newline is forwarded by
    ``flush()`` or ``close()``.

    Args:
        spinner: Object whose ``log(message, style=...)`` receives each line.
        style: Style passed through to ``log`` (None for no style).
    """

    def __init__(
        self,
        spinner: Spinner | SpinnerWithLogZone,
        style: str | None = None,
    ) -> None:
        super().__init__()
        self._spinner = spinner
        self._style = style
        self._pending = ""  # text received since the last newline

    def write(self, text: str) -> int:
        """Buffer ``text`` and send every completed line to spinner.log().

        Returns:
            ``len(text)``, as a text stream's ``write`` is expected to.
        """
        # Everything before the last "\n" is complete; the tail is kept.
        *complete, self._pending = (self._pending + text).split("\n")
        for line in complete:
            if line:  # skip blank lines
                self._spinner.log(line, style=self._style)
        return len(text)

    def flush(self) -> None:
        """Forward any unterminated text, then flush the underlying buffer."""
        if self._pending:
            pending, self._pending = self._pending, ""
            self._spinner.log(pending, style=self._style)
        super().flush()

    def close(self) -> None:
        """Forward any unterminated text before closing the stream."""
        if not self.closed:
            self.flush()
        super().close()
def with_spinner(
    message: str = "Processing...",
    *,
    style: SpinnerStyle | str = SpinnerStyle.BRAILLE,
    log_style: str | None = "dim",
    capture_prints: bool = True,
    log_zone_height: int | None = None,
    **spinner_kwargs: Any,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """Decorate a function so it runs under a spinner.

    A new spinner is created for every call of the decorated function and
    is stopped when the call returns or raises. While the call runs,
    ``sys.stdout`` can be swapped for a stream that forwards output to the
    spinner's ``log()``; the previous ``sys.stdout`` is always restored.

    Args:
        message: Spinner message to display.
        style: Spinner animation style.
        log_style: Style for captured print output (None for no style).
        capture_prints: If True, redirect stdout to spinner.log().
        log_zone_height: If set, use SpinnerWithLogZone with fixed height.
            The spinner stays at top, logs scroll in bounded zone below.
        **spinner_kwargs: Additional arguments passed to the spinner
            constructor.

    Returns:
        Decorated function.

    Examples:
        Basic decorator usage (terminal I/O):

        >>> @with_spinner("Loading data...")  # doctest: +SKIP
        ... def load_data():
        ...     return {"data": [1, 2, 3]}
        >>> result = load_data()  # doctest: +SKIP

        With log capture (prints appear above spinner):

        >>> @with_spinner("Processing...", log_style="cyan")  # doctest: +SKIP
        ... def process():
        ...     print("Step 1 complete")  # Appears above spinner
        ...     print("Step 2 complete")
        ...     return True

        Fixed log zone with bounded scrolling:

        >>> @with_spinner("Building...", log_zone_height=5)  # doctest: +SKIP
        ... def build():
        ...     for i in range(10):
        ...         print(f"Step {i}")  # Scrolls in 5-line zone
        ...     return True
    """

    def make_spinner() -> Spinner | SpinnerWithLogZone:
        # A log zone height selects the fixed-zone variant.
        if log_zone_height is None:
            return Spinner(message, style=style, **spinner_kwargs)
        return SpinnerWithLogZone(
            message,
            style=style,
            log_zone_height=log_zone_height,
            **spinner_kwargs,
        )

    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            with make_spinner() as active:
                if not capture_prints:
                    return func(*args, **kwargs)

                saved_stdout = sys.stdout
                sys.stdout = _PrintCapture(active, style=log_style)
                try:
                    return func(*args, **kwargs)
                finally:
                    # Restore even when func raises.
                    sys.stdout = saved_stdout

        return wrapper

    return decorator
# ============================================================================== # Spinner with fixed log zone # ==============================================================================
class SpinnerWithLogZone:
    """Spinner with a fixed position and a scrollable log zone.

    The spinner stays fixed at the top while logs scroll in a zone below.
    When the zone is full, old logs are pushed out automatically.

    Args:
        message: Spinner message.
        log_zone_height: Number of lines for the log zone (default 10).
        style: Spinner animation style.
        spinner_style: Rich style for spinner character.
        console: Optional Rich console.
        file: Output stream (defaults to sys.stderr).
        interval: Seconds between animation frames.

    Examples:
        Create with custom log zone height:

        >>> sz = SpinnerWithLogZone("Building...", log_zone_height=5)
        >>> sz._log_zone_height
        5

        Usage as context manager (terminal I/O):

        >>> with SpinnerWithLogZone("Processing", log_zone_height=3) as sz:  # doctest: +SKIP
        ...     sz.log("Step 1 done")
        ...     sz.log("Step 2 done")
        ...     sz.update("Almost finished...")
    """

    def __init__(
        self,
        message: str = "",
        *,
        log_zone_height: int = 10,
        style: SpinnerStyle | str = SpinnerStyle.BRAILLE,
        spinner_style: str | None = "cyan",
        console: Console | None = None,
        file: IO[str] | None = None,
        interval: float = 0.08,
    ) -> None:
        """Initialize the spinner with a fixed-height log zone.

        Raises:
            SpinnerError: If ``style`` is a string that names no SpinnerStyle.
        """
        self._message = message
        self._log_zone_height = log_zone_height
        self._style = Spinner._resolve_style(style)
        self._spinner_style = spinner_style
        self._interval = interval
        self._file = file or sys.stderr
        self._console = console or Console(file=self._file, force_terminal=True)

        # Bounded deque: appending to a full zone discards the oldest entry.
        self._logs: deque[str] = deque(maxlen=log_zone_height)
        self._running = False
        self._thread: threading.Thread | None = None
        self._lock = threading.Lock()  # guards _message, _logs and the flags below
        self._frame_index = 0
        self._initialized = False
        self._logs_dirty = False  # Track if logs need redraw
        self._last_message = ""  # Track message changes

    def start(self) -> None:
        """Start the spinner animation.

        Does nothing if the spinner is already running.
        """
        if self._running:
            return
        self._running = True
        self._setup_zone()
        self._hide_cursor()
        self._thread = threading.Thread(target=self._animate, daemon=True)
        self._thread.start()

    def stop(self, *, success: bool = True, final_message: str | None = None) -> None:
        """Stop the spinner and clean up the display.

        Does nothing if the spinner is not running.

        Args:
            success: If True, show a check mark; if False, show a cross.
            final_message: Optional message to display instead of the
                current spinner message.
        """
        if not self._running:
            return
        self._running = False
        if self._thread is not None:
            self._thread.join(timeout=1.0)
            self._thread = None
        self._show_cursor()
        self._render_final(success, final_message)

    def update(self, message: str) -> None:
        """Update the spinner message."""
        with self._lock:
            self._message = message

    def log(self, message: str, style: str | None = None) -> None:
        """Add a log entry to the scrolling zone.

        The entry is drawn by the animation thread on its next frame.

        Args:
            message: Text of the log entry.
            style: Optional Rich style wrapped around the entry as markup.
        """
        with self._lock:
            if style:
                self._logs.append(f"[{style}]{message}[/{style}]")
            else:
                self._logs.append(message)
            self._logs_dirty = True

    def __enter__(self) -> Self:
        """Start spinner when entering context."""
        self.start()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> None:
        """Stop spinner when exiting context."""
        self.stop(success=exc_type is None)

    def _setup_zone(self) -> None:
        """Reserve space for the log zone by printing newlines."""
        if self._initialized:
            return
        # Print empty lines to reserve space
        # +1 for spinner line at top
        self._file.write("\n" * (self._log_zone_height + 1))
        self._file.flush()
        self._initialized = True

    def _animate(self) -> None:
        """Animation loop."""
        while self._running:
            self._render_frame()
            time.sleep(self._interval)

    def _render_frame(self) -> None:
        """Render spinner and log zone (optimized: only redraw what changed)."""
        frames = self._style.value
        frame_char = frames[self._frame_index % len(frames)]
        self._frame_index += 1

        # Snapshot shared state once so drawing happens outside the lock.
        with self._lock:
            message = self._message
            logs_dirty = self._logs_dirty
            logs = list(self._logs) if logs_dirty else []
            message_changed = message != self._last_message
            self._logs_dirty = False
            self._last_message = message

        # Move cursor to spinner line (top of zone)
        total_lines = self._log_zone_height + 1
        self._file.write(f"\033[{total_lines}A")  # Move up

        # Always render spinner line (just overwrite, no clear needed for spinner char)
        self._file.write("\r")
        spinner_text = Text(frame_char, style=self._spinner_style or "")
        msg_text = Text(f" {message}")
        self._console.print(Text.assemble(spinner_text, msg_text), end="")

        # Erase leftovers if the message got shorter
        if message_changed:
            self._file.write("\033[K")  # Clear rest of line

        # Only redraw logs if they changed
        if logs_dirty:
            self._file.write("\n")  # Move to first log line
            for i in range(self._log_zone_height):
                self._file.write("\033[K")  # Clear line
                if i < len(logs):
                    self._console.print(f"  {logs[i]}", end="")
                if i < self._log_zone_height - 1:
                    self._file.write("\n")
            self._file.write("\n")  # Final newline to position cursor at bottom
        else:
            # Just move cursor back to bottom without redrawing logs
            self._file.write(f"\033[{self._log_zone_height}B")  # Move down
            self._file.write("\n")
        self._file.flush()

    def _render_final(self, success: bool, final_message: str | None) -> None:
        """Render final state.

        Replaces the spinner line with a check mark or cross followed by the
        message, then redraws every row of the log zone.
        """
        char = "✓" if success else "✗"
        char_style = "green" if success else "red"
        with self._lock:
            message = final_message or self._message
            logs = list(self._logs)

        # Move to top of zone
        total_lines = self._log_zone_height + 1
        self._file.write(f"\033[{total_lines}A")

        # Final spinner line. The message is rendered as literal Text, the
        # same way the animation frames render it; printing it inside a
        # markup string would reinterpret any "[...]" it contains.
        self._file.write("\r\033[K")
        self._console.print(Text.assemble((char, char_style), f" {message}"))

        # Render remaining logs
        for i in range(self._log_zone_height):
            self._file.write("\033[K")
            if i < len(logs):
                self._console.print(f"  {logs[i]}", end="")
            self._file.write("\n")
        self._file.flush()

    def _hide_cursor(self) -> None:
        """Hide cursor."""
        self._file.write("\033[?25l")
        self._file.flush()

    def _show_cursor(self) -> None:
        """Show cursor."""
        self._file.write("\033[?25h")
        self._file.flush()