"""Argon2id password hashing helpers.
This module wraps the optional ``argon2-cffi`` backend (install via
``pip install kstlib[passwords]``) to provide password hashing and
verification with safe, OWASP-aligned defaults.
Cost parameters follow a ``kwargs > kstlib config > defaults`` cascade. Any
resolved parameter below the security floor is clamped up and a ``[SECURITY]``
warning is logged. Passwords and hashes are never written to the logs.
Example:
>>> from kstlib.secure import hash_password, verify_password
>>> stored = hash_password("correct horse battery staple") # doctest: +SKIP
>>> verify_password("correct horse battery staple", stored) # doctest: +SKIP
True
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Final
from kstlib.config.exceptions import KstlibError
from kstlib.logging import get_logger
try:
from argon2 import PasswordHasher
from argon2.exceptions import (
HashingError,
InvalidHashError,
VerificationError,
VerifyMismatchError,
)
except ImportError: # pragma: no cover - argon2-cffi is an optional extra
_ARGON2_AVAILABLE = False
else:
_ARGON2_AVAILABLE = True
log = get_logger(__name__)
__all__ = [
"DEFAULT_HASH_LEN",
"DEFAULT_MEMORY_COST",
"DEFAULT_PARALLELISM",
"DEFAULT_SALT_LEN",
"DEFAULT_TIME_COST",
"MAX_PASSWORD_LENGTH",
"MIN_HASH_LEN",
"MIN_MEMORY_COST",
"MIN_PARALLELISM",
"MIN_SALT_LEN",
"MIN_TIME_COST",
"InvalidPasswordHashError",
"PasswordError",
"hash_password",
"needs_rehash",
"verify_password",
]
# Baseline cost parameters: mirror the argon2-cffi 25.x PasswordHasher defaults
# (RFC 9106 low-memory profile), which already track the OWASP recommendation.
DEFAULT_TIME_COST: Final = 3
DEFAULT_MEMORY_COST: Final = 65536 # KiB (64 MiB)
DEFAULT_PARALLELISM: Final = 4
DEFAULT_HASH_LEN: Final = 32 # bytes
DEFAULT_SALT_LEN: Final = 16 # bytes
# Security floors: a resolved parameter below these is clamped up. The floor is
# the OWASP "minimum configuration" baseline (19 MiB / t=2 / p=1); the defaults
# sit well above it, so the floor only blocks a deliberate downgrade.
MIN_TIME_COST: Final = 2
MIN_MEMORY_COST: Final = 19456 # KiB (19 MiB, OWASP minimum baseline)
MIN_PARALLELISM: Final = 1
MIN_HASH_LEN: Final = 16 # bytes
MIN_SALT_LEN: Final = 16 # bytes
# Hardening: cap the accepted password size so a multi-megabyte input cannot be
# used as a CPU/memory denial-of-service vector against the hasher.
MAX_PASSWORD_LENGTH: Final = 4096 # bytes
[docs]
class PasswordError(KstlibError, RuntimeError):
"""Raised when a password hashing operation cannot be completed."""
[docs]
class InvalidPasswordHashError(PasswordError):
"""Raised when a stored value is not a valid Argon2 hash."""
@dataclass(frozen=True, slots=True)
class _CostParams:
"""Resolved and clamped Argon2 cost parameters."""
time_cost: int
memory_cost: int
parallelism: int
hash_len: int
salt_len: int
def _ensure_available() -> None:
"""Raise a clear error when the optional argon2-cffi backend is missing."""
if not _ARGON2_AVAILABLE:
raise PasswordError("password hashing requires argon2-cffi: pip install kstlib[passwords]")
def _encode_password(password: str | bytes) -> bytes:
"""Return *password* as UTF-8 bytes, rejecting unexpected types."""
if isinstance(password, str):
return password.encode("utf-8")
if isinstance(password, bytes):
return password
raise PasswordError("password must be str or bytes")
def _pick(override: int | None, section: Any, key: str, default: int) -> int:
"""Resolve one cost value via the kwargs > config > default cascade."""
if override is not None:
return override
value: Any = section.get(key) if hasattr(section, "get") else None
if isinstance(value, int) and not isinstance(value, bool):
return value
return default
def _clamp_value(name: str, value: int, floor: int) -> int:
"""Clamp *value* up to *floor*, logging a security warning when it does."""
if value < floor:
log.warning(
"[SECURITY] argon2 %s=%d is below the security floor %d; clamped to floor",
name,
value,
floor,
)
return floor
return value
def _load_password_config() -> Any:
"""Return the ``secure.passwords`` config section, or an empty mapping.
Falls back to an empty mapping when no kstlib configuration is loaded, so
hashing works standalone without any ``kstlib.conf.yml`` present.
"""
try:
from kstlib.config import get_config
from kstlib.config.exceptions import ConfigNotLoadedError
except ImportError: # pragma: no cover - kstlib.config is always present
return {}
try:
cfg: Any = get_config()
except ConfigNotLoadedError:
return {}
except Exception: # any loader failure falls back to safe defaults
log.debug("Could not load kstlib config; using default argon2 parameters")
return {}
secure_section: Any = cfg.get("secure", {}) if hasattr(cfg, "get") else {}
passwords_section: Any = secure_section.get("passwords", {}) if hasattr(secure_section, "get") else {}
return passwords_section if hasattr(passwords_section, "get") else {}
def _resolve_params(
*,
time_cost: int | None,
memory_cost: int | None,
parallelism: int | None,
hash_len: int | None,
salt_len: int | None,
) -> _CostParams:
"""Resolve cost parameters (kwargs > config > defaults) and clamp to floors."""
section = _load_password_config()
return _CostParams(
time_cost=_clamp_value("time_cost", _pick(time_cost, section, "time_cost", DEFAULT_TIME_COST), MIN_TIME_COST),
memory_cost=_clamp_value(
"memory_cost",
_pick(memory_cost, section, "memory_cost", DEFAULT_MEMORY_COST),
MIN_MEMORY_COST,
),
parallelism=_clamp_value(
"parallelism",
_pick(parallelism, section, "parallelism", DEFAULT_PARALLELISM),
MIN_PARALLELISM,
),
hash_len=_clamp_value("hash_len", _pick(hash_len, section, "hash_len", DEFAULT_HASH_LEN), MIN_HASH_LEN),
salt_len=_clamp_value("salt_len", _pick(salt_len, section, "salt_len", DEFAULT_SALT_LEN), MIN_SALT_LEN),
)
[docs]
def hash_password(
password: str | bytes,
*,
time_cost: int | None = None,
memory_cost: int | None = None,
parallelism: int | None = None,
) -> str:
"""Hash a password with Argon2id and return the encoded PHC string.
The cost knobs (``time_cost``, ``memory_cost``, ``parallelism``) resolve as
``kwargs > kstlib config > defaults`` and are clamped to the security floors
(``time_cost>=2``, ``memory_cost>=19456`` KiB, ``parallelism>=1``). Defaults
track the argon2-cffi 25.x / OWASP recommendation (``time_cost=3``,
``memory_cost=65536`` KiB, ``parallelism=4``).
Output sizing (``hash_len``, ``salt_len``) is intentionally not a per-call
argument: it must stay consistent across all stored hashes. Configure it once
via ``secure.passwords.hash_len`` / ``secure.passwords.salt_len`` in
``kstlib.conf.yml`` (defaults ``hash_len=32``, ``salt_len=16``; floors
``hash_len>=16``, ``salt_len>=16``).
Args:
password: Plaintext password as ``str`` (UTF-8 encoded) or ``bytes``.
time_cost: Number of iterations. Defaults to the cascade value.
memory_cost: Memory usage in KiB. Defaults to the cascade value.
parallelism: Number of parallel lanes. Defaults to the cascade value.
Returns:
The Argon2id hash encoded as a PHC string (``$argon2id$...``).
Raises:
PasswordError: If argon2-cffi is not installed, the password is not
``str`` or ``bytes``, the password exceeds ``MAX_PASSWORD_LENGTH``,
or the underlying hashing operation fails.
Example:
>>> from kstlib.secure import hash_password
>>> hash_password("s3cret") # doctest: +SKIP
'$argon2id$v=19$m=65536,t=3,p=4$...'
"""
_ensure_available()
secret = _encode_password(password)
if len(secret) > MAX_PASSWORD_LENGTH:
raise PasswordError(f"password exceeds maximum length of {MAX_PASSWORD_LENGTH} bytes")
params = _resolve_params(
time_cost=time_cost,
memory_cost=memory_cost,
parallelism=parallelism,
hash_len=None,
salt_len=None,
)
hasher = PasswordHasher(
time_cost=params.time_cost,
memory_cost=params.memory_cost,
parallelism=params.parallelism,
hash_len=params.hash_len,
salt_len=params.salt_len,
)
try:
return hasher.hash(secret)
except HashingError as exc:
raise PasswordError(f"argon2 hashing failed: {exc}") from exc
[docs]
def verify_password(password: str | bytes, stored_hash: str) -> bool:
"""Verify a password against a stored Argon2id hash in constant time.
Args:
password: Plaintext password to check (``str`` or ``bytes``).
stored_hash: Previously stored PHC hash string.
Returns:
``True`` if the password matches, ``False`` otherwise (including when
the password exceeds ``MAX_PASSWORD_LENGTH``).
Raises:
PasswordError: If argon2-cffi is not installed or the password is not
``str`` or ``bytes``.
InvalidPasswordHashError: If ``stored_hash`` is not a valid Argon2 hash.
Example:
>>> from kstlib.secure import hash_password, verify_password
>>> stored = hash_password("s3cret") # doctest: +SKIP
>>> verify_password("s3cret", stored) # doctest: +SKIP
True
"""
_ensure_available()
secret = _encode_password(password)
if len(secret) > MAX_PASSWORD_LENGTH:
# An oversized candidate cannot match a hash of a bounded password.
# Return False (no raise) to avoid leaking the rejection via an
# exception and to keep the constant-time-ish verification contract.
return False
hasher = PasswordHasher()
try:
hasher.verify(stored_hash, secret)
except VerifyMismatchError:
return False
except (InvalidHashError, VerificationError) as exc:
raise InvalidPasswordHashError("stored value is not a valid argon2 hash") from exc
return True
[docs]
def needs_rehash(stored_hash: str) -> bool:
"""Return whether a stored hash should be recomputed with current parameters.
Compares the parameters embedded in *stored_hash* against the currently
resolved policy (kstlib config or defaults). Returns ``True`` when the stored
hash is weaker than the current parameters and should be upgraded on the next
successful login.
Args:
stored_hash: Previously stored PHC hash string.
Returns:
``True`` if the hash should be regenerated, ``False`` otherwise.
Raises:
PasswordError: If argon2-cffi is not installed.
InvalidPasswordHashError: If ``stored_hash`` is not a valid Argon2 hash.
"""
_ensure_available()
params = _resolve_params(time_cost=None, memory_cost=None, parallelism=None, hash_len=None, salt_len=None)
hasher = PasswordHasher(
time_cost=params.time_cost,
memory_cost=params.memory_cost,
parallelism=params.parallelism,
hash_len=params.hash_len,
salt_len=params.salt_len,
)
try:
return hasher.check_needs_rehash(stored_hash)
except InvalidHashError as exc:
raise InvalidPasswordHashError("stored value is not a valid argon2 hash") from exc