Database¶
Async SQLite database module with connection pooling, SQLCipher encryption, and SOPS secret integration.
TL;DR¶
from kstlib.db import AsyncDatabase
# Simple in-memory database
async with AsyncDatabase(":memory:") as db:
await db.execute("CREATE TABLE users (id INTEGER, name TEXT)")
await db.execute("INSERT INTO users VALUES (?, ?)", (1, "alice"))
row = await db.fetch_one("SELECT * FROM users WHERE id=?", (1,))
print(row) # (1, 'alice')
# Encrypted database with SOPS key management
# secrets.yml contains: { "database_key": "my-sqlcipher-passphrase" }
# The passphrase is used by SQLCipher to encrypt the entire .db file
db = AsyncDatabase(
"app.db",
cipher_sops="secrets.yml", # SOPS-encrypted file with the passphrase
cipher_sops_key="database_key" # Key name inside the SOPS file
)
async with db:
# All data in app.db is encrypted at rest via SQLCipher
await db.execute("SELECT * FROM users")
# Connection pooling for high-concurrency
db = AsyncDatabase(
"trading.db",
pool_min=2,
pool_max=20,
pool_timeout=30.0
)
Installation¶
SQLite async support is included by default. For SQLCipher encryption:
pip install kstlib[db-crypto]
System dependencies for SQLCipher:
Platform |
Command |
|---|---|
Debian/Ubuntu |
|
macOS |
|
Windows |
Pre-built wheels included |
Check availability:
from kstlib.db import is_sqlcipher_available
if is_sqlcipher_available():
print("SQLCipher ready for encrypted databases")
else:
print("Install: pip install kstlib[db-crypto]")
Key Features¶
Async Interface: Built on
aiosqlitefor non-blocking database operationsConnection Pooling: Configurable pool with health checks and automatic retry
SQLCipher Encryption: Transparent encryption at rest with SQLCipher
SOPS Integration: Secure key management via SOPS-encrypted files
Transaction Support: Automatic commit/rollback with context managers
WAL Mode: Write-ahead logging enabled for better concurrency
Query Helpers: Convenient methods for common patterns (fetch_one, fetch_all, etc.)
Quick Start¶
Basic Usage¶
from kstlib.db import AsyncDatabase
async def main():
async with AsyncDatabase(":memory:") as db:
# Create table
await db.execute("""
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
price REAL
)
""")
# Insert data
await db.execute(
"INSERT INTO products (name, price) VALUES (?, ?)",
("Widget", 9.99)
)
# Query data
row = await db.fetch_one("SELECT * FROM products WHERE id=?", (1,))
print(row) # (1, 'Widget', 9.99)
# Fetch all rows
rows = await db.fetch_all("SELECT * FROM products")
# Fetch single value
count = await db.fetch_value("SELECT count(*) FROM products")
Encrypted Database¶
from kstlib.db import AsyncDatabase
# Option 1: Direct passphrase (not recommended for production)
db = AsyncDatabase("secure.db", cipher_key="my-secret-passphrase")
# Option 2: Environment variable
db = AsyncDatabase("secure.db", cipher_env="DB_ENCRYPTION_KEY")
# Option 3: SOPS file (recommended)
db = AsyncDatabase(
"secure.db",
cipher_sops="secrets.yml",
cipher_sops_key="database_key" # Key name in SOPS file
)
Transactions¶
from kstlib.db import AsyncDatabase
async with AsyncDatabase("app.db") as db:
# Transaction with automatic commit/rollback
async with db.transaction() as conn:
await conn.execute("INSERT INTO accounts VALUES (?, ?)", (1, 1000))
await conn.execute("INSERT INTO accounts VALUES (?, ?)", (2, 500))
# Commits automatically if no exception
# Rolls back if exception occurs
How It Works¶
Connection Pool Architecture¶
Application
│
▼
┌───────────────┐
│ AsyncDatabase │ High-level API
└───────┬───────┘
│
▼
┌────────────────┐
│ ConnectionPool │ Pool management
│ ○ ○ ○ ○ ○ │ (min_size to max_size connections)
└────────┬───────┘
│
▼
┌───────────────┐
│ aiosqlite │ Async SQLite wrapper
└───────┬───────┘
│
▼
┌──────────────────┐
│ SQLite/SQLCipher │ Database engine
└──────────────────┘
Pool Behavior¶
Parameter |
Default |
Hard Limits |
Description |
|---|---|---|---|
|
1 |
0-10 |
Minimum connections to maintain |
|
10 |
1-100 |
Maximum connections allowed |
|
30.0 |
1.0-300.0 |
Seconds to wait for available connection |
|
3 |
1-10 |
Retry attempts on connection failure |
|
0.5 |
0.1-60.0 |
Seconds between retries |
Connection Lifecycle¶
Acquire: Get connection from pool (or create new if under max)
Health Check: Verify connection is alive with
SELECT 1Use: Execute queries
Release: Return connection to pool for reuse
Encryption Flow¶
Key Resolution (priority order):
1. cipher_key (direct passphrase)
2. cipher_env (environment variable)
3. cipher_sops (SOPS file)
│
▼
┌─────────────────┐
│ resolve_cipher_ │
│ key() │
└────────┬────────┘
│
▼
┌─────────────────┐
│ apply_cipher_ │ PRAGMA key = '...'
│ key() │
└────────┬────────┘
│
▼
Encrypted DB
Configuration¶
In kstlib.conf.yml¶
db:
pool:
# Minimum connections to maintain in pool (0 = lazy pool, on-demand)
# Hard limits enforced in code: min 0, max 10
min_size: 1
# Maximum connections allowed in pool
# Hard limits enforced in code: min 1, max 100
max_size: 10
# Timeout for acquiring a connection (seconds)
# Hard limits enforced in code: min 1.0, max 300.0 (5 minutes)
acquire_timeout: 30.0
retry:
# Retry attempts on connection failure
# Hard limits enforced in code: min 1, max 10
max_attempts: 3
# Delay between retries (seconds)
# Hard limits enforced in code: min 0.1, max 60.0
delay: 0.5
# SQLCipher encryption (opt-in, requires: pip install kstlib[db-crypto])
cipher:
# Enable SQLCipher encryption (default: false)
enabled: false
# Key source: env | sops | passphrase
key_source: env
# Environment variable containing the encryption key
key_env: "KSTLIB_DB_KEY"
# SOPS configuration (when key_source: sops)
sops_path: null
sops_key: "db_key"
Note
All configuration values are enforced with hard limits in code for deep defense. Values outside the allowed range are automatically clamped to the nearest bound.
SOPS Secrets File¶
# secrets.yml (encrypted with SOPS)
database_key: ENC[AES256_GCM,data:...,iv:...,tag:...]
api_token: ENC[AES256_GCM,data:...,iv:...,tag:...]
Per-Instance Override¶
db = AsyncDatabase(
"app.db",
pool_min=5,
pool_max=50,
pool_timeout=60.0,
max_retries=5
)
Common Patterns¶
Trading Bot Database¶
from kstlib.db import AsyncDatabase
class TradingDatabase:
def __init__(self, db_path: str, sops_path: str):
self.db = AsyncDatabase(
db_path,
cipher_sops=sops_path,
pool_min=2,
pool_max=10
)
async def connect(self):
await self.db.connect()
await self._init_schema()
async def _init_schema(self):
await self.db.execute("""
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
quantity REAL NOT NULL,
price REAL NOT NULL,
timestamp TEXT NOT NULL
)
""")
async def record_trade(
self, symbol: str, side: str, quantity: float, price: float
):
from datetime import datetime, timezone
await self.db.execute(
"INSERT INTO trades (symbol, side, quantity, price, timestamp) "
"VALUES (?, ?, ?, ?, ?)",
(symbol, side, quantity, price, datetime.now(timezone.utc).isoformat())
)
async def get_recent_trades(self, limit: int = 100):
return await self.db.fetch_all(
"SELECT * FROM trades ORDER BY timestamp DESC LIMIT ?",
(limit,)
)
async def close(self):
await self.db.close()
Bulk Insert with Transaction¶
async with AsyncDatabase("data.db") as db:
async with db.transaction() as conn:
await conn.executemany(
"INSERT INTO records (key, value) VALUES (?, ?)",
[(f"key_{i}", i * 10) for i in range(1000)]
)
Multiple Databases¶
from kstlib.db import AsyncDatabase
class MultiDB:
def __init__(self):
self.users_db = AsyncDatabase("users.db")
self.trades_db = AsyncDatabase("trades.db", cipher_env="TRADES_KEY")
self.cache_db = AsyncDatabase(":memory:")
async def connect_all(self):
await self.users_db.connect()
await self.trades_db.connect()
await self.cache_db.connect()
async def close_all(self):
await self.users_db.close()
await self.trades_db.close()
await self.cache_db.close()
Connection Pool Monitoring¶
from kstlib.db import AsyncDatabase
db = AsyncDatabase("app.db", pool_min=2, pool_max=10)
async with db:
# Check pool statistics
stats = db.stats
print(f"Total connections: {stats.total_connections}")
print(f"Active: {stats.active_connections}")
print(f"Idle: {stats.idle_connections}")
print(f"Acquired: {stats.total_acquired}")
print(f"Released: {stats.total_released}")
print(f"Timeouts: {stats.total_timeouts}")
print(f"Errors: {stats.total_errors}")
Direct Connection Access¶
async with AsyncDatabase("app.db") as db:
# When you need raw connection access
async with db.connection() as conn:
await conn.execute("PRAGMA optimize")
await conn.execute("VACUUM")
Troubleshooting¶
Database file is locked¶
Cause: Another process has exclusive lock or WAL checkpoint pending.
Solution: Enable WAL mode (enabled by default) or close other connections:
# WAL is enabled automatically, but you can force checkpoint
async with db.connection() as conn:
await conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
Pool exhausted timeout¶
Cause: All connections in use and pool at max capacity.
Solution: Increase pool size or reduce connection hold time:
db = AsyncDatabase(
"app.db",
pool_max=50, # Increase max connections
pool_timeout=60.0 # Increase wait timeout
)
Encryption key mismatch¶
Cause: Wrong key used to open existing encrypted database.
Solution: Verify key source is correct:
# Check if database is encrypted
db = AsyncDatabase("app.db", cipher_key="my-key")
print(f"Encrypted: {db.is_encrypted}")
SOPS key not found¶
Cause: Key name not present in SOPS file.
Solution: Verify the key name matches:
# SOPS file contains: { "db_key": "secret" }
db = AsyncDatabase(
"app.db",
cipher_sops="secrets.yml",
cipher_sops_key="db_key" # Must match key in SOPS file
)
Connection health check fails¶
Cause: Connection dropped or database file moved/deleted.
Solution: Pool automatically handles this by discarding dead connections and creating new ones. Check stats for error count:
if db.stats.total_errors > 10:
print("High error rate - check database health")
API Reference¶
Full autodoc: Database
Class |
Description |
|---|---|
|
High-level async database interface |
|
Connection pool with health checks |
|
Pool statistics dataclass |
Function |
Description |
|---|---|
|
Check if SQLCipher is installed |
|
Resolve encryption key from various sources |
|
Apply SQLCipher key to connection (low-level) |
Exception |
Description |
|---|---|
|
Base exception for database operations |
|
Connection establishment failed |
|
Encryption/decryption failed |
|
No connections available in pool |
|
Transaction commit/rollback failed |