Authentication

Public API for kstlib.auth, providing a config-driven OAuth2/OIDC authentication layer. The module supports Authorization Code flow with PKCE, automatic token refresh, and token storage via SOPS, file, or in-memory backends.

Tip

Pair this reference with Authentication for the feature guide and quickstart examples.

Quick overview

Providers:

  • OIDCProvider - OpenID Connect provider with discovery and PKCE support

  • OAuth2Provider - Generic OAuth2 provider for non-OIDC endpoints

  • AbstractAuthProvider - Base class for custom provider implementations

Session management:

  • AuthSession - Context manager wrapping httpx.Client (sync) or httpx.AsyncClient (async) with automatic token injection and refresh

Token storage:

  • MemoryTokenStorage - Ephemeral in-memory storage (development/testing)

  • FileTokenStorage - Plain JSON file storage (unencrypted, for dev/non-SOPS environments)

  • SOPSTokenStorage - Encrypted persistent storage using Mozilla SOPS

  • AbstractTokenStorage - Base class for custom storage backends

Token validation:

  • TokenChecker - 6-step JWT token verification with cryptographic proof

  • TokenCheckReport - Complete validation report (header, payload, signature, claims)

  • ValidationStep - Result of a single validation step

Config helpers:

  • OIDCProvider.from_config(name) - Load provider from kstlib.conf.yml

  • get_provider_config(name) - Retrieve raw provider configuration

  • list_configured_providers() - List all configured provider names


Providers

OIDCProvider

class kstlib.auth.OIDCProvider(name, config, token_storage, *, http_client=None)

Bases: OAuth2Provider

OpenID Connect provider with PKCE and automatic discovery.

Extends OAuth2Provider with:

  • Automatic discovery of endpoints via .well-known/openid-configuration

  • PKCE (Proof Key for Code Exchange) for enhanced security

  • ID token validation (signature, claims)

  • UserInfo endpoint support

Example

>>> from kstlib.auth.providers import OIDCProvider, AuthProviderConfig  
>>> from kstlib.auth.token import MemoryTokenStorage  
>>>
>>> config = AuthProviderConfig(  
...     client_id="my-app",
...     issuer="https://auth.example.com",
...     scopes=["openid", "profile", "email"],
...     pkce=True,  # Enabled by default
... )
>>> provider = OIDCProvider("example", config, MemoryTokenStorage())  
>>> url, state = provider.get_authorization_url()  
>>> # User authenticates, provider.exchange_code() handles PKCE automatically
Config-driven usage:
>>> # Configure in kstlib.conf.yml:
>>> # auth:
>>> #   providers:
>>> #     corporate:
>>> #       type: oidc
>>> #       issuer: https://idp.corp.local/realms/main
>>> #       client_id: my-app
>>> #       scopes: [openid, profile, email]
>>> #       pkce: true
>>> provider = OIDCProvider.from_config("corporate")  
classmethod from_config(provider_name: str, *, config: dict[str, Any] | None = None, http_client: httpx.Client | None = None, **overrides: Any) -> OIDCProvider

Create an OIDCProvider from configuration.

Loads provider settings from kstlib.conf.yml (auth.providers section) and creates a fully configured provider instance.

Parameters:
  • provider_name (str) – Name of the provider in config (e.g., “corporate”).

  • config (dict[str, Any] | None) – Optional explicit config dict (overrides global config).

  • http_client (Client | None) – Optional custom HTTP client.

  • **overrides (Any) – Direct parameter overrides (highest priority).

Returns:

Configured OIDCProvider instance.

Raises:

ConfigurationError – If provider not found or required fields missing.

Return type:

OIDCProvider

Example

>>> provider = OIDCProvider.from_config("corporate")  
>>> provider = OIDCProvider.from_config(
...     "corporate",
...     client_id="override-id",  # Override config value
... )  
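The precedence described for from_config (keyword overrides first, then an explicit config dict, then the global configuration) can be pictured as a layered merge. The sketch below is an illustration only: resolve_provider_settings and the sample dict are hypothetical names, and it assumes an explicit config replaces the global one rather than merging with it, which this reference does not specify.

```python
from __future__ import annotations

from typing import Any


def resolve_provider_settings(
    provider_name: str,
    global_config: dict[str, Any],
    config: dict[str, Any] | None = None,
    **overrides: Any,
) -> dict[str, Any]:
    """Hypothetical helper: pick the config source, then apply keyword overrides."""
    # Assumption: an explicit config dict is used instead of the global one.
    source = config if config is not None else global_config
    providers = source.get("auth", {}).get("providers", {})
    if provider_name not in providers:
        raise KeyError(f"provider {provider_name!r} is not configured")
    # Keyword overrides are applied last, so they win over file values.
    return {**providers[provider_name], **overrides}


GLOBAL = {"auth": {"providers": {"corporate": {"type": "oidc", "client_id": "my-app"}}}}
settings = resolve_provider_settings("corporate", GLOBAL, client_id="override-id")
```

The merge builds a new dict, so the loaded configuration itself is left unchanged.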
__init__(self, name: str, config: AuthProviderConfig, token_storage: AbstractTokenStorage, *, http_client: httpx.Client | None = None) -> None

Initialize OIDC provider.

Supports three configuration modes:

  1. Auto discovery: Only issuer provided. Endpoints discovered via .well-known/openid-configuration.

  2. Hybrid mode: issuer + some explicit endpoints. Discovery fills missing endpoints, explicit ones take precedence (useful for buggy IDPs).

  3. Full manual: No issuer, all required endpoints explicit. No discovery attempted (for IDPs without discovery support).
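The three modes above can be sketched as a small classification plus a merge. This is not the library's code: the function names are hypothetical, the discovery path follows the OpenID Connect Discovery convention, and validating that manual mode supplies every required endpoint is left out.

```python
from __future__ import annotations


def discovery_mode(issuer: str | None, explicit_endpoints: dict[str, str]) -> str:
    """Classify a configuration as "auto", "hybrid", or "manual"."""
    if issuer is None:
        return "manual"
    return "hybrid" if explicit_endpoints else "auto"


def discovery_url(issuer: str) -> str:
    """Build the discovery document URL for an issuer."""
    return issuer.rstrip("/") + "/.well-known/openid-configuration"


def merge_endpoints(discovered: dict[str, str], explicit: dict[str, str]) -> dict[str, str]:
    """Discovery fills the gaps; explicitly configured endpoints take precedence."""
    return {**discovered, **explicit}


mode = discovery_mode("https://auth.example.com", {"token_endpoint": "https://auth.example.com/t"})
```

In hybrid mode the explicit entries are unpacked last, which is what lets a single corrected endpoint override a value an IDP advertises incorrectly.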

Parameters:
  • name (str) – Provider identifier.

  • config (AuthProviderConfig) – Provider configuration.

  • token_storage (AbstractTokenStorage) – Token storage backend.

  • http_client (httpx.Client | None) – Optional custom HTTP client.

Raises:

ConfigurationError – If configuration is invalid.

property flow: AuthFlow

Return the OAuth2/OIDC flow type.

property discovery_mode: str

Return the current discovery mode.

Returns:

One of “auto”, “hybrid”, or “manual”.

Return type:

str

discover(self, *, force: bool = False) -> dict[str, Any]

Fetch and cache the OIDC discovery document.

In manual mode (no issuer), this returns an empty dict without making any network calls. In auto/hybrid mode, it fetches the discovery document and updates endpoints accordingly.

Parameters:

force (bool) – Force refresh even if cached.

Returns:

Discovery document as dict (empty in manual mode).

Raises:

DiscoveryError – If discovery fails (only in auto/hybrid mode).

Return type:

dict[str, Any]

get_authorization_url(self, state: str | None = None) -> tuple[str, str]

Generate the authorization URL with PKCE if enabled.

Parameters:

state (str | None) – Optional state parameter.

Returns:

Tuple of (authorization_url, state).

Return type:

tuple[str, str]
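For readers unfamiliar with what PKCE adds to the authorization URL, here is a minimal standard-library sketch of the verifier/challenge pair defined by RFC 7636. It assumes the S256 challenge method; the helper names are hypothetical, and the provider's internal implementation may differ.

```python
import base64
import hashlib
import secrets


def code_challenge_s256(code_verifier: str) -> str:
    """Derive the S256 challenge: base64url(SHA-256(verifier)) without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def make_pkce_pair() -> tuple[str, str]:
    """Return (code_verifier, code_challenge)."""
    # 32 random bytes yield a 43-character URL-safe verifier (RFC 7636 allows 43-128).
    code_verifier = secrets.token_urlsafe(32)
    return code_verifier, code_challenge_s256(code_verifier)


verifier, challenge = make_pkce_pair()
```

The challenge travels in the authorization URL, while the verifier stays with the client until the code exchange.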

exchange_code(self, code: str, state: str, *, code_verifier: str | None = None) -> Token

Exchange authorization code for tokens, with PKCE support.

Parameters:
  • code (str) – Authorization code from callback.

  • state (str) – State parameter for validation.

  • code_verifier (str | None) – PKCE code verifier (auto-used from internal state if not provided).

Returns:

Token with access_token, id_token, etc.

Raises:

TokenExchangeError – If exchange fails.

Return type:

Token

refresh(self, token: Token | None = None) -> Token

Refresh an access token, ensuring OIDC discovery is done first.

For OIDC providers, we must perform discovery before refreshing to ensure we have the correct token_endpoint URL. This is necessary because the endpoint URLs set during __init__ are temporary fallbacks that may not match the actual IDP endpoints.

Parameters:

token (Token | None) – Token to refresh. Uses stored token if not provided.

Returns:

New token with refreshed access_token.

Raises:

TokenRefreshError – If refresh fails.

Return type:

Token

get_userinfo(self, token: Token | None = None) -> dict[str, Any]

Fetch user information from the UserInfo endpoint.

Uses explicit userinfo_url if configured, otherwise gets it from discovery.

Parameters:

token (Token | None) – Token to use. Uses stored token if not provided.

Returns:

User claims from the UserInfo endpoint.

Raises:

AuthError – If request fails or endpoint not configured.

Return type:

dict[str, Any]

preflight(self) -> PreflightReport

Run preflight validation with OIDC-specific checks.

OAuth2Provider

class kstlib.auth.OAuth2Provider(name, config, token_storage, *, http_client=None)

Bases: AbstractAuthProvider

OAuth2 Authorization Code flow provider.

Implements the standard OAuth2 Authorization Code flow for confidential clients. For public clients or enhanced security, use OIDCProvider with PKCE.

Example

>>> from kstlib.auth.providers import OAuth2Provider, AuthProviderConfig  
>>> from kstlib.auth.token import MemoryTokenStorage  
>>>
>>> config = AuthProviderConfig(  
...     client_id="my-app",
...     client_secret="secret",
...     authorize_url="https://auth.example.com/authorize",
...     token_url="https://auth.example.com/token",
...     scopes=["read", "write"],
... )
>>> provider = OAuth2Provider("example", config, MemoryTokenStorage())  
>>> url, state = provider.get_authorization_url()  
>>> # User visits URL, authorizes, redirected back with code
>>> token = provider.exchange_code(code="...", state=state)  
Config-driven usage:
>>> # Configure in kstlib.conf.yml:
>>> # auth:
>>> #   providers:
>>> #     github:
>>> #       type: oauth2
>>> #       authorization_endpoint: https://github.com/login/oauth/authorize
>>> #       token_endpoint: https://github.com/login/oauth/access_token
>>> #       client_id: my-app
>>> #       client_secret: sops://secrets.yaml#github.secret
>>> provider = OAuth2Provider.from_config("github")  
classmethod from_config(provider_name: str, *, config: dict[str, Any] | None = None, http_client: httpx.Client | None = None, **overrides: Any) -> OAuth2Provider

Create an OAuth2Provider from configuration.

Loads provider settings from kstlib.conf.yml (auth.providers section) and creates a fully configured provider instance.

Parameters:
  • provider_name (str) – Name of the provider in config.

  • config (dict[str, Any] | None) – Optional explicit config dict (overrides global config).

  • http_client (Client | None) – Optional custom HTTP client.

  • **overrides (Any) – Direct parameter overrides (highest priority).

Returns:

Configured OAuth2Provider instance.

Raises:

ConfigurationError – If provider not found or required fields missing.

Return type:

OAuth2Provider

Example

>>> provider = OAuth2Provider.from_config("github")  
__init__(self, name: str, config: AuthProviderConfig, token_storage: AbstractTokenStorage, *, http_client: httpx.Client | None = None) -> None

Initialize OAuth2 provider.

Parameters:
  • name (str) – Provider identifier.

  • config (AuthProviderConfig) – Provider configuration.

  • token_storage (AbstractTokenStorage) – Token storage backend.

  • http_client (httpx.Client | None) – Optional custom HTTP client.

property flow: AuthFlow

Return the OAuth2 flow type.

property tracer: HTTPTraceLogger

Get or create HTTP trace logger with config-driven settings.

property http_client: Client

Get or create HTTP client with TRACE logging hooks.

The client automatically includes any custom headers configured in config.headers. These headers are sent with all IDP requests, useful for environments requiring specific headers (e.g., Host header for load balancer validation).

SSL verification is controlled by config.ssl_verify and config.ssl_ca_bundle. See AuthProviderConfig for details.

close(self) -> None

Close the HTTP client and release connection pool resources.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit context manager - close HTTP client and clear sensitive data.

get_authorization_url(self, state: str | None = None) -> tuple[str, str]

Generate the authorization URL.

Parameters:

state (str | None) – Optional state parameter. If provided, a warning is logged since caller-supplied state weakens CSRF protection. Generated securely if not provided.

Returns:

Tuple of (authorization_url, state).

Return type:

tuple[str, str]
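To show what the returned (authorization_url, state) pair typically contains, the sketch below builds an Authorization Code request with the standard library. The function and parameter names (build_authorization_url, redirect_uri, state_matches) are hypothetical and not part of kstlib; the query parameters are the standard OAuth2 ones.

```python
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def build_authorization_url(authorize_url, client_id, redirect_uri, scopes, state=None):
    """Return (authorization_url, state) for an Authorization Code request."""
    # Generate an unguessable state when the caller does not supply one.
    state = state or secrets.token_urlsafe(32)
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "state": state,
    })
    return f"{authorize_url}?{query}", state


def state_matches(expected: str, returned: str) -> bool:
    """Compare the callback state in constant time to detect CSRF attempts."""
    return secrets.compare_digest(expected, returned)


url, state = build_authorization_url(
    "https://auth.example.com/authorize", "my-app",
    "http://localhost:8400/callback", ["read", "write"],
)
query = parse_qs(urlsplit(url).query)
```

Letting the function generate the state keeps it unpredictable, which is the reason a caller-supplied value is flagged above.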

exchange_code(self, code: str, state: str, *, code_verifier: str | None = None) -> Token

Exchange authorization code for tokens.

Parameters:
  • code (str) – Authorization code from callback.

  • state (str) – State parameter for validation.

  • code_verifier (str | None) – PKCE code verifier (ignored for basic OAuth2).

Returns:

Token with access_token and optionally refresh_token.

Raises:

TokenExchangeError – If exchange fails.

Return type:

Token

refresh(self, token: Token | None = None) -> Token

Refresh an expired token.

Parameters:

token (Token | None) – Token to refresh. Uses stored token if not provided.

Returns:

New Token.

Raises:

TokenRefreshError – If refresh fails.

Return type:

Token

revoke(self, token: Token | None = None) -> bool

Revoke a token.

Parameters:

token (Token | None) – Token to revoke. Uses stored token if not provided.

Returns:

True if revoked, False if revocation not supported.

Return type:

bool

get_userinfo(self, token: Token | None = None) -> dict[str, Any]

Fetch user information from the UserInfo endpoint.

Requires userinfo_url to be configured in the provider config.

Parameters:

token (Token | None) – Token to use. Uses stored token if not provided.

Returns:

User claims from the UserInfo endpoint.

Raises:
  • ConfigurationError – If userinfo_url is not configured.

  • AuthError – If request fails.

Return type:

dict[str, Any]

Example

>>> provider = OAuth2Provider.from_config("github")  
>>> userinfo = provider.get_userinfo()  
>>> print(userinfo["login"])  
preflight(self) -> PreflightReport

Run preflight validation checks.

Returns:

PreflightReport with validation results.

Return type:

PreflightReport

AbstractAuthProvider

class kstlib.auth.providers.base.AbstractAuthProvider(name, config, token_storage)

Bases: ABC

Abstract base class for OAuth2/OIDC authentication providers.

Subclasses must implement the abstract methods to handle the specific authentication flow (OAuth2, OIDC, etc.).

name

Provider identifier (matches config key).

config

Provider configuration.

token_storage

Storage backend for tokens.

__init__(self, name: str, config: AuthProviderConfig, token_storage: AbstractTokenStorage) -> None

Initialize the provider.

Parameters:
  • name (str) – Provider identifier.

  • config (AuthProviderConfig) – Provider configuration.

  • token_storage (AbstractTokenStorage) – Token storage backend.

property is_authenticated: bool

Check if a valid (non-expired) token is available.

abstract property flow: AuthFlow

Return the OAuth2 flow used by this provider.

abstract get_authorization_url(self, state: str | None = None) -> tuple[str, str]

Generate the authorization URL for the user to visit.

Parameters:

state (str | None) – Optional state parameter. Generated if not provided.

Returns:

Tuple of (authorization_url, state).

Return type:

tuple[str, str]

abstract exchange_code(self, code: str, state: str, *, code_verifier: str | None = None) -> Token

Exchange an authorization code for tokens.

Parameters:
  • code (str) – Authorization code from callback.

  • state (str) – State parameter for CSRF validation.

  • code_verifier (str | None) – PKCE code verifier (required if PKCE was used).

Returns:

Token object with access_token, refresh_token, etc.

Raises:

TokenExchangeError – If the exchange fails.

Return type:

Token

abstract refresh(self, token: Token | None = None) -> Token

Refresh an expired token.

Parameters:

token (Token | None) – Token to refresh. Uses stored token if not provided.

Returns:

New Token object.

Raises:

TokenRefreshError – If refresh fails or no refresh_token available.

Return type:

Token

abstract revoke(self, token: Token | None = None) -> bool

Revoke a token at the authorization server.

Parameters:

token (Token | None) – Token to revoke. Uses stored token if not provided.

Returns:

True if revoked successfully, False if revocation not supported.

Return type:

bool

get_token(self, *, auto_refresh: bool = True) -> Token | None

Get the current token, optionally refreshing if expired.

Parameters:

auto_refresh (bool) – If True and token is expired, attempt refresh.

Returns:

Token if available, None otherwise.

Return type:

Token | None

save_token(self, token: Token) -> None

Save a token to storage.

Parameters:

token (Token) – Token to save.

clear_token(self) -> None

Clear the current token from memory and storage.

abstract preflight(self) -> PreflightReport

Run preflight validation checks.

Returns:

PreflightReport with results for each validation step.

Return type:

PreflightReport

__enter__(self) -> Self

Enter context manager.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit context manager - clear sensitive data from memory.


Session Management

AuthSession

class kstlib.auth.AuthSession(provider, *, timeout=30.0, auto_refresh=True, retry_on_401=True, ssl_verify=None, ssl_ca_bundle=None)

Bases: object

HTTP session with automatic token injection and refresh.

Wraps httpx.Client (sync) or httpx.AsyncClient (async) to automatically:

  • Inject the Bearer token in the Authorization header

  • Refresh expired tokens before making requests

  • Handle 401 responses by refreshing and retrying

Example (sync):
>>> from kstlib.auth import AuthSession, get_provider  
>>> provider = get_provider("corporate")  
>>> with AuthSession(provider) as session:  
...     response = session.get("https://api.example.com/users/me")
...     print(response.json())
Example (async):
>>> async with AuthSession(provider) as session:  
...     response = await session.aget("https://api.example.com/users/me")
...     print(response.json())
__init__(self, provider: AbstractAuthProvider, *, timeout: float = 30.0, auto_refresh: bool = True, retry_on_401: bool = True, ssl_verify: bool | None = None, ssl_ca_bundle: str | None = None) -> None

Initialize authenticated session.

Parameters:
  • provider (AbstractAuthProvider) – Authentication provider to use for tokens.

  • timeout (float) – Default request timeout in seconds.

  • auto_refresh (bool) – Automatically refresh expired tokens before requests.

  • retry_on_401 (bool) – Retry request after token refresh on 401 response.

  • ssl_verify (bool | None) – Override SSL verification (True/False). If None, uses provider’s SSL config or global config.

  • ssl_ca_bundle (str | None) – Override CA bundle path. If None, uses provider’s SSL config or global config.
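The retry_on_401 behaviour can be pictured as "send once, refresh, send once more". The sketch below models it with plain callables instead of a real HTTP client; send_with_retry and the fake API are hypothetical stand-ins, and the single-retry limit is an assumption.

```python
from typing import Callable


def send_with_retry(
    send: Callable[[str], int],
    current_token: Callable[[], str],
    refresh: Callable[[], str],
    *,
    retry_on_401: bool = True,
) -> int:
    """Send once with the current token; on 401, refresh and retry a single time."""
    status = send(current_token())
    if status == 401 and retry_on_401:
        status = send(refresh())
    return status


# Fake API that only accepts the token "fresh".
calls = []


def fake_send(token: str) -> int:
    calls.append(token)
    return 200 if token == "fresh" else 401


status = send_with_retry(fake_send, lambda: "stale", lambda: "fresh")
```

Retrying only once avoids a loop when the server keeps rejecting the refreshed token.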

__enter__(self) -> Self

Enter sync context manager.

__exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit sync context manager.

async __aenter__(self) -> Self

Enter async context manager.

async __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None

Exit async context manager.

get(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated GET request.

post(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated POST request.

put(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated PUT request.

patch(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated PATCH request.

delete(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated DELETE request.

head(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated HEAD request.

options(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated OPTIONS request.

async aget(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async GET request.

async apost(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async POST request.

async aput(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async PUT request.

async apatch(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async PATCH request.

async adelete(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async DELETE request.

async ahead(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async HEAD request.

async aoptions(self, url: str, **kwargs: Any) -> httpx.Response

Make authenticated async OPTIONS request.


Token Storage

SOPSTokenStorage

class kstlib.auth.token.SOPSTokenStorage(directory, *, sops_binary='sops', age_recipients=None)

Bases: AbstractTokenStorage

SOPS-encrypted token storage.

Tokens are encrypted using SOPS before being written to disk. Uses the SOPS CLI directly for encryption/decryption operations.

__init__(self, directory: Path | str, *, sops_binary: str = 'sops', age_recipients: list[str] | None = None) -> None

Initialize SOPS storage.

Parameters:
  • directory (Path | str) – Directory to store encrypted token files.

  • sops_binary (str) – Path to sops binary (default: “sops”).

  • age_recipients (list[str] | None) – Age public keys for encryption. If not provided, relies on .sops.yaml or environment.

save(self, provider_name: str, token: Token) -> None

Save token encrypted with SOPS.

load(self, provider_name: str) -> Token | None

Load and decrypt token from SOPS file.

delete(self, provider_name: str) -> bool

Delete encrypted token file.

exists(self, provider_name: str) -> bool

Check if encrypted token file exists.

sensitive_token(self, provider_name: str) -> Iterator[Token | None]

Context manager for secure token access with cleanup.

FileTokenStorage

class kstlib.auth.token.FileTokenStorage(directory=None)

Bases: AbstractTokenStorage

Plain JSON file token storage.

Tokens are stored as unencrypted JSON files with restrictive permissions (600). Suitable for development, testing, or environments where SOPS is unavailable.

Warning

Tokens are stored in plaintext. Use SOPS storage for production environments where token confidentiality is critical.

__init__(self, directory: Path | str | None = None) -> None

Initialize file storage.

Parameters:

directory (Path | str | None) – Directory to store token files. Default: ~/.config/kstlib/auth/tokens

save(self, provider_name: str, token: Token) -> None

Save token to JSON file with restrictive permissions.

load(self, provider_name: str) -> Token | None

Load token from JSON file.

delete(self, provider_name: str) -> bool

Delete token file.

exists(self, provider_name: str) -> bool

Check if token file exists.
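As an illustration of what "restrictive permissions (600)" means in practice, the following sketch writes a JSON file that only its owner can read or write. It is not FileTokenStorage itself: save_token_file and the one-file-per-provider naming are assumptions, and the permission bits are only meaningful on POSIX systems.

```python
import json
import os
import stat
import tempfile
from pathlib import Path


def save_token_file(directory: Path, provider_name: str, token: dict) -> Path:
    """Write token data as JSON, readable and writable by the owner only."""
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{provider_name}.json"  # hypothetical naming scheme
    # Create the file with owner-only permissions from the start.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(token, handle)
    # Re-apply the mode in case the file already existed with looser permissions.
    os.chmod(path, 0o600)
    return path


with tempfile.TemporaryDirectory() as tmp:
    saved = save_token_file(Path(tmp), "corporate", {"access_token": "example"})
    mode = stat.S_IMODE(saved.stat().st_mode)
    loaded = json.loads(saved.read_text(encoding="utf-8"))
```

Passing the mode to os.open avoids a window in which the file exists with default permissions.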

MemoryTokenStorage

class kstlib.auth.token.MemoryTokenStorage

Bases: AbstractTokenStorage

In-memory token storage (for development/testing).

Tokens are stored in a dictionary and lost when the process exits. No encryption or persistence.

__init__(self) -> None

Initialize empty storage.

save(self, provider_name: str, token: Token) -> None

Store token in memory.

load(self, provider_name: str) -> Token | None

Retrieve token from memory.

delete(self, provider_name: str) -> bool

Remove token from memory.

exists(self, provider_name: str) -> bool

Check if token exists in memory.

clear_all(self) -> None

Clear all tokens from memory.

AbstractTokenStorage

class kstlib.auth.token.AbstractTokenStorage

Bases: ABC

Abstract base class for token storage backends.

Implementations handle persisting and retrieving tokens, with optional encryption (e.g., SOPS) for secure storage.

abstract save(self, provider_name: str, token: Token) -> None

Persist a token for a provider.

Parameters:
  • provider_name (str) – Provider identifier.

  • token (Token) – Token to save.

Raises:

TokenStorageError – If save fails.

abstract load(self, provider_name: str) -> Token | None

Load a token for a provider.

Parameters:

provider_name (str) – Provider identifier.

Returns:

Token if found, None otherwise.

Raises:

TokenStorageError – If load fails (not for missing tokens).

Return type:

Token | None

abstract delete(self, provider_name: str) -> bool

Delete a token for a provider.

Parameters:

provider_name (str) – Provider identifier.

Returns:

True if token existed and was deleted.

Return type:

bool

abstract exists(self, provider_name: str) -> bool

Check if a token exists for a provider.

Parameters:

provider_name (str) – Provider identifier.

Returns:

True if token exists.

Return type:

bool

sensitive_token(self, provider_name: str) -> Iterator[Token | None]

Context manager for secure token access.

Loads the token and yields it. On exit, clears the local reference.

Note

Python strings are immutable, so the actual token bytes cannot be scrubbed from memory. This context manager minimizes the exposure window by deleting the reference as soon as possible.

Parameters:

provider_name (str) – Provider identifier.

Yields:

Token if available, None otherwise.

Example

>>> with storage.sensitive_token("corporate") as token:  
...     if token:
...         print(token.access_token)
... # token reference cleared here
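A custom backend implements the four abstract methods and inherits sensitive_token. Since kstlib may not be importable where this is read, the sketch below uses a local stand-in ABC (StorageInterface) that mirrors the documented interface; DictStorage is hypothetical, and the sensitive_token body is one plausible reading of "clears the local reference".

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Iterator


class StorageInterface(ABC):
    """Stand-in mirroring the documented AbstractTokenStorage methods."""

    @abstractmethod
    def save(self, provider_name: str, token: Any) -> None: ...

    @abstractmethod
    def load(self, provider_name: str) -> Any | None: ...

    @abstractmethod
    def delete(self, provider_name: str) -> bool: ...

    @abstractmethod
    def exists(self, provider_name: str) -> bool: ...

    @contextmanager
    def sensitive_token(self, provider_name: str) -> Iterator[Any | None]:
        token = self.load(provider_name)
        try:
            yield token
        finally:
            # Drop this frame's reference; the stored copy is not touched.
            del token


class DictStorage(StorageInterface):
    """Hypothetical backend keeping tokens in a plain dict."""

    def __init__(self) -> None:
        self._tokens: dict[str, Any] = {}

    def save(self, provider_name: str, token: Any) -> None:
        self._tokens[provider_name] = token

    def load(self, provider_name: str) -> Any | None:
        return self._tokens.get(provider_name)

    def delete(self, provider_name: str) -> bool:
        return self._tokens.pop(provider_name, None) is not None

    def exists(self, provider_name: str) -> bool:
        return provider_name in self._tokens


storage = DictStorage()
storage.save("corporate", {"access_token": "example"})
with storage.sensitive_token("corporate") as token:
    seen = token["access_token"] if token else None
```

delete returns True only when a token was actually removed, matching the contract described for AbstractTokenStorage.delete.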

Token Validation

Independent JWT token verification with cryptographic proof. Works with any RSA-signed JWT (id_token or access_token) whose issuer exposes an OpenID Connect discovery endpoint. See the CLI Reference feature guide for the full proof chain diagram and use cases.

TokenChecker

class kstlib.auth.check.TokenChecker(http_client, expected_issuer=None, expected_audience=None)

Bases: object

Validates JWT tokens with full cryptographic verification.

Runs a 6-step validation chain:
  1. Decode JWT structure (header + payload)

  2. Discover issuer (OIDC discovery document)

  3. Fetch JWKS (JSON Web Key Set)

  4. Extract public key (JWK to PEM, SHA-256 fingerprint)

  5. Verify signature (cryptographic proof)

  6. Validate claims (iss, aud, exp, iat, nbf)
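Step 1 of the chain needs nothing beyond base64url and JSON decoding. The sketch below shows that step in isolation with the standard library; the helper names and the sample token are hypothetical, and nothing here verifies a signature.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def decode_structure(token_str: str) -> tuple[dict, dict]:
    """Return (header, payload) WITHOUT verifying the signature."""
    parts = token_str.split(".")
    if len(parts) != 3:
        raise ValueError("a signed JWT has exactly three dot-separated segments")
    header = json.loads(b64url_decode(parts[0]))
    payload = json.loads(b64url_decode(parts[1]))
    return header, payload


def _encode(obj: dict) -> str:
    raw = json.dumps(obj, separators=(",", ":")).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


# A structurally valid token with a placeholder signature, for illustration only.
sample = ".".join([
    _encode({"alg": "RS256", "kid": "key-1", "typ": "JWT"}),
    _encode({"iss": "https://auth.example.com", "aud": "my-app"}),
    "c2lnbmF0dXJl",
])
header, payload = decode_structure(sample)
```

The decoded header supplies the alg and kid values that the later steps use to select a key; the payload must not be trusted until the signature has been verified.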

Parameters:
  • http_client (httpx.Client) – httpx.Client for HTTP requests.

  • expected_issuer (str | None) – Expected issuer URL (for claim validation). If None, the issuer from the token payload is used.

  • expected_audience (str | None) – Expected audience (client_id). If None, audience validation is skipped.

Example

>>> import httpx  
>>> from kstlib.auth.check import TokenChecker  
>>> client = httpx.Client(verify="/path/to/ca.pem")  
>>> checker = TokenChecker(client, expected_audience="my-app")  
>>> report = checker.check(token_string)  
__init__(self, http_client: httpx.Client, expected_issuer: str | None = None, expected_audience: str | None = None) -> None

Initialize the token checker with an HTTP client and optional expected claims.

check(self, token_str: str, *, token_type: str = 'id_token') -> TokenCheckReport

Run full token validation chain.

Parameters:
  • token_str (str) – Raw JWT string to validate.

  • token_type (str) – Label for the report (“id_token” or “access_token”).

Returns:

TokenCheckReport with all validation results.

Return type:

TokenCheckReport
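The final step of the chain, claim validation, can be sketched independently of the cryptographic steps. The function below is a hypothetical illustration of how expected_issuer and expected_audience might be applied: a None value skips that check, no clock-skew leeway is applied, and the messages are invented for the example.

```python
from __future__ import annotations

import time


def validate_claims(
    payload: dict,
    *,
    expected_issuer: str | None = None,
    expected_audience: str | None = None,
    now: float | None = None,
) -> list[str]:
    """Return a list of problems; an empty list means the claims passed."""
    now = time.time() if now is None else now
    problems = []
    if expected_issuer is not None and payload.get("iss") != expected_issuer:
        problems.append("iss mismatch")
    if expected_audience is not None:
        aud = payload.get("aud")
        # The aud claim may be a single string or a list of strings.
        audiences = aud if isinstance(aud, list) else [aud]
        if expected_audience not in audiences:
            problems.append("aud mismatch")
    if "exp" in payload and now >= payload["exp"]:
        problems.append("token expired")
    if "nbf" in payload and now < payload["nbf"]:
        problems.append("token not yet valid")
    if "iat" in payload and payload["iat"] > now:
        problems.append("iat is in the future")
    return problems


claims = {"iss": "https://auth.example.com", "aud": ["my-app", "other"],
          "exp": 2000, "iat": 900, "nbf": 900}
ok = validate_claims(claims, expected_audience="my-app", now=1000)
```

Passing now explicitly keeps the check deterministic, which is convenient in tests.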

TokenCheckReport

class kstlib.auth.check.TokenCheckReport(token_type='id_token', valid=False, steps=<factory>, header=<factory>, payload=<factory>, signature_algorithm=None, key_id=None, discovery_url=None, discovery_data=<factory>, jwks_uri=None, jwks_data=<factory>, public_key_pem=None, key_fingerprint=None, key_type=None, key_size_bits=None, x509_info=<factory>, issuer_match=None, audience_match=None, error=None)

Bases: object

Complete token validation report.

token_type

Whether “id_token” or “access_token” was checked.

Type:

str

valid

Overall result (True only if ALL steps passed).

Type:

bool

steps

Ordered list of validation steps executed.

Type:

list[kstlib.auth.check.ValidationStep]

header

Decoded JWT header (alg, kid, typ, …).

Type:

dict[str, Any]

payload

Decoded JWT payload (claims).

Type:

dict[str, Any]

signature_algorithm

Algorithm from JWT header (e.g. “RS256”, “RS512”).

Type:

str | None

key_id

Key ID from JWT header (kid).

Type:

str | None

discovery_url

OpenID Connect discovery URL used.

Type:

str | None

discovery_data

Relevant fields from discovery document.

Type:

dict[str, Any]

jwks_uri

JWKS endpoint URL.

Type:

str | None

public_key_pem

PEM-encoded public key used for verification.

Type:

str | None

key_fingerprint

SHA-256 fingerprint of the public key (hex).

Type:

str | None

issuer_match

Whether iss claim matches expected issuer.

Type:

bool | None

audience_match

Whether aud claim matches expected audience.

Type:

bool | None

error

Error message if validation failed.

Type:

str | None

token_type: str
valid: bool
steps: list[ValidationStep]
header: dict[str, Any]
payload: dict[str, Any]
signature_algorithm: str | None
key_id: str | None
discovery_url: str | None
discovery_data: dict[str, Any]
jwks_uri: str | None
jwks_data: dict[str, Any]
public_key_pem: str | None
key_fingerprint: str | None
key_type: str | None
key_size_bits: int | None
x509_info: dict[str, Any]
issuer_match: bool | None
audience_match: bool | None
error: str | None
to_dict(self) -> dict[str, Any]

Serialize report to dictionary for JSON output.

Returns:

Dictionary representation of the report.

Return type:

dict[str, Any]

__init__(self, token_type: str = 'id_token', valid: bool = False, steps: list[ValidationStep] = <factory>, header: dict[str, Any] = <factory>, payload: dict[str, Any] = <factory>, signature_algorithm: str | None = None, key_id: str | None = None, discovery_url: str | None = None, discovery_data: dict[str, Any] = <factory>, jwks_uri: str | None = None, jwks_data: dict[str, Any] = <factory>, public_key_pem: str | None = None, key_fingerprint: str | None = None, key_type: str | None = None, key_size_bits: int | None = None, x509_info: dict[str, Any] = <factory>, issuer_match: bool | None = None, audience_match: bool | None = None, error: str | None = None) -> None

ValidationStep

class kstlib.auth.check.ValidationStep(name, passed, message, details=<factory>)

Bases: object

Result of a single validation step.

name

Step identifier (e.g. “decode_structure”, “verify_signature”).

Type:

str

passed

Whether the step succeeded.

Type:

bool

message

Human-readable result description.

Type:

str

details

Optional extra information for verbose output.

Type:

dict[str, Any]

name: str
passed: bool
message: str
details: dict[str, Any]
__init__(self, name: str, passed: bool, message: str, details: dict[str, Any] = <factory>) -> None
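To show how a list of step results relates to the overall valid flag ("True only if ALL steps passed"), here is a sketch with a stand-in dataclass. Step copies the documented ValidationStep fields; overall_valid and first_failure are hypothetical helpers, and treating an empty step list as invalid is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Step:
    """Stand-in with the same fields as the documented ValidationStep."""

    name: str
    passed: bool
    message: str
    details: dict[str, Any] = field(default_factory=dict)


def overall_valid(steps: list[Step]) -> bool:
    """A report is valid only when at least one step ran and every step passed."""
    return bool(steps) and all(step.passed for step in steps)


def first_failure(steps: list[Step]) -> Step | None:
    """Return the first failing step, or None when everything passed."""
    return next((step for step in steps if not step.passed), None)


steps = [
    Step("decode_structure", True, "JWT decoded"),
    Step("verify_signature", False, "signature mismatch", {"alg": "RS256"}),
]
failed = first_failure(steps)
```

Reporting the first failing step gives a caller the most useful single message without walking the whole list.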

Models

Token

class kstlib.auth.models.Token(access_token, token_type=TokenType.BEARER, expires_at=None, refresh_token=None, scope=<factory>, id_token=None, issued_at=<factory>, metadata=<factory>)

Bases: object

Represents an OAuth2/OIDC token set.

access_token

The access token issued by the authorization server.

Type:

str

token_type

Token type (usually “Bearer”).

Type:

kstlib.auth.models.TokenType | str

expires_at

Absolute expiration time (UTC). None if unknown.

Type:

datetime.datetime | None

refresh_token

Optional refresh token for obtaining new access tokens.

Type:

str | None

scope

List of granted scopes.

Type:

list[str]

id_token

OIDC ID token (JWT) containing user claims. None for pure OAuth2.

Type:

str | None

issued_at

When the token was issued (UTC).

Type:

datetime.datetime

metadata

Additional provider-specific data.

Type:

dict[str, Any]

Example

>>> from datetime import datetime, timezone
>>> from kstlib.auth.models import Token
>>> token = Token(
...     access_token="eyJhbGc...",
...     expires_at=datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
...     refresh_token="dGhpcyBpcyBh...",
...     scope=["openid", "profile"],
... )
>>> token.is_expired
True
>>> token.is_refreshable
True
access_token: str
token_type: TokenType | str
expires_at: datetime | None
refresh_token: str | None
scope: list[str]
id_token: str | None
issued_at: datetime
metadata: dict[str, Any]
property is_expired: bool

Check if the access token has expired.

Returns:

True if the token has expired, or if the expiration is unknown and the token was issued more than one hour ago.

property is_refreshable: bool

Check if the token can be refreshed.

Returns:

True if a refresh_token is available.

property expires_in: int | None

Seconds until expiration. None if unknown, negative if expired.

property should_refresh: bool

Check if the token should be proactively refreshed.

Returns:

True if token expires within 60 seconds or is already expired.
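The three expiry-related properties can be summarised in a few lines of datetime arithmetic. ExpiryView below is a hypothetical stand-in that keeps only the two timestamps involved; the 60-second and one-hour thresholds come from the descriptions above, while the behaviour of should_refresh for an unknown expiration is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


def _utcnow() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class ExpiryView:
    """Stand-in holding only the timestamps the documented properties depend on."""

    expires_at: datetime | None = None
    issued_at: datetime = field(default_factory=_utcnow)

    @property
    def expires_in(self) -> int | None:
        """Seconds until expiration; None if unknown, negative if expired."""
        if self.expires_at is None:
            return None
        return int((self.expires_at - _utcnow()).total_seconds())

    @property
    def is_expired(self) -> bool:
        if self.expires_at is None:
            # Unknown expiration: treat tokens older than one hour as expired.
            return _utcnow() - self.issued_at > timedelta(hours=1)
        return _utcnow() >= self.expires_at

    @property
    def should_refresh(self) -> bool:
        remaining = self.expires_in
        if remaining is None:
            return self.is_expired  # assumption for the unknown-expiry case
        return remaining <= 60


fresh = ExpiryView(expires_at=_utcnow() + timedelta(hours=1))
closing = ExpiryView(expires_at=_utcnow() + timedelta(seconds=30))
stale = ExpiryView(issued_at=_utcnow() - timedelta(hours=2))
```

Here closing is still usable but already due for a proactive refresh, which is the window should_refresh is meant to catch.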

classmethod from_response(data: dict[str, Any]) -> Token

Create a Token from an OAuth2 token response.

Parameters:

data (dict[str, Any]) – Raw token response from the authorization server.

Returns:

Token instance populated from the response.

Return type:

Token

Example

>>> response = {
...     "access_token": "eyJhbGc...",
...     "token_type": "Bearer",
...     "expires_in": 3600,
...     "refresh_token": "dGhpcyBpcyBh...",
...     "scope": "openid profile",
...     "id_token": "eyJhbGc...",
... }
>>> token = Token.from_response(response)
>>> token.scope
['openid', 'profile']
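The two conversions visible in this example, a relative expires_in becoming an absolute expiration and a space-delimited scope string becoming a list, might look like the sketch below. parse_token_response is a hypothetical helper that returns a plain dict; it is not the Token.from_response implementation.

```python
from datetime import datetime, timedelta, timezone


def parse_token_response(data: dict) -> dict:
    """Normalise a raw OAuth2 token response into Token-like fields."""
    issued_at = datetime.now(timezone.utc)
    expires_in = data.get("expires_in")
    # expires_in is relative to the response time; store an absolute UTC timestamp.
    expires_at = (
        issued_at + timedelta(seconds=int(expires_in)) if expires_in is not None else None
    )
    scope = data.get("scope", "")
    return {
        "access_token": data["access_token"],
        "token_type": data.get("token_type", "Bearer"),
        "expires_at": expires_at,
        "refresh_token": data.get("refresh_token"),
        # OAuth2 responses carry scopes as one space-delimited string.
        "scope": scope.split() if isinstance(scope, str) else list(scope),
        "id_token": data.get("id_token"),
        "issued_at": issued_at,
    }


parsed = parse_token_response({
    "access_token": "eyJhbGc...",
    "token_type": "Bearer",
    "expires_in": 3600,
    "scope": "openid profile",
})
```

Storing an absolute timestamp means the token can be persisted and reloaded later without losing track of when it expires.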
to_dict(self) -> dict[str, Any]

Serialize token to dictionary for storage.

Returns:

Dictionary representation suitable for JSON serialization.

Return type:

dict[str, Any]

classmethod from_dict(data: dict[str, Any]) -> Token

Deserialize token from dictionary (storage retrieval).

Parameters:

data (dict[str, Any]) – Dictionary from to_dict() or storage.

Returns:

Token instance.

Return type:

Token

__init__(self, access_token: str, token_type: TokenType | str = TokenType.BEARER, expires_at: datetime | None = None, refresh_token: str | None = None, scope: list[str] = <factory>, id_token: str | None = None, issued_at: datetime = <factory>, metadata: dict[str, Any] = <factory>) -> None

Config Helpers

get_provider_config

kstlib.auth.config.get_provider_config(provider_name: str, *, config: Mapping[str, Any] | None = None) -> dict[str, Any] | None

Get configuration for a specific auth provider.

Parameters:
  • provider_name (str) – Name of the provider to look up.

  • config (Mapping[str, Any] | None) – Optional explicit config dict (overrides global).

Returns:

Provider configuration dict, or None if not found.

Return type:

dict[str, Any] | None

Example

>>> from kstlib.auth.config import get_provider_config
>>> cfg = get_provider_config("nonexistent")
>>> cfg is None
True

list_configured_providers

kstlib.auth.config.list_configured_providers(*, config: Mapping[str, Any] | None = None) -> list[str]

List all configured provider names.

Parameters:

config (Mapping[str, Any] | None) – Optional explicit config dict.

Returns:

List of provider names.

Return type:

list[str]
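Both helpers read the auth.providers section shown in the kstlib.conf.yml snippets earlier in this reference. Assuming that layout, their lookups can be approximated on a plain mapping as below; lookup_provider and provider_names are hypothetical stand-ins, not the library functions.

```python
from __future__ import annotations

from typing import Any, Mapping


def _providers(config: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return the auth.providers mapping, or an empty one if it is absent."""
    return config.get("auth", {}).get("providers", {})


def lookup_provider(config: Mapping[str, Any], provider_name: str) -> dict[str, Any] | None:
    """Return a copy of one provider's settings, or None if it is not configured."""
    entry = _providers(config).get(provider_name)
    return dict(entry) if entry is not None else None


def provider_names(config: Mapping[str, Any]) -> list[str]:
    """List the configured provider names in definition order."""
    return list(_providers(config))


CONFIG = {
    "auth": {
        "providers": {
            "corporate": {"type": "oidc", "issuer": "https://idp.corp.local/realms/main"},
            "github": {"type": "oauth2", "client_id": "my-app"},
        }
    }
}
names = provider_names(CONFIG)
```

Returning None for an unknown name, rather than raising, mirrors the get_provider_config example above.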