"""Configuration models for kstlib.transform module.
Frozen dataclasses for transform chain configuration, parsed from
the ``transforms:`` section of ``kstlib.conf.yml``.
YAML schema::
transforms:
security:
allowed_callable_modules: # Whitelist (default: empty)
- myproject.viya
chains:
my_chain:
forward: # Required (or preset/patch must be set)
- base64
- zlib:
skip_bytes: 3
- json:
extract: "path.to.field"
- xml
backward: # Optional (auto-reversed if absent)
- xml
- json:
wrap: "path.to.field"
- zlib:
prepend_bytes: "4d1504"
- base64
preset: other_chain # Mutually exclusive with forward
patch: # Mutually exclusive with composed_patch
scope: blob # blob | outer | all (default: blob)
replace: # Mutually exclusive with callable
"old": "new"
callable: mod.path:fn # Mutually exclusive with replace
args:
key: "{{var}}"
# Composed patches: reference other chains and apply their patches
# with optional filters. Mutually exclusive with patch:.
composed_chain:
preset: other_chain
global_patches: # Preset names, applied to all objects
- remap_host
targeted_patches: # Conditional patches (filter + patches)
- filter:
content_type: report
name: "R220_*"
patches:
- remap_caslib_r220
- filter:
name: "*" # Fallback for other objects
patches:
- remap_caslib_global
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Literal
from kstlib.transform.exceptions import TransformConfigError
from kstlib.transform.validators import (
MAX_ARG_KEY_LENGTH,
MAX_CALLABLE_ARGS,
MAX_CHAIN_PRIMITIVES,
MAX_ENCODING_LENGTH,
MAX_GLOBAL_PATCHES,
MAX_MAPPING_ENTRIES,
MAX_MAPPING_STRING_LENGTH,
MAX_NAMED_CHAINS,
MAX_PATCHES_PER_TARGETED,
MAX_PREFIX_LENGTH,
MAX_PRIMITIVE_OPTIONS,
MAX_SKIP_BYTES,
MAX_TARGETED_PATCHES,
ZLIB_LEVEL_MAX,
ZLIB_LEVEL_MIN,
validate_callable_module,
validate_callable_target,
validate_chain_name,
validate_dot_path,
validate_filter_type,
validate_glob_pattern,
validate_hex_string,
validate_primitive_name,
)
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# ============================================================================
# Dataclasses
# ============================================================================
@dataclass(frozen=True, slots=True)
class PrimitiveConfig:
    """Configuration for a single transform primitive.

    Validation runs at construction time: the name is checked with
    ``validate_primitive_name``, the number of options is capped at
    ``MAX_PRIMITIVE_OPTIONS``, and the ``zlib``, ``json``, ``base64`` and
    ``bytes`` primitives additionally get their known options type-checked.
    Option keys that are not explicitly checked are accepted as-is.

    Attributes:
        name: Primitive name (base64, zlib, json, xml, bytes).
        options: Primitive-specific options dict.

    Examples:
        >>> PrimitiveConfig(name="base64")
        PrimitiveConfig(name='base64', options={})
        >>> PrimitiveConfig(name="zlib", options={"skip_bytes": 3})
        PrimitiveConfig(name='zlib', options={'skip_bytes': 3})
    """

    name: str
    options: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Validate primitive configuration.

        Raises:
            TransformConfigError: If configuration is invalid.
        """
        validate_primitive_name(self.name)
        if len(self.options) > MAX_PRIMITIVE_OPTIONS:
            raise TransformConfigError(
                f"Primitive '{self.name}' has too many options: {len(self.options)} > {MAX_PRIMITIVE_OPTIONS}"
            )
        # Dispatch to the per-primitive option checks; other names (e.g. xml)
        # have no option-specific validation here.
        if self.name == "zlib":
            self._validate_zlib_options()
        elif self.name == "json":
            self._validate_json_options()
        elif self.name in ("base64", "bytes"):
            self._validate_encoding_option()

    def _validate_bool_option(self, key: str) -> None:
        """Validate that an option, if present, is a strict bool.

        Helper used by the json and base64 validators for bool flags.

        Args:
            key: Option name to look up in ``options``.

        Raises:
            TransformConfigError: If the option is set and is not a bool.
        """
        value = self.options.get(key)
        if value is not None and not isinstance(value, bool):
            raise TransformConfigError(f"{self.name} {key} must be bool, got: {type(value).__name__}")

    def _validate_string_option_with_max_length(
        self,
        key: str,
        max_length: int,
    ) -> None:
        """Validate that an option, if present, is a string within length.

        Helper used by base64 strip_prefix and prefix validators.

        Args:
            key: Option name to look up in ``options``.
            max_length: Maximum allowed string length (inclusive).

        Raises:
            TransformConfigError: If the option is set and is not a string,
                or is longer than ``max_length``.
        """
        value = self.options.get(key)
        if value is None:
            return
        if not isinstance(value, str):
            raise TransformConfigError(f"{self.name} {key} must be string, got: {type(value).__name__}")
        if len(value) > max_length:
            raise TransformConfigError(f"{self.name} {key} too long: {len(value)} > {max_length}")

    def _validate_zlib_options(self) -> None:
        """Validate zlib-specific options (skip_bytes, prepend_bytes, level).

        Raises:
            TransformConfigError: If any of the options is invalid.
        """
        skip = self.options.get("skip_bytes")
        if skip is not None:
            # Reject bools explicitly: bool is a subclass of int in Python, so
            # ``skip_bytes: true`` would otherwise be accepted and mean 1.
            skip_is_int = isinstance(skip, int) and not isinstance(skip, bool)
            if not skip_is_int or skip < 0 or skip > MAX_SKIP_BYTES:
                raise TransformConfigError(f"zlib skip_bytes must be int 0-{MAX_SKIP_BYTES}, got: {skip!r}")
        prepend = self.options.get("prepend_bytes")
        if prepend is not None:
            if not isinstance(prepend, str):
                raise TransformConfigError(f"zlib prepend_bytes must be hex string, got: {type(prepend).__name__}")
            validate_hex_string(prepend, label="zlib prepend_bytes")
        level = self.options.get("level")
        if level is not None:
            # Reject bools explicitly: bool is a subclass of int in Python.
            if not isinstance(level, int) or isinstance(level, bool):
                raise TransformConfigError(f"zlib level must be int, got: {type(level).__name__}")
            if level < ZLIB_LEVEL_MIN or level > ZLIB_LEVEL_MAX:
                raise TransformConfigError(
                    f"zlib level must be in range [{ZLIB_LEVEL_MIN}, {ZLIB_LEVEL_MAX}], got: {level}"
                )

    def _validate_json_options(self) -> None:
        """Validate json-specific options (extract, wrap, minify, ensure_ascii).

        Raises:
            TransformConfigError: If any of the options is invalid.
        """
        extract = self.options.get("extract")
        if extract is not None:
            if not isinstance(extract, str):
                raise TransformConfigError(f"json extract must be string, got: {type(extract).__name__}")
            validate_dot_path(extract, label="json extract")
        wrap = self.options.get("wrap")
        if wrap is not None:
            if not isinstance(wrap, str):
                raise TransformConfigError(f"json wrap must be string, got: {type(wrap).__name__}")
            validate_dot_path(wrap, label="json wrap")
        self._validate_bool_option("minify")
        self._validate_bool_option("ensure_ascii")

    def _validate_encoding_option(self) -> None:
        """Validate encoding option for base64/bytes primitives.

        For ``base64`` the ``strict``, ``strip_prefix`` and ``prefix``
        options are checked as well.

        Raises:
            TransformConfigError: If any of the options is invalid.
        """
        encoding = self.options.get("encoding")
        if encoding is not None:
            if not isinstance(encoding, str):
                raise TransformConfigError(f"{self.name} encoding must be string, got: {type(encoding).__name__}")
            if len(encoding) > MAX_ENCODING_LENGTH:
                raise TransformConfigError(f"{self.name} encoding too long: {len(encoding)} > {MAX_ENCODING_LENGTH}")
        # base64-specific options (do not apply to bytes primitive)
        if self.name == "base64":
            self._validate_bool_option("strict")
            self._validate_string_option_with_max_length("strip_prefix", MAX_PREFIX_LENGTH)
            self._validate_string_option_with_max_length("prefix", MAX_PREFIX_LENGTH)
#: Allowed values for PatchConfig.scope. PatchConfig validation rejects
#: any scope that is not a member of this set.
PATCH_SCOPE_VALUES: frozenset[str] = frozenset({"blob", "outer", "all"})
@dataclass(frozen=True, slots=True)
class PatchConfig:
    """Configuration for the patch stage between forward and backward.

    A patch operates either as a string-replacement mapping (``replace``)
    or as a Python callable (``callable``). The two modes are mutually
    exclusive.

    The ``scope`` field controls WHERE replacements apply:

    - ``"blob"`` (default): patch the data decoded by the forward chain
      (e.g. the BIRD XML extracted from a SAS Viya report blob).
      Preserves the historical behavior.
    - ``"outer"``: patch the wrapper dict passed in
      ``metadata["outer"]`` to ``chain.transform()``. The wrapper is
      mutated in place; the blob itself is not modified beyond the
      normal forward+backward round-trip. Useful for fields like
      ``connectors[].uri`` that live outside the encoded blob.
    - ``"all"``: do both, blob first then outer.

    Attributes:
        replace: String replacement mapping ``{old: new}``. Mutually
            exclusive with ``callable``.
        scope: Where to apply the replace mapping. One of
            ``"blob"`` (default), ``"outer"``, ``"all"``.
        callable: Import target ``module.path:function`` for complex
            patch logic. Mutually exclusive with ``replace``.
        args: Keyword arguments passed to the callable as ``**kwargs``.
        mapping: **Deprecated alias for ``replace``**. Setting it
            triggers a ``DeprecationWarning`` and is silently copied
            to ``replace``. Will be removed in a future version. Do
            not set both ``mapping`` and ``replace`` (raises).

    Examples:
        >>> PatchConfig(replace={"old": "new"})
        PatchConfig(replace={'old': 'new'}, scope='blob', callable=None, args={}, mapping=None)
        >>> PatchConfig(replace={"a": "b"}, scope="all")
        PatchConfig(replace={'a': 'b'}, scope='all', callable=None, args={}, mapping=None)
    """

    replace: dict[str, str] | None = None
    scope: Literal["blob", "outer", "all"] = "blob"
    callable: str | None = None
    args: dict[str, Any] = field(default_factory=dict)
    mapping: dict[str, str] | None = None  # DEPRECATED: use 'replace' instead

    def __post_init__(self) -> None:
        """Validate patch configuration.

        Raises:
            TransformConfigError: If configuration is invalid.
        """
        # Handle the deprecated 'mapping' alias: copy to 'replace' if
        # only 'mapping' is set, error if both are set (ambiguous).
        if self.mapping is not None:
            if self.replace is not None:
                raise TransformConfigError(
                    "PatchConfig: 'mapping' and 'replace' are both set. "
                    "'mapping' is the deprecated alias of 'replace' - use only one."
                )
            import warnings

            # stacklevel=3 skips this method and the dataclass-generated
            # __init__, attributing the warning to the constructing code.
            warnings.warn(
                "PatchConfig 'mapping' field is deprecated - use 'replace' instead. "
                "The behavior is identical; this is a pure rename.",
                DeprecationWarning,
                stacklevel=3,
            )
            # Frozen dataclass: use object.__setattr__ to copy the alias.
            object.__setattr__(self, "replace", self.mapping)
        if self.replace is not None and self.callable is not None:
            raise TransformConfigError("PatchConfig: 'replace' and 'callable' are mutually exclusive")
        self._validate_scope()
        self._validate_replace()
        self._validate_callable_and_args()

    def _validate_scope(self) -> None:
        """Validate scope value (must be one of blob/outer/all).

        Raises:
            TransformConfigError: If scope is not an allowed value.
        """
        # The isinstance guard runs first so that an unhashable value (e.g. a
        # list coming from YAML) cannot make the frozenset lookup raise
        # TypeError instead of the documented error.
        if not isinstance(self.scope, str) or self.scope not in PATCH_SCOPE_VALUES:
            raise TransformConfigError(
                f"PatchConfig scope must be one of {sorted(PATCH_SCOPE_VALUES)}, got: {self.scope!r}"
            )

    def _validate_replace(self) -> None:
        """Validate replace entries (types, lengths and key non-emptiness).

        Raises:
            TransformConfigError: If ``replace`` is not a dict, has too many
                entries, or contains a non-string, empty-key or over-long
                entry.
        """
        if self.replace is None:
            return
        # Type checks come before any len()/items() call so malformed YAML
        # surfaces as TransformConfigError rather than TypeError.
        if not isinstance(self.replace, dict):
            raise TransformConfigError(f"PatchConfig replace must be dict, got: {type(self.replace).__name__}")
        if len(self.replace) > MAX_MAPPING_ENTRIES:
            raise TransformConfigError(
                f"PatchConfig replace has too many entries: {len(self.replace)} > {MAX_MAPPING_ENTRIES}"
            )
        for key, value in self.replace.items():
            if not isinstance(key, str):
                raise TransformConfigError(f"PatchConfig replace key must be string, got: {type(key).__name__}")
            if not isinstance(value, str):
                raise TransformConfigError(f"PatchConfig replace value must be string, got: {type(value).__name__}")
            if not key:
                raise TransformConfigError("PatchConfig replace key must not be empty")
            if len(key) > MAX_MAPPING_STRING_LENGTH:
                raise TransformConfigError(
                    f"PatchConfig replace key too long: {len(key)} > {MAX_MAPPING_STRING_LENGTH}"
                )
            if len(value) > MAX_MAPPING_STRING_LENGTH:
                raise TransformConfigError(
                    f"PatchConfig replace value too long: {len(value)} > {MAX_MAPPING_STRING_LENGTH}"
                )

    def _validate_callable_and_args(self) -> None:
        """Validate callable target and args.

        Raises:
            TransformConfigError: If the callable target is rejected by
                ``validate_callable_target``, or ``args`` is not a dict, has
                too many entries, or has a non-string or over-long key.
        """
        if self.callable is not None:
            validate_callable_target(self.callable)
        if not isinstance(self.args, dict):
            raise TransformConfigError(f"PatchConfig args must be dict, got: {type(self.args).__name__}")
        if len(self.args) > MAX_CALLABLE_ARGS:
            raise TransformConfigError(f"PatchConfig args has too many entries: {len(self.args)} > {MAX_CALLABLE_ARGS}")
        for key in self.args:
            if not isinstance(key, str):
                raise TransformConfigError(f"PatchConfig args key must be string, got: {type(key).__name__}")
            if len(key) > MAX_ARG_KEY_LENGTH:
                raise TransformConfigError(f"PatchConfig args key too long: {len(key)} > {MAX_ARG_KEY_LENGTH}")
@dataclass(frozen=True, slots=True)
class FilterConfig:
    """Object selector used by ``TargetedPatchConfig``.

    Both fields must match for an object to be selected (logical AND).
    The value ``"*"`` stands for "any value" and is the default for both.

    Attributes:
        content_type: Object content type ("report", "folder", "file", or "*").
        name: fnmatch glob pattern on the object name (e.g. ``"R220_*"``).

    Examples:
        >>> FilterConfig(content_type="report", name="R220_*")
        FilterConfig(content_type='report', name='R220_*')
        >>> FilterConfig()
        FilterConfig(content_type='*', name='*')
    """

    content_type: str = "*"
    name: str = "*"

    def __post_init__(self) -> None:
        """Check both filter fields with the shared validators.

        Raises:
            TransformConfigError: If configuration is invalid.
        """
        type_selector = self.content_type
        name_glob = self.name
        # The content type is checked before the name pattern.
        validate_filter_type(type_selector)
        validate_glob_pattern(name_glob, label="filter name")
@dataclass(frozen=True, slots=True)
class TargetedPatchConfig:
    """Pair a filter with the patch chains to run when the filter matches.

    Attributes:
        filter: Filter describing which objects this entry applies to.
        patches: Ordered tuple of chain names whose ``.patch`` is applied.

    Examples:
        >>> TargetedPatchConfig(
        ...     filter=FilterConfig(content_type="report", name="R220_*"),
        ...     patches=("remap_caslib_r220",),
        ... )
        TargetedPatchConfig(filter=FilterConfig(...), patches=('remap_caslib_r220',))
    """

    filter: FilterConfig
    patches: tuple[str, ...]

    def __post_init__(self) -> None:
        """Check that ``patches`` is non-empty, bounded, and well-named.

        Raises:
            TransformConfigError: If configuration is invalid.
        """
        chain_names = self.patches
        if not chain_names:
            raise TransformConfigError("TargetedPatchConfig.patches must not be empty")
        if len(chain_names) > MAX_PATCHES_PER_TARGETED:
            raise TransformConfigError(
                f"TargetedPatchConfig has too many patches: {len(self.patches)} > {MAX_PATCHES_PER_TARGETED}"
            )
        # Every referenced chain name must pass the shared name validator.
        for chain_name in chain_names:
            validate_chain_name(chain_name)
@dataclass(frozen=True, slots=True)
class ComposedPatchConfig:
    """Combine global and targeted references to other chains' patches.

    Execution order (per object):

    1. ``global_patches``: applied to every object, in declaration order.
    2. ``targeted_patches``: for each entry in declaration order, if the
       filter matches the object metadata, apply all its patches in order.

    **Last applied wins on conflict**, following kstlib cascade philosophy
    (``kwargs > user config > preset > defaults``). Ordering is by
    declaration, not by filter specificity. Order your targeted_patches
    from most general to most specific.

    Attributes:
        global_patches: Chain names applied to every object.
        targeted_patches: Conditional entries applied when their filter matches.

    Examples:
        >>> ComposedPatchConfig(
        ...     global_patches=("remap_host",),
        ...     targeted_patches=(
        ...         TargetedPatchConfig(
        ...             filter=FilterConfig(name="R220_*"),
        ...             patches=("remap_caslib_r220",),
        ...         ),
        ...     ),
        ... )
        ComposedPatchConfig(global_patches=('remap_host',), targeted_patches=(...))
    """

    global_patches: tuple[str, ...] = ()
    targeted_patches: tuple[TargetedPatchConfig, ...] = ()

    def __post_init__(self) -> None:
        """Check that at least one entry exists and all limits are respected.

        Raises:
            TransformConfigError: If configuration is invalid.
        """
        # A composed patch with nothing in it is rejected outright.
        if not (self.global_patches or self.targeted_patches):
            raise TransformConfigError(
                "ComposedPatchConfig must declare at least one global_patches or targeted_patches entry"
            )

        global_count = len(self.global_patches)
        if global_count > MAX_GLOBAL_PATCHES:
            raise TransformConfigError(
                f"ComposedPatchConfig has too many global_patches: {global_count} > {MAX_GLOBAL_PATCHES}"
            )

        targeted_count = len(self.targeted_patches)
        if targeted_count > MAX_TARGETED_PATCHES:
            raise TransformConfigError(
                f"ComposedPatchConfig has too many targeted_patches: "
                f"{targeted_count} > {MAX_TARGETED_PATCHES}"
            )

        # Only the global names are validated here.
        for chain_name in self.global_patches:
            validate_chain_name(chain_name)
# ============================================================================
# Config loading
# ============================================================================
def _parse_primitive(raw: str | dict[str, Any]) -> PrimitiveConfig:
    """Build a PrimitiveConfig from one YAML primitive entry.

    Two shapes are accepted: a bare name (``"base64"``) or a single-key
    dict mapping the name to its options (``{"zlib": {"skip_bytes": 3}}``).
    A ``None`` options value is treated as "no options".

    Args:
        raw: Either a string ("base64") or dict ({"zlib": {"skip_bytes": 3}}).

    Returns:
        Parsed PrimitiveConfig.

    Raises:
        TransformConfigError: If format is invalid.
    """
    # Bare-name form: no options.
    if isinstance(raw, str):
        return PrimitiveConfig(name=raw)

    if not isinstance(raw, dict):
        raise TransformConfigError(f"Primitive must be str or dict, got: {type(raw).__name__}")

    if len(raw) != 1:
        raise TransformConfigError(f"Primitive dict must have exactly 1 key, got: {list(raw.keys())}")

    # Exactly one item is guaranteed by the length check above.
    ((primitive_name, primitive_options),) = raw.items()
    if primitive_options is None:
        primitive_options = {}
    if not isinstance(primitive_options, dict):
        raise TransformConfigError(
            f"Primitive '{primitive_name}' options must be dict, got: {type(primitive_options).__name__}"
        )
    return PrimitiveConfig(name=primitive_name, options=primitive_options)
def _parse_primitives(raw_list: list[Any]) -> tuple[PrimitiveConfig, ...]:
    """Parse a list of primitives from YAML.

    Args:
        raw_list: List of raw primitive definitions.

    Returns:
        Tuple of PrimitiveConfig, in the same order as ``raw_list``.

    Raises:
        TransformConfigError: If ``raw_list`` is not a list (or tuple), or
            if any entry is rejected by ``_parse_primitive``.
    """
    # A scalar such as ``forward: base64`` would otherwise be iterated
    # character by character (and a dict by key), producing misleading
    # per-item errors. Reject the wrong container type up front.
    if not isinstance(raw_list, (list, tuple)):
        raise TransformConfigError(f"Primitives must be a list, got: {type(raw_list).__name__}")
    return tuple(_parse_primitive(item) for item in raw_list)
def _parse_patch(raw: dict[str, Any]) -> PatchConfig:
    """Parse a PatchConfig from YAML dict.

    Accepts both the new ``replace:`` field and the deprecated ``mapping:``
    alias. If ``mapping:`` is set, the PatchConfig dataclass will emit a
    DeprecationWarning at construction time.

    The legacy ``depth:`` key is no longer supported and is rejected
    with a clear migration message: it was never implemented and
    ``scope:`` covers the original intent.

    An ``args:`` key with a null value is treated as "no arguments", the
    same way ``_parse_primitive`` treats null primitive options.

    Args:
        raw: Raw patch configuration dict.

    Returns:
        Parsed PatchConfig.

    Raises:
        TransformConfigError: If ``raw`` is not a dict, if ``depth:`` is
            present in the raw dict, or if PatchConfig validation fails.
    """
    # Without this guard a string would make the "depth" test below a
    # substring check and ``raw.get`` would raise AttributeError.
    if not isinstance(raw, dict):
        raise TransformConfigError(f"Patch must be a dict, got: {type(raw).__name__}")
    if "depth" in raw:
        raise TransformConfigError(
            "PatchConfig 'depth:' is no longer supported. The field was "
            "never implemented; use 'scope: blob | outer | all' instead "
            "to control where replacements apply."
        )
    # ``args:`` written with no value loads as None; normalise it to {}.
    args = raw.get("args")
    return PatchConfig(
        replace=raw.get("replace"),
        scope=raw.get("scope", "blob"),
        callable=raw.get("callable"),
        args={} if args is None else args,
        mapping=raw.get("mapping"),
    )
def _parse_filter(raw: dict[str, Any]) -> FilterConfig:
    """Build a FilterConfig from a YAML mapping.

    Missing ``content_type`` or ``name`` keys default to ``"*"``.

    Args:
        raw: Raw filter configuration dict.

    Returns:
        Parsed FilterConfig.

    Raises:
        TransformConfigError: If raw is not a dict.
    """
    if isinstance(raw, dict):
        type_selector = raw.get("content_type", "*")
        name_glob = raw.get("name", "*")
        return FilterConfig(content_type=type_selector, name=name_glob)
    raise TransformConfigError(f"Filter must be a dict, got: {type(raw).__name__}")
def _parse_targeted_patch(raw: dict[str, Any]) -> TargetedPatchConfig:
    """Parse a TargetedPatchConfig from YAML dict.

    A missing or null ``filter`` yields the default match-everything
    ``FilterConfig()``. Any other value must be a dict and is handed to
    ``_parse_filter``.

    Args:
        raw: Raw targeted patch configuration dict.

    Returns:
        Parsed TargetedPatchConfig.

    Raises:
        TransformConfigError: If format is invalid.
    """
    if not isinstance(raw, dict):
        raise TransformConfigError(f"Targeted patch must be a dict, got: {type(raw).__name__}")
    # Only an absent/null filter falls back to the default. A truthiness
    # test here would silently turn malformed values such as ``[]`` or
    # ``""`` into a filter that matches every object.
    filter_raw = raw.get("filter")
    filter_config = FilterConfig() if filter_raw is None else _parse_filter(filter_raw)
    patches_raw = raw.get("patches", [])
    if not isinstance(patches_raw, list):
        raise TransformConfigError(f"Targeted patch 'patches' must be a list, got: {type(patches_raw).__name__}")
    for item in patches_raw:
        if not isinstance(item, str):
            raise TransformConfigError(f"Targeted patch 'patches' entries must be strings, got: {type(item).__name__}")
    return TargetedPatchConfig(
        filter=filter_config,
        patches=tuple(patches_raw),
    )
def _parse_composed_patch(
    global_raw: list[Any] | None,
    targeted_raw: list[Any] | None,
) -> ComposedPatchConfig:
    """Build a ComposedPatchConfig from the two raw YAML lists.

    Either argument may be ``None`` (key absent). Both container types are
    checked before any entry is inspected.

    Args:
        global_raw: Raw global_patches list from YAML.
        targeted_raw: Raw targeted_patches list from YAML.

    Returns:
        Parsed ComposedPatchConfig.

    Raises:
        TransformConfigError: If format is invalid.
    """
    # Container checks first, for both inputs.
    if not (global_raw is None or isinstance(global_raw, list)):
        raise TransformConfigError(f"'global_patches' must be a list, got: {type(global_raw).__name__}")
    if not (targeted_raw is None or isinstance(targeted_raw, list)):
        raise TransformConfigError(f"'targeted_patches' must be a list, got: {type(targeted_raw).__name__}")

    global_patches: tuple[str, ...] = ()
    if global_raw:
        # Report the first non-string entry, if there is one.
        non_strings = [entry for entry in global_raw if not isinstance(entry, str)]
        if non_strings:
            raise TransformConfigError(
                f"'global_patches' entries must be strings, got: {type(non_strings[0]).__name__}"
            )
        global_patches = tuple(global_raw)

    targeted_patches: tuple[TargetedPatchConfig, ...] = (
        tuple(map(_parse_targeted_patch, targeted_raw)) if targeted_raw else ()
    )

    return ComposedPatchConfig(global_patches=global_patches, targeted_patches=targeted_patches)
def _parse_chain(name: str, raw: dict[str, Any]) -> TransformChainConfig:
    """Parse a TransformChainConfig from YAML dict.

    Reads the ``forward``, ``backward``, ``patch``, ``preset``,
    ``global_patches`` and ``targeted_patches`` keys. A missing or empty
    ``forward`` becomes ``()``, a missing or empty ``backward`` or ``patch``
    becomes ``None``, and a composed patch is built as soon as either
    ``global_patches`` or ``targeted_patches`` is present (not ``None``).

    Args:
        name: Chain name.
        raw: Raw chain configuration dict.

    Returns:
        Parsed TransformChainConfig.

    Raises:
        TransformConfigError: If ``raw`` is not a dict, or if any nested
            section fails to parse or validate.
    """
    # A chain declared with no body loads as None from YAML; report that
    # (and any other non-dict) as a config error instead of letting
    # ``raw.get`` raise AttributeError.
    if not isinstance(raw, dict):
        raise TransformConfigError(f"Chain '{name}' must be a dict, got: {type(raw).__name__}")
    forward_raw = raw.get("forward")
    backward_raw = raw.get("backward")
    patch_raw = raw.get("patch")
    preset = raw.get("preset")
    global_patches_raw = raw.get("global_patches")
    targeted_patches_raw = raw.get("targeted_patches")
    forward = _parse_primitives(forward_raw) if forward_raw else ()
    backward = _parse_primitives(backward_raw) if backward_raw else None
    patch = _parse_patch(patch_raw) if patch_raw else None
    composed_patch: ComposedPatchConfig | None = None
    # Presence (even of an empty list) triggers composed-patch parsing, so an
    # empty declaration is reported by ComposedPatchConfig validation.
    if global_patches_raw is not None or targeted_patches_raw is not None:
        composed_patch = _parse_composed_patch(global_patches_raw, targeted_patches_raw)
    return TransformChainConfig(
        name=name,
        forward=forward,
        backward=backward,
        patch=patch,
        composed_patch=composed_patch,
        preset=preset,
    )
# Public names exported by a star-import of this module.
# NOTE(review): TransformChainConfig, TransformConfig and load_transform_config
# are listed here but are not defined in the part of the file under review -
# confirm they are defined elsewhere in this module.
__all__ = [
"PATCH_SCOPE_VALUES",
"ComposedPatchConfig",
"FilterConfig",
"PatchConfig",
"PrimitiveConfig",
"TargetedPatchConfig",
"TransformChainConfig",
"TransformConfig",
"load_transform_config",
]