Source code for kstlib.ops.container

"""Container session runner for production environments.

This module provides the ContainerRunner class for managing Podman/Docker
containers, enabling persistent processes with pseudo-terminal support.

Features:
- Create named containers with custom images and volumes
- Attach to running containers (replaces current process)
- Retrieve container logs with ANSI codes preserved
- Support for both Podman and Docker runtimes
- Automatic log volume mounting for post-mortem analysis

Example:
    >>> from kstlib.ops import SessionConfig, BackendType
    >>> from kstlib.ops.container import ContainerRunner
    >>> runner = ContainerRunner(runtime="podman")  # doctest: +SKIP
    >>> config = SessionConfig(  # doctest: +SKIP
    ...     name="bot",
    ...     backend=BackendType.CONTAINER,
    ...     image="bot:latest",
    ...     volumes=["./data:/app/data"],
    ... )
    >>> status = runner.start(config)  # doctest: +SKIP
    >>> runner.attach("bot")  # Replaces process  # doctest: +SKIP

"""

from __future__ import annotations

import json
import logging
import shlex
import shutil
import subprocess
from typing import TYPE_CHECKING, Any

from kstlib.ops.exceptions import (
    ContainerRuntimeNotFoundError,
    SessionAttachError,
    SessionExistsError,
    SessionNotFoundError,
    SessionStartError,
    SessionStopError,
)
from kstlib.ops.models import BackendType, SessionConfig, SessionState, SessionStatus
from kstlib.ops.validators import validate_command

if TYPE_CHECKING:
    from collections.abc import Sequence

logger = logging.getLogger(__name__)


[docs] class ContainerRunner: """Container runner for Podman/Docker containers. Manages containers for running persistent processes with pseudo-terminal support for TUI applications. Args: runtime: Container runtime to use ("podman" or "docker"). Attributes: runtime: The container runtime name. binary: The validated runtime binary path. Examples: >>> runner = ContainerRunner() # Uses podman by default # doctest: +SKIP >>> runner = ContainerRunner(runtime="docker") # doctest: +SKIP >>> config = SessionConfig( ... name="app", ... backend=BackendType.CONTAINER, ... image="python:3.10-slim", ... ) >>> status = runner.start(config) # doctest: +SKIP """
[docs] def __init__(self, runtime: str | None = None) -> None: """Initialize ContainerRunner. Args: runtime: Container runtime ("podman", "docker", or None for auto-detect). Auto-detection tries podman first, then docker. """ if runtime is None: # Auto-detect: try podman first, then docker if shutil.which("podman"): self._runtime = "podman" elif shutil.which("docker"): self._runtime = "docker" else: self._runtime = "podman" # Will fail with clear error message else: self._runtime = runtime self._binary_path: str | None = None
@property def runtime(self) -> str: """Return the configured runtime name.""" return self._runtime @property def binary(self) -> str: """Return validated container runtime binary path. Raises: ContainerRuntimeNotFoundError: If runtime is not installed. """ if self._binary_path is None: path = shutil.which(self._runtime) if path is None: raise ContainerRuntimeNotFoundError( f"Container runtime '{self._runtime}' not found in PATH. " f"Install {self._runtime}: https://{'podman.io' if self._runtime == 'podman' else 'docker.com'}" ) self._binary_path = path return self._binary_path def _run( self, args: Sequence[str], *, check: bool = False, ) -> subprocess.CompletedProcess[str]: """Run a container command. Args: args: Command arguments (without binary name). check: Whether to raise on non-zero exit. Returns: CompletedProcess with stdout/stderr. Raises: ContainerRuntimeNotFoundError: If runtime is not installed. """ cmd = [self.binary, *args] logger.debug("Running: %s", " ".join(cmd)) return subprocess.run( cmd, capture_output=True, text=True, check=check, ) def _inspect(self, name: str) -> dict[str, Any] | None: """Inspect a container and return its metadata. Args: name: Container name. Returns: Container metadata dict or None if not found. """ result = self._run(["inspect", name, "--format", "json"]) if result.returncode != 0: return None try: data = json.loads(result.stdout) if isinstance(data, list) and len(data) > 0: return data[0] # type: ignore[no-any-return] return None except json.JSONDecodeError: return None def _restart_stopped(self, name: str) -> SessionStatus: """Restart a stopped container. Args: name: Container name to restart. Returns: SessionStatus after restart. Raises: SessionStartError: If restart failed. """ logger.info("Restarting stopped container: %s", name) result = self._run(["start", name]) if result.returncode != 0: raise SessionStartError( name, "container", f"Failed to restart: {result.stderr}", ) return self.status(name) def _build_run_args(self, config: SessionConfig) -> list[str]: """Build command arguments for container run. Args: config: Session configuration. Returns: List of command arguments. """ args = [ "run", "-d", # Detached "--name", config.name, "-it", # Interactive with pseudo-terminal (for TUI support) ] # Working directory if config.working_dir: args.extend(["-w", config.working_dir]) # Environment variables for key, value in config.env.items(): args.extend(["-e", f"{key}={value}"]) # Volumes for volume in config.volumes: args.extend(["-v", volume]) # Log volume (auto-mount for post-mortem analysis) if config.log_volume: args.extend(["-v", config.log_volume]) # Port mappings for port in config.ports: args.extend(["-p", port]) # Image args.append(config.image) # type: ignore[arg-type] # Command (optional) if config.command: args.extend(shlex.split(config.command)) return args
[docs] def start(self, config: SessionConfig) -> SessionStatus: """Create and start a new container. Args: config: Session configuration with image and options. Returns: SessionStatus with state information. Raises: SessionExistsError: If container already exists and is running. SessionStartError: If container failed to start. ContainerRuntimeNotFoundError: If runtime is not installed. """ # Check if container already exists if self.exists(config.name): info = self._inspect(config.name) if info and info.get("State", {}).get("Running", False): raise SessionExistsError(config.name, "container") # Container exists but stopped - restart it return self._restart_stopped(config.name) if not config.image: raise SessionStartError( config.name, "container", "Container image is required", ) args = self._build_run_args(config) result = self._run(args) if result.returncode != 0: raise SessionStartError( config.name, "container", result.stderr.strip() or "Unknown error", ) logger.info("Started container: %s", config.name) return self.status(config.name)
[docs] def stop( self, name: str, *, graceful: bool = True, timeout: int = 10, ) -> bool: """Stop a running container. Args: name: Container name to stop. graceful: If True, use stop with timeout. If False, use kill. timeout: Seconds to wait for graceful shutdown. Returns: True if stopped, False if not running. Raises: SessionNotFoundError: If container doesn't exist. SessionStopError: If container couldn't be stopped. """ if not self.exists(name): raise SessionNotFoundError(name, "container") cmd = ["stop", "-t", str(timeout), name] if graceful else ["kill", name] result = self._run(cmd) if result.returncode != 0: # Check if container already stopped info = self._inspect(name) if info and not info.get("State", {}).get("Running", False): logger.info("Container already stopped: %s", name) return True raise SessionStopError( name, "container", result.stderr.strip() or "Unknown error", ) # Remove the container after stopping self._run(["rm", name]) logger.info("Stopped container: %s", name) return True
[docs] def attach(self, name: str) -> None: """Attach to a running container. This method replaces the current process with container attach. It does not return on success. Args: name: Container name to attach to. Raises: SessionNotFoundError: If container doesn't exist. SessionAttachError: If attachment failed. Note: Use Ctrl+P Ctrl+Q to detach from the container. """ if not self.exists(name): raise SessionNotFoundError(name, "container") # Check if running info = self._inspect(name) if info and not info.get("State", {}).get("Running", False): raise SessionAttachError( name, "container", "Container is not running", ) binary = self.binary logger.info("Attaching to container: %s", name) # Attach to container (interactive) # On Windows, os.execvp doesn't work well with paths containing spaces # Use subprocess.run which handles this correctly try: result = subprocess.run( [binary, "attach", name], check=False, ) # Docker returns exit code 1 when detaching with Ctrl+P Ctrl+Q # This is normal behavior, not an error if result.returncode not in (0, 1): raise SessionAttachError( name, "container", f"Attach exited with code {result.returncode}", ) except OSError as e: raise SessionAttachError(name, "container", str(e)) from e
[docs] def status(self, name: str) -> SessionStatus: """Get status of a container. Args: name: Container name to query. Returns: SessionStatus with current state. Raises: SessionNotFoundError: If container doesn't exist. """ info = self._inspect(name) if info is None: raise SessionNotFoundError(name, "container") state_info = info.get("State", {}) running = state_info.get("Running", False) exited = state_info.get("Status") == "exited" if running: state = SessionState.RUNNING elif exited: state = SessionState.EXITED else: state = SessionState.STOPPED # Get PID pid = state_info.get("Pid") if pid == 0: pid = None # Get image name image = info.get("Config", {}).get("Image") or info.get("Image", "") # Get created timestamp created = info.get("Created", "") # Get exit code if exited exit_code = None if exited: exit_code = state_info.get("ExitCode") return SessionStatus( name=name, state=state, backend=BackendType.CONTAINER, pid=pid, created_at=created, image=image, exit_code=exit_code, )
[docs] def logs(self, name: str, lines: int = 100) -> str: """Retrieve recent logs from a container. Args: name: Container name to get logs from. lines: Number of lines to retrieve. Returns: String with log output (ANSI codes preserved). Raises: SessionNotFoundError: If container doesn't exist. """ if not self.exists(name): raise SessionNotFoundError(name, "container") result = self._run(["logs", "--tail", str(lines), name]) # Combine stdout and stderr (container logs may go to either) return result.stdout + result.stderr
[docs] def exists(self, name: str) -> bool: """Check if a container with the given name exists. Args: name: Container name to check. Returns: True if container exists, False otherwise. """ return self._inspect(name) is not None
@staticmethod def _parse_container_json(raw: str) -> list[dict[str, Any]]: """Parse container JSON output into a list of dicts. Podman may return a single JSON array or one JSON object per line depending on version. Args: raw: Raw stdout from ``ps --format json``. Returns: List of parsed container dicts. """ try: parsed = json.loads(raw) except json.JSONDecodeError: parsed = None if isinstance(parsed, list): return [item for item in parsed if isinstance(item, dict)] items: list[dict[str, Any]] = [] for line in raw.split("\n"): if not line: continue try: obj = json.loads(line) if isinstance(obj, dict): items.append(obj) except json.JSONDecodeError: continue return items @staticmethod def _container_to_status(data: dict[str, Any]) -> SessionStatus | None: """Convert a single container dict to a SessionStatus. Args: data: Parsed container JSON dict. Returns: SessionStatus or None if the data is unparseable. """ try: name = data.get("Names") or data.get("Name", "") if isinstance(name, list): name = name[0] if name else "" state_str = data.get("State", "").lower() if state_str == "running": state = SessionState.RUNNING elif state_str in ("exited", "stopped"): state = SessionState.EXITED else: state = SessionState.UNKNOWN return SessionStatus( name=name, state=state, backend=BackendType.CONTAINER, image=data.get("Image", ""), created_at=data.get("CreatedAt", ""), ) except (KeyError, TypeError, AttributeError): return None
[docs] def list_sessions(self) -> list[SessionStatus]: """List all containers. Returns: List of SessionStatus for all containers. """ result = self._run(["ps", "-a", "--format", "json"]) if result.returncode != 0: return [] raw = result.stdout.strip() if not raw: return [] items = self._parse_container_json(raw) sessions: list[SessionStatus] = [] for data in items: status = self._container_to_status(data) if status is not None: sessions.append(status) return sessions
[docs] def exec( self, name: str, command: str, *, interactive: bool = False, ) -> subprocess.CompletedProcess[str]: """Execute a command in a running container. Args: name: Container name. command: Command to execute. interactive: If True, use -it flags. Returns: CompletedProcess with stdout/stderr. Raises: SessionNotFoundError: If container doesn't exist. """ validate_command(command) if not self.exists(name): raise SessionNotFoundError(name, "container") args = ["exec"] if interactive: args.append("-it") args.append(name) args.extend(shlex.split(command)) return self._run(args)
__all__ = [ "ContainerRunner", ]