Skip to content

Custom Challenge Store

The default InMemoryChallengeStore stores challenges in a Python dict. For multi-instance deployments, replace it with a shared-storage implementation.

The ChallengeStore Protocol

from typing import Protocol

class ChallengeStore(Protocol):
    def create(self, address: str) -> str: ...
    def validate(self, address: str, nonce: str) -> bool: ...
    def cleanup(self) -> int: ...
Method Description
create(address) Generate a random nonce, store it for the given address, return the nonce.
validate(address, nonce) Check if the nonce matches, consume it (one-time use), return True/False.
cleanup() Remove expired entries, return count of entries removed.

Redis Example

import secrets
from pico_ioc import component
from pico_server_auth import ChallengeStore, ServerAuthSettings

@component(on_missing_selector=ChallengeStore)
class RedisChallengeStore:
    """Challenge store backed by Redis with automatic TTL expiry."""

    def __init__(self, settings: ServerAuthSettings):
        import redis
        self._redis = redis.Redis(host="localhost", port=6379, db=0)
        self._ttl = settings.challenge_ttl_seconds
        self._prefix = "pico:challenge:"

    def create(self, address: str) -> str:
        nonce = secrets.token_hex(32)
        key = f"{self._prefix}{address}"
        self._redis.setex(key, self._ttl, nonce)
        return nonce

    def validate(self, address: str, nonce: str) -> bool:
        key = f"{self._prefix}{address}"
        stored = self._redis.get(key)
        if stored is None:
            return False
        self._redis.delete(key)  # One-time use
        return secrets.compare_digest(stored.decode(), nonce)

    def cleanup(self) -> int:
        # Redis handles TTL expiry automatically
        return 0

Database Example

import secrets
import time
from pico_ioc import component
from pico_server_auth import ChallengeStore, ServerAuthSettings

@component(on_missing_selector=ChallengeStore)
class DatabaseChallengeStore:
    """Challenge store backed by a SQL database."""

    def __init__(self, settings: ServerAuthSettings, db_session):
        self._ttl = settings.challenge_ttl_seconds
        self._db = db_session

    def create(self, address: str) -> str:
        self.cleanup()
        nonce = secrets.token_hex(32)
        self._db.execute(
            "INSERT INTO challenges (address, nonce, created_at) "
            "VALUES (?, ?, ?) ON CONFLICT (address) DO UPDATE "
            "SET nonce = ?, created_at = ?",
            (address, nonce, time.time(), nonce, time.time()),
        )
        self._db.commit()
        return nonce

    def validate(self, address: str, nonce: str) -> bool:
        row = self._db.execute(
            "SELECT nonce, created_at FROM challenges WHERE address = ?",
            (address,),
        ).fetchone()
        if row is None:
            return False
        self._db.execute("DELETE FROM challenges WHERE address = ?", (address,))
        self._db.commit()
        stored_nonce, created_at = row
        if time.time() - created_at > self._ttl:
            return False
        return secrets.compare_digest(stored_nonce, nonce)

    def cleanup(self) -> int:
        cursor = self._db.execute(
            "DELETE FROM challenges WHERE created_at < ?",
            (time.time() - self._ttl,),
        )
        self._db.commit()
        return cursor.rowcount

Registration

pico-ioc uses on_missing_selector=ChallengeStore to register the default InMemoryChallengeStore only when no other implementation exists. By decorating your class with @component(on_missing_selector=ChallengeStore), your implementation takes priority.

Module discovery

Ensure your custom store's module is included in the pico-boot module_names list so pico-ioc discovers it.

Key Requirements

  • Nonces must be single-use: validate() must consume the nonce on first call.
  • Use secrets.compare_digest: Prevents timing attacks on nonce comparison.
  • Respect TTL: Expired nonces must be rejected even if they exist in storage.
  • Thread/async safety: If your store is accessed concurrently, ensure operations are atomic.