"""Session manager facade for unified backend access.
This module provides the SessionManager class, a config-driven facade
that abstracts the underlying backend (tmux or container) and provides
a unified interface for session management.
Features:
- Automatic backend selection based on configuration
- Config-driven session creation from kstlib.conf.yml
- Unified API for start, stop, attach, status, and logs
- Support for both tmux and container backends
Example:
>>> from kstlib.ops import SessionManager
>>> # Local dev with tmux
>>> session = SessionManager("dev", backend="tmux")
>>> session.start("python -m app") # doctest: +SKIP
>>> session.attach() # doctest: +SKIP
>>> # From config file
>>> session = SessionManager.from_config("astro") # doctest: +SKIP
>>> session.start() # doctest: +SKIP
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from kstlib.ops.container import ContainerRunner
from kstlib.ops.exceptions import (
ContainerRuntimeNotFoundError,
OpsError,
SessionAmbiguousError,
TmuxNotFoundError,
)
from kstlib.ops.models import BackendType, SessionConfig, SessionState, SessionStatus
from kstlib.ops.tmux import TmuxRunner
from kstlib.ops.validators import validate_session_name
if TYPE_CHECKING:
from kstlib.ops.base import AbstractRunner
logger = logging.getLogger(__name__)
[docs]
def auto_detect_backend(
name: str,
*,
socket_name: str | None = None,
) -> BackendType | None:
"""Auto-detect which backend a session exists in.
Checks both tmux and container backends to find where a session with
the given name exists. Skips backends that are not available (binary
not found).
Args:
name: Session name to search for.
socket_name: Custom tmux socket name to check.
Returns:
BackendType if found in exactly one backend, None if not found.
Raises:
SessionAmbiguousError: If session exists in multiple backends.
Examples:
>>> # Session exists in tmux only
>>> backend = auto_detect_backend("mybot") # doctest: +SKIP
>>> backend == BackendType.TMUX # doctest: +SKIP
True
>>> # Session not found
>>> auto_detect_backend("nonexistent") is None # doctest: +SKIP
True
"""
found_in: list[str] = []
# Check tmux backend (skip if not installed)
try:
tmux_runner = TmuxRunner(socket_name=socket_name)
if tmux_runner.exists(name):
found_in.append("tmux")
except TmuxNotFoundError:
logger.debug("tmux not found, skipping tmux backend check")
# Check container backend (skip if not installed)
try:
container_runner = ContainerRunner()
if container_runner.exists(name):
found_in.append("container")
except ContainerRuntimeNotFoundError:
logger.debug("container runtime not found, skipping container backend check")
# Return based on findings
if len(found_in) == 0:
return None
if len(found_in) == 1:
return BackendType(found_in[0])
# Found in multiple backends
raise SessionAmbiguousError(name, found_in)
[docs]
class SessionConfigError(OpsError):
"""Configuration error for session management.
Raised when session configuration is invalid or missing required fields.
"""
[docs]
class SessionManager:
"""Config-driven session manager with backend abstraction.
Provides a unified interface for managing sessions across different
backends (tmux, container). The backend can be specified directly
or loaded from kstlib.conf.yml configuration.
Args:
name: Unique session name.
backend: Backend type ("tmux" or "container").
**kwargs: Backend-specific options (image, volumes, ports, etc.).
Attributes:
name: The session name.
backend: The backend type being used.
config: The full session configuration.
Examples:
>>> # Direct instantiation with tmux
>>> session = SessionManager("dev", backend="tmux")
>>> session.start("python app.py") # doctest: +SKIP
>>> session.attach() # doctest: +SKIP
>>> # Direct instantiation with container
>>> session = SessionManager(
... "prod",
... backend="container",
... image="app:latest",
... volumes=["./data:/app/data"],
... )
>>> session.start() # doctest: +SKIP
>>> # From config file (recommended)
>>> session = SessionManager.from_config("astro") # doctest: +SKIP
"""
[docs]
def __init__(
self,
name: str,
*,
backend: str | BackendType = BackendType.TMUX,
**kwargs: Any,
) -> None:
"""Initialize SessionManager.
Args:
name: Unique session name.
backend: Backend type ("tmux" or "container").
**kwargs: Backend-specific options.
Raises:
SessionConfigError: If configuration is invalid.
"""
# Validate session name early for better error messages
try:
validate_session_name(name)
except ValueError as e:
raise SessionConfigError(str(e)) from None
self._name = name
# Normalize backend type
if isinstance(backend, BackendType):
self._backend = backend
else:
# Must be a string - convert to BackendType
try:
self._backend = BackendType(backend.lower())
except ValueError:
raise SessionConfigError(f"Invalid backend '{backend}'. Must be 'tmux' or 'container'.") from None
# Build session config (validation happens in SessionConfig.__post_init__)
try:
self._config = SessionConfig(
name=name,
backend=self._backend,
command=kwargs.get("command"),
working_dir=kwargs.get("working_dir"),
env=kwargs.get("env", {}),
image=kwargs.get("image"),
volumes=kwargs.get("volumes", []),
ports=kwargs.get("ports", []),
runtime=kwargs.get("runtime"),
log_volume=kwargs.get("log_volume"),
)
except ValueError as e:
raise SessionConfigError(str(e)) from None
# Initialize the appropriate runner
self._socket_name: str | None = kwargs.get("socket_name")
self._runner: AbstractRunner
if self._backend == BackendType.TMUX:
tmux_binary = kwargs.get("tmux_binary", "tmux")
self._runner = TmuxRunner(
binary=tmux_binary,
socket_name=self._socket_name,
)
else:
runtime = kwargs.get("runtime") # None = auto-detect
self._runner = ContainerRunner(runtime=runtime)
@property
def name(self) -> str:
"""Return the session name."""
return self._name
@property
def backend(self) -> BackendType:
"""Return the backend type."""
return self._backend
@property
def config(self) -> SessionConfig:
"""Return the session configuration."""
return self._config
[docs]
@classmethod
def from_config(
cls,
name: str,
) -> SessionManager:
"""Create SessionManager from kstlib configuration.
Loads session configuration from kstlib.conf.yml under the
ops.sessions.{name} key.
Args:
name: Session name to load from config.
Returns:
SessionManager configured from the config file.
Raises:
SessionConfigError: If session not found in config.
Example:
Config file (kstlib.conf.yml)::
ops:
sessions:
astro:
backend: tmux
command: "python -m astro.bot"
working_dir: "/opt/astro"
Usage::
>>> session = SessionManager.from_config("astro") # doctest: +SKIP
"""
from kstlib.config import get_config
config = get_config()
# Navigate to ops.sessions (Box is dynamically typed)
ops_config: dict[str, Any] = config.get("ops", {}) # type: ignore[no-untyped-call]
sessions_config: dict[str, Any] = ops_config.get("sessions", {})
if name not in sessions_config:
available = list(sessions_config)
raise SessionConfigError(f"Session '{name}' not found in config. Available sessions: {available or 'none'}")
session_data = sessions_config[name]
# Get defaults from ops config
default_backend = ops_config.get("default_backend", "tmux")
tmux_binary = ops_config.get("tmux_binary", "tmux")
container_runtime = ops_config.get("container_runtime") # None = auto-detect
# Build kwargs from session config
backend = session_data.get("backend", default_backend)
kwargs: dict[str, Any] = {
"backend": backend,
"command": session_data.get("command"),
"working_dir": session_data.get("working_dir"),
"env": session_data.get("env", {}),
"image": session_data.get("image"),
"volumes": session_data.get("volumes", []),
"ports": session_data.get("ports", []),
"log_volume": session_data.get("log_volume"),
"tmux_binary": tmux_binary,
"runtime": session_data.get("runtime", container_runtime),
"socket_name": session_data.get("socket_name"),
}
return cls(name, **kwargs)
[docs]
def start(
self,
command: str | None = None,
**kwargs: Any,
) -> SessionStatus:
"""Start the session.
Args:
command: Command to run (overrides config).
**kwargs: Additional options to override config.
Returns:
SessionStatus with current state.
Raises:
SessionExistsError: If session already exists.
SessionStartError: If session failed to start.
"""
# Build effective config with overrides
config_dict = {
"name": self._name,
"backend": self._backend,
"command": command or self._config.command,
"working_dir": kwargs.get("working_dir", self._config.working_dir),
"env": {**self._config.env, **kwargs.get("env", {})},
"image": kwargs.get("image", self._config.image),
"volumes": kwargs.get("volumes", list(self._config.volumes)),
"ports": kwargs.get("ports", list(self._config.ports)),
"runtime": kwargs.get("runtime", self._config.runtime),
"log_volume": kwargs.get("log_volume", self._config.log_volume),
}
effective_config = SessionConfig(**config_dict)
return self._runner.start(effective_config)
[docs]
def stop(
self,
*,
graceful: bool = True,
timeout: int = 10,
) -> bool:
"""Stop the session.
Args:
graceful: If True, attempt graceful shutdown first.
timeout: Seconds to wait for graceful shutdown.
Returns:
True if stopped successfully.
Raises:
SessionNotFoundError: If session does not exist.
SessionStopError: If session could not be stopped.
"""
return self._runner.stop(self._name, graceful=graceful, timeout=timeout)
[docs]
def attach(self) -> None:
"""Attach to the session.
This method replaces the current process. It does not return on success.
Raises:
SessionNotFoundError: If session does not exist.
SessionAttachError: If attachment failed.
"""
self._runner.attach(self._name)
[docs]
def status(self) -> SessionStatus:
"""Get current session status.
Returns:
SessionStatus with current state.
Raises:
SessionNotFoundError: If session does not exist.
"""
return self._runner.status(self._name)
[docs]
def logs(self, lines: int = 100) -> str:
"""Get recent session logs.
Args:
lines: Number of lines to retrieve.
Returns:
Log output as string (ANSI codes preserved).
Raises:
SessionNotFoundError: If session does not exist.
"""
return self._runner.logs(self._name, lines=lines)
[docs]
def exists(self) -> bool:
"""Check if the session exists.
Returns:
True if session exists, False otherwise.
"""
return self._runner.exists(self._name)
[docs]
def is_running(self) -> bool:
"""Check if the session is currently running.
Returns:
True if running, False otherwise.
"""
if not self.exists():
return False
try:
status = self.status()
return status.state == SessionState.RUNNING
except Exception:
return False
__all__ = [
"SessionConfigError",
"SessionManager",
"auto_detect_backend",
]