Source code for kstlib.monitoring.config

"""Configuration loader for monitoring dashboards.

This module provides tools to load monitoring configurations from YAML files,
with auto-discovery of ``*.monitor.yml`` files in specified directories.

Examples:
    Load a single monitoring config:

    >>> from kstlib.monitoring.config import load_monitoring_config
    >>> config = load_monitoring_config("dashboard.monitor.yml")  # doctest: +SKIP
    >>> service = config.to_service()  # doctest: +SKIP

    Discover all monitoring configs in a directory:

    >>> from kstlib.monitoring.config import discover_monitoring_configs
    >>> configs = discover_monitoring_configs("./configs")  # doctest: +SKIP
    >>> for name, config in configs.items():  # doctest: +SKIP
    ...     print(f"Found: {name}")  # doctest: +SKIP

"""

from __future__ import annotations

import importlib
import pathlib
import re
from dataclasses import dataclass, field
from typing import Any

import yaml

from kstlib.monitoring.exceptions import MonitoringConfigError
from kstlib.monitoring.service import Collector, MonitoringService

# File pattern for monitoring configs
MONITORING_CONFIG_PATTERN = "*.monitor.yml"
MONITORING_CONFIG_SUFFIX = ".monitor.yml"

# Deep defense: Security limits
MAX_CONFIG_FILE_SIZE = 1024 * 1024  # 1 MB max config file size
MAX_COLLECTORS = 100  # Maximum number of collectors per config
MAX_NAME_LENGTH = 128  # Maximum length for names (config name, collector names)
MAX_TEMPLATE_SIZE = 512 * 1024  # 512 KB max template size

# Module import restrictions for callable collectors
BLOCKED_MODULE_PREFIXES = (
    "os.",
    "sys.",
    "subprocess",
    "shutil",
    "socket",
    "pickle",
    "marshal",
    "__",
)
ALLOWED_MODULE_PATTERN = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*$")


[docs] class MonitoringConfigFileNotFoundError(MonitoringConfigError, FileNotFoundError): """Monitoring configuration file not found."""
[docs] class MonitoringConfigFormatError(MonitoringConfigError, ValueError): """Invalid monitoring configuration format."""
[docs] class MonitoringConfigCollectorError(MonitoringConfigError): """Error loading a collector from configuration."""
[docs] @dataclass class CollectorConfig: """Configuration for a single collector. Attributes: name: Name of the collector (used as template variable). collector_type: Type of collector ("static", "callable", "env"). value: Static value (for type="static"). module: Module path (for type="callable"). function: Function name (for type="callable"). env_var: Environment variable name (for type="env"). default: Default value if env var not set (for type="env"). """ name: str collector_type: str = "static" value: Any = None module: str | None = None function: str | None = None env_var: str | None = None default: Any = None
[docs] def to_collector(self) -> Collector: """Convert config to a collector callable. Returns: Callable that can be used as a MonitoringService collector. Raises: MonitoringConfigCollectorError: If collector cannot be created. """ if self.collector_type == "static": return self._create_static_collector() if self.collector_type == "callable": return self._create_callable_collector() if self.collector_type == "env": return self._create_env_collector() raise MonitoringConfigCollectorError(f"Unknown collector type: {self.collector_type}")
def _create_static_collector(self) -> Collector: """Create a static value collector.""" value = self.value def collector() -> Any: return value return collector def _create_callable_collector(self) -> Collector: """Create a collector from a module.function reference.""" if not self.module or not self.function: raise MonitoringConfigCollectorError( f"Collector '{self.name}' type='callable' requires 'module' and 'function'" ) # Deep defense: Validate module name format if not ALLOWED_MODULE_PATTERN.match(self.module): raise MonitoringConfigCollectorError(f"Invalid module name format: '{self.module}'") # Deep defense: Block dangerous modules for prefix in BLOCKED_MODULE_PREFIXES: if self.module.startswith(prefix) or self.module == prefix.rstrip("."): raise MonitoringConfigCollectorError(f"Module '{self.module}' is blocked for security reasons") try: mod = importlib.import_module(self.module) func: Collector = getattr(mod, self.function) if not callable(func): raise MonitoringConfigCollectorError(f"'{self.module}.{self.function}' is not callable") return func except ImportError as e: raise MonitoringConfigCollectorError(f"Cannot import module '{self.module}': {e}") from e except AttributeError as e: raise MonitoringConfigCollectorError(f"Function '{self.function}' not found in '{self.module}': {e}") from e def _create_env_collector(self) -> Collector: """Create a collector that reads from environment variable.""" import os env_var = self.env_var default = self.default name = self.name if not env_var: raise MonitoringConfigCollectorError(f"Collector '{name}' type='env' requires 'env_var'") def collector() -> Any: return os.environ.get(env_var, default) return collector
[docs] @dataclass class MonitoringConfig: """Parsed monitoring configuration. Attributes: name: Dashboard name (defaults to filename without extension). template: Jinja2 template string for rendering. collectors: List of collector configurations. inline_css: Whether to use inline CSS (default True). fail_fast: Whether to fail on first collector error (default True). source_path: Path to the source config file (if loaded from file). metadata: Additional metadata from the config file. """ name: str template: str collectors: list[CollectorConfig] = field(default_factory=list) inline_css: bool = True fail_fast: bool = True source_path: pathlib.Path | None = None metadata: dict[str, Any] = field(default_factory=dict)
[docs] def to_service(self) -> MonitoringService: """Create a MonitoringService from this configuration. Returns: Configured MonitoringService instance. Raises: MonitoringConfigCollectorError: If any collector cannot be created. """ collectors: dict[str, Collector] = {} for collector_config in self.collectors: collectors[collector_config.name] = collector_config.to_collector() return MonitoringService( template=self.template, collectors=collectors, inline_css=self.inline_css, fail_fast=self.fail_fast, )
@classmethod def _parse_collectors(cls, collectors_data: Any) -> list[CollectorConfig]: """Parse collectors from config data with validation.""" if not isinstance(collectors_data, dict): raise MonitoringConfigFormatError("'collectors' must be a dictionary mapping names to collector configs") # Deep defense: Limit number of collectors if len(collectors_data) > MAX_COLLECTORS: raise MonitoringConfigFormatError(f"Too many collectors ({len(collectors_data)} > {MAX_COLLECTORS})") collectors: list[CollectorConfig] = [] for name, collector_data in collectors_data.items(): # Deep defense: Validate collector name length if len(str(name)) > MAX_NAME_LENGTH: raise MonitoringConfigFormatError( f"Collector name '{name[:20]}...' exceeds maximum length ({MAX_NAME_LENGTH})" ) if not isinstance(collector_data, dict): # Simple static value collectors.append(CollectorConfig(name=name, collector_type="static", value=collector_data)) else: collectors.append( CollectorConfig( name=name, collector_type=collector_data.get("type", "static"), value=collector_data.get("value"), module=collector_data.get("module"), function=collector_data.get("function"), env_var=collector_data.get("env_var"), default=collector_data.get("default"), ) ) return collectors
[docs] @classmethod def from_dict( cls, data: dict[str, Any], *, source_path: pathlib.Path | None = None, ) -> MonitoringConfig: """Create a MonitoringConfig from a dictionary. Args: data: Configuration dictionary. source_path: Optional path to source file. Returns: Parsed MonitoringConfig. Raises: MonitoringConfigFormatError: If required fields are missing. """ # Validate required fields if "template" not in data: raise MonitoringConfigFormatError("Monitoring config must have a 'template' field") # Deep defense: Validate template size template = data["template"] if not isinstance(template, str): raise MonitoringConfigFormatError("'template' must be a string") if len(template) > MAX_TEMPLATE_SIZE: raise MonitoringConfigFormatError(f"Template exceeds maximum size ({MAX_TEMPLATE_SIZE} bytes)") # Deep defense: Validate config name length config_name = data.get("name") if config_name and len(str(config_name)) > MAX_NAME_LENGTH: raise MonitoringConfigFormatError(f"Config name exceeds maximum length ({MAX_NAME_LENGTH})") # Parse collectors collectors = cls._parse_collectors(data.get("collectors", {})) # Determine name name = data.get("name") if not name and source_path: # Use filename without .monitor.yml suffix name = source_path.name name = name.removesuffix(MONITORING_CONFIG_SUFFIX) # Extract known fields for metadata known_fields = { "name", "template", "collectors", "inline_css", "fail_fast", } metadata = {k: v for k, v in data.items() if k not in known_fields} return cls( name=name or "unnamed", template=data["template"], collectors=collectors, inline_css=data.get("inline_css", True), fail_fast=data.get("fail_fast", True), source_path=source_path, metadata=metadata, )
[docs] def load_monitoring_config( path: str | pathlib.Path, *, encoding: str = "utf-8", ) -> MonitoringConfig: """Load a monitoring configuration from a YAML file. Args: path: Path to the monitoring config file. encoding: File encoding (default UTF-8). Returns: Parsed MonitoringConfig. Raises: MonitoringConfigFileNotFoundError: If file does not exist. MonitoringConfigFormatError: If file format is invalid. Examples: >>> config = load_monitoring_config("dashboard.monitor.yml") # doctest: +SKIP >>> service = config.to_service() # doctest: +SKIP >>> result = service.run_sync() # doctest: +SKIP """ path = pathlib.Path(path) if not path.is_file(): raise MonitoringConfigFileNotFoundError(f"Config file not found: {path}") # Deep defense: Check file size before reading file_size = path.stat().st_size if file_size > MAX_CONFIG_FILE_SIZE: raise MonitoringConfigFormatError(f"Config file too large ({file_size} > {MAX_CONFIG_FILE_SIZE} bytes)") try: with path.open("r", encoding=encoding) as f: data = yaml.safe_load(f) except yaml.YAMLError as e: raise MonitoringConfigFormatError(f"Invalid YAML in {path}: {e}") from e if not isinstance(data, dict): raise MonitoringConfigFormatError(f"Monitoring config must be a YAML dictionary, got {type(data).__name__}") return MonitoringConfig.from_dict(data, source_path=path.resolve())
[docs] def discover_monitoring_configs( directory: str | pathlib.Path, *, recursive: bool = False, encoding: str = "utf-8", ) -> dict[str, MonitoringConfig]: """Discover and load all monitoring configs in a directory. Searches for files matching ``*.monitor.yml`` pattern. Args: directory: Directory to search. recursive: If True, search subdirectories recursively. encoding: File encoding (default UTF-8). Returns: Dictionary mapping config names to MonitoringConfig objects. Raises: FileNotFoundError: If directory does not exist. MonitoringConfigFormatError: If any config file is invalid. Examples: >>> configs = discover_monitoring_configs("./monitoring") # doctest: +SKIP >>> for name, config in configs.items(): # doctest: +SKIP ... print(f"Loaded: {name}") # doctest: +SKIP """ directory = pathlib.Path(directory) if not directory.is_dir(): raise FileNotFoundError(f"Directory not found: {directory}") pattern = f"**/{MONITORING_CONFIG_PATTERN}" if recursive else MONITORING_CONFIG_PATTERN configs: dict[str, MonitoringConfig] = {} for config_path in directory.glob(pattern): config = load_monitoring_config(config_path, encoding=encoding) configs[config.name] = config return configs
[docs] def create_services_from_directory( directory: str | pathlib.Path, *, recursive: bool = False, encoding: str = "utf-8", ) -> dict[str, MonitoringService]: """Discover configs and create MonitoringService instances. Convenience function that combines discover_monitoring_configs with to_service() for each config. Args: directory: Directory to search for ``*.monitor.yml`` files. recursive: If True, search subdirectories recursively. encoding: File encoding (default UTF-8). Returns: Dictionary mapping config names to MonitoringService instances. Examples: >>> services = create_services_from_directory("./monitoring") # doctest: +SKIP >>> for name, service in services.items(): # doctest: +SKIP ... result = service.run_sync() # doctest: +SKIP ... print(f"{name}: {result.success}") # doctest: +SKIP """ configs = discover_monitoring_configs(directory, recursive=recursive, encoding=encoding) return {name: config.to_service() for name, config in configs.items()}
__all__ = [ "CollectorConfig", "MonitoringConfig", "MonitoringConfigCollectorError", "MonitoringConfigFileNotFoundError", "MonitoringConfigFormatError", "create_services_from_directory", "discover_monitoring_configs", "load_monitoring_config", ]