# Source code for kstlib.db.cipher

"""SQLCipher integration with SOPS secret resolution.

Provides secure key management for encrypted SQLite databases.
Keys can be loaded from:
- Direct passphrase
- Environment variable
- SOPS-encrypted file via kstlib.secrets
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from kstlib.db.exceptions import EncryptionError

if TYPE_CHECKING:
    import sqlite3
    from pathlib import Path

log = logging.getLogger(__name__)


[docs] def resolve_cipher_key( *, passphrase: str | None = None, env_var: str | None = None, sops_path: str | Path | None = None, sops_key: str = "db_key", ) -> str: """Resolve encryption key from various sources. Priority: passphrase > env_var > sops_path Args: passphrase: Direct passphrase string. env_var: Environment variable name containing the key. sops_path: Path to SOPS-encrypted file. sops_key: Key name within SOPS file (default: "db_key"). Returns: Resolved encryption key. Raises: EncryptionError: If no key source provided or resolution fails. Examples: >>> key = resolve_cipher_key(passphrase="my-secret-key") >>> len(key) > 0 True """ # Direct passphrase (highest priority) if passphrase: return passphrase # Environment variable if env_var: import os key = os.environ.get(env_var) if key: log.debug("Resolved cipher key from env var: %s", env_var) return key raise EncryptionError(f"Environment variable '{env_var}' not set or empty") # SOPS file if sops_path: try: from kstlib.secrets.models import SecretRequest from kstlib.secrets.providers.sops import SOPSProvider provider = SOPSProvider(path=sops_path) request = SecretRequest(name=sops_key, required=True) record = provider.resolve(request) if record is None or record.value is None: raise EncryptionError(f"Key '{sops_key}' not found in SOPS file") log.debug("Resolved cipher key from SOPS: %s", sops_path) return str(record.value) except ImportError as e: raise EncryptionError("kstlib.secrets required for SOPS support") from e except Exception as e: raise EncryptionError(f"Failed to resolve SOPS key: {e}") from e raise EncryptionError("No encryption key source provided. Specify passphrase, env_var, or sops_path.")
def apply_cipher_key(conn: sqlite3.Connection, key: str) -> None:
    """Apply SQLCipher key to a connection and verify that it works.

    The key is set with ``PRAGMA key`` and then checked by running a query
    against ``sqlite_master``.

    Args:
        conn: SQLite connection object.
        key: Encryption key to apply. Must not contain null bytes.

    Raises:
        EncryptionError: If the key contains a null byte or key
            application fails.
    """
    from contextlib import closing

    if "\x00" in key:
        raise EncryptionError("Null bytes are not allowed in cipher key")

    # The key is embedded as a SQL string literal, so single quotes are
    # doubled to keep the whole key inside that literal (prevents injection).
    escaped_key = key.replace("'", "''")

    try:
        # closing() guarantees each cursor is closed even if a step fails.
        with closing(conn.execute(f"PRAGMA key = '{escaped_key}'")):
            pass

        # Verify key works by reading schema
        with closing(conn.execute("SELECT count(*) FROM sqlite_master")) as cursor:
            cursor.fetchone()

        log.debug("SQLCipher key applied successfully")
    except Exception as e:
        # Broad on purpose: any failure from the connection is reported to
        # callers as the single domain error type.
        raise EncryptionError(f"Failed to apply cipher key: {e}") from e
# Public names exported by this module.
__all__ = ["apply_cipher_key", "resolve_cipher_key"]