Architecture Overview — pico-sqlalchemy

pico-sqlalchemy is a thin integration layer that connects Pico-IoC's inversion-of-control container with SQLAlchemy's async session and transaction management. It does not replace SQLAlchemy. Its purpose is to ensure that repositories and domain services run inside explicit, async-native transactional boundaries that are declared with decorators and managed consistently through Pico-IoC.


1. High-Level Design

             ┌─────────────────────────────┐
             │          SQLAlchemy         │
             │ (AsyncEngine / AsyncSession)│
             └──────────────┬──────────────┘
                  Async Transaction Wrapping
             ┌──────────────▼───────────────┐
             │       pico-sqlalchemy        │
             │ @transactional  @repository  │
             │ @query          Pagination   │
             └──────────────┬───────────────┘
                     IoC Resolution
             ┌──────────────▼───────────────┐
             │           Pico-IoC           │
             │ (Container / Scopes / DI)    │
             └──────────────┬───────────────┘
               Async Domain Services, Repos,
                    Aggregates, Logic

2. Component Model

pico-sqlalchemy registers the following components at startup:

┌─────────────────────────────────────────────────────────────────────┐
│                         Pico-IoC Container                         │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ SqlAlchemyFactory               │  @factory                      │
│  │   └─ @provides(SessionManager)  │  Creates SessionManager from   │
│  │                                 │  DatabaseSettings (singleton)   │
│  └─────────────────────────────────┘                                │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ PicoSqlAlchemyLifecycle         │  @component + @configure       │
│  │   └─ setup_database()           │  Runs all DatabaseConfigurers  │
│  │      collects: DatabaseConfigurer│  in priority order against     │
│  │      injects:  SessionManager   │  the engine                    │
│  └─────────────────────────────────┘                                │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ TransactionalInterceptor        │  @component (MethodInterceptor)│
│  │   injects: SessionManager       │  Opens/joins transactions for  │
│  │                                 │  @transactional, @repository,  │
│  │                                 │  @query methods                │
│  └─────────────────────────────────┘                                │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ RepositoryQueryInterceptor      │  @component (MethodInterceptor)│
│  │   injects: SessionManager       │  Executes SQL/expr queries     │
│  │                                 │  for @query methods only       │
│  └─────────────────────────────────┘                                │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ DatabaseSettings                │  @configured (prefix="database")│
│  │   url, echo, pool_size, ...     │  Loaded from config sources    │
│  └─────────────────────────────────┘                                │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ AppBase                         │  @component (singleton)        │
│  │   subclasses: DeclarativeBase   │  Central ORM model registry    │
│  └─────────────────────────────────┘                                │
│                                                                     │
│  ┌─────────────────────────────────┐                                │
│  │ SessionManager                  │  Created by factory (singleton)│
│  │   owns: AsyncEngine             │  NOT @component — no decorator │
│  │   owns: session factory         │  on the class itself           │
│  └─────────────────────────────────┘                                │
└─────────────────────────────────────────────────────────────────────┘

Key detail: SessionManager has no @component decorator. It is created by SqlAlchemyFactory via @provides(SessionManager, scope="singleton"). This is intentional — the factory controls its construction from DatabaseSettings.


3. Startup Sequence

1. Container scans modules
2. DatabaseSettings loaded from configuration (prefix="database")
3. SqlAlchemyFactory.create_session_manager(settings) → SessionManager
       │  Creates AsyncEngine + session factory
4. PicoSqlAlchemyLifecycle.setup_database(session_manager, configurers)
       │  Collects all DatabaseConfigurer implementations
       │  Sorts by priority (ascending)
       │  Calls configure_database(engine) on each
5. Application ready — interceptors, repositories, services available
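
Step 4 of the sequence above can be pictured with a small stand-alone sketch. It uses neither pico-ioc nor SQLAlchemy: `FakeEngine`, `SchemaConfigurer`, `SeedConfigurer`, and the integer `priority` attribute are illustrative assumptions, and only the `configure_database(engine)` method name is taken from the sequence.

```python
from dataclasses import dataclass, field


@dataclass
class FakeEngine:
    """Stand-in for an AsyncEngine; it only records which configurers ran."""
    applied: list = field(default_factory=list)


class SchemaConfigurer:
    priority = 10  # assumed attribute: lower values run first

    def configure_database(self, engine: FakeEngine) -> None:
        engine.applied.append("schema")


class SeedConfigurer:
    priority = 20

    def configure_database(self, engine: FakeEngine) -> None:
        engine.applied.append("seed")


def setup_database(engine, configurers) -> None:
    # Sort ascending by priority, then let each configurer touch the engine.
    for configurer in sorted(configurers, key=lambda c: c.priority):
        configurer.configure_database(engine)


engine = FakeEngine()
# Registration order is deliberately "wrong" to show that priority wins.
setup_database(engine, [SeedConfigurer(), SchemaConfigurer()])
print(engine.applied)
```

Sorting before invoking means the order in which configurers are discovered does not matter, only their declared priority.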

4. Transaction Context (_tx_context)

pico-sqlalchemy uses a ContextVar to propagate the active session across async call chains:

_tx_context: ContextVar[TransactionContext | None]

This is separate from pico-ioc's scope system. It is a lightweight, per-async-task variable that stores the currently active AsyncSession wrapped in a TransactionContext.

How it works:

Service.create_user()                     ← @transactional
│  _tx_context = TransactionContext(session_A)
├─ Repository.find_by_name()              ← @repository (REQUIRED)
│  │  _tx_context.get() → session_A       ← Joins existing
│  │  (no new session created)
│  └─ returns result
├─ Repository.save()                      ← @repository (REQUIRED)
│  │  _tx_context.get() → session_A       ← Same session
│  └─ session_A.add(user)
└─ commit(session_A) or rollback
   _tx_context = None

Why not pico-ioc scopes? The _tx_context ContextVar provides transaction propagation semantics (REQUIRED, REQUIRES_NEW, etc.) that don't map to pico-ioc's scope lifecycle. A transaction may be suspended and restored (REQUIRES_NEW, NOT_SUPPORTED), which requires explicit save/restore of the context — something ContextVar handles naturally.


5. Interceptor Chain

pico-sqlalchemy uses two interceptors that work together via pico-ioc's AOP system:

For @repository methods (implicit transactions)

method call → TransactionalInterceptor → original method body
                    ├─ Opens REQUIRED Read-Write transaction
                    └─ method body executes with session available

For @query methods (declarative queries)

method call → TransactionalInterceptor → RepositoryQueryInterceptor
                    │                           │
                    ├─ Opens REQUIRED            ├─ Binds method params
                    │  Read-Only transaction     ├─ Builds SQL (expr or raw)
                    │                            ├─ Executes query
                    │                            ├─ Handles pagination
                    │                            └─ Returns mapped result
                    └─ method body is NEVER executed

The @query decorator chains both interceptors via @intercepted_by:

# Inside @query decorator (simplified)
step_1 = intercepted_by(TransactionalInterceptor)(func)   # Transaction layer
step_2 = intercepted_by(RepositoryQueryInterceptor)(step_1) # Query execution
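
To illustrate the general pattern (not pico-ioc's actual AOP machinery), here is a hedged sketch of two chained interceptors where the inner one is terminal and the method body is never reached. `TransactionLayer`, `QueryLayer`, `chain`, and the dict-shaped `ctx` are invented for this example; only the `invoke(ctx, call_next)` shape follows the sequence diagram later in this page.

```python
import asyncio

calls = []  # records the order in which layers run


class TransactionLayer:
    async def invoke(self, ctx, call_next):
        calls.append("tx:begin")
        try:
            return await call_next(ctx)
        finally:
            calls.append("tx:end")


class QueryLayer:
    async def invoke(self, ctx, call_next):
        # Terminal layer: it never calls call_next, so the body is skipped.
        calls.append("query:execute")
        return [{"username": ctx["params"]["username"]}]


def chain(interceptors, target):
    """Compose interceptors so the first one in the list is outermost."""

    async def terminal(ctx):
        return await target(**ctx["params"])

    def wrap(interceptor, nxt):
        async def step(ctx):
            return await interceptor.invoke(ctx, nxt)

        return step

    handler = terminal
    for interceptor in reversed(interceptors):
        handler = wrap(interceptor, handler)
    return handler


async def find_by_username(username: str):
    calls.append("body")  # would appear only if the body were executed
    return None


handler = chain([TransactionLayer(), QueryLayer()], find_by_username)
rows = asyncio.run(handler({"params": {"username": "ada"}}))
print(calls, rows)
```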

Configuration priority

When multiple decorators apply to the same method, TransactionalInterceptor resolves the transaction configuration using this priority:

| Priority | Source | Default Behavior |
|---|---|---|
| 1 (Highest) | @transactional metadata | User-defined (explicit) |
| 2 | @query metadata | read_only=True |
| 3 (Lowest) | @repository metadata | read_only=False |
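
The precedence rule can be sketched as a plain function. The name `resolve_read_only` and the dict-shaped metadata are hypothetical, and the fallback of `False` when `@transactional` omits `read_only` is an assumption rather than documented behaviour.

```python
from typing import Optional


def resolve_read_only(
    transactional_meta: Optional[dict],
    query_meta: Optional[dict],
    repository_meta: Optional[dict],
) -> bool:
    """Pick read_only from the highest-priority decorator that is present."""
    if transactional_meta is not None:
        # Explicit user configuration wins (assumed default: read-write).
        return transactional_meta.get("read_only", False)
    if query_meta is not None:
        return query_meta.get("read_only", True)
    if repository_meta is not None:
        return repository_meta.get("read_only", False)
    return False


# @query on a @repository method: read-only by default.
print(resolve_read_only(None, {}, {}))
# @transactional(read_only=False) overrides the @query default.
print(resolve_read_only({"read_only": False}, {}, {}))
```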

6. Transaction Propagation Model

Supported propagation levels (modeled after Spring's @Transactional propagation):

| Propagation | Behavior |
|---|---|
| REQUIRED | Join existing or start new (default) |
| REQUIRES_NEW | Suspend current, always start new |
| SUPPORTS | Join if exists, else run without transaction |
| MANDATORY | Must already be in a transaction |
| NOT_SUPPORTED | Suspend any transaction, run non-transactional |
| NEVER | Error if a transaction is active |
Suspension mechanism: REQUIRES_NEW and NOT_SUPPORTED save the current _tx_context, set it to None, execute in a new context, then restore the original. This ensures the outer transaction is unaffected.

REQUIRES_NEW flow:
  _tx_context = ctx_A (outer)
  ├─ save ctx_A, set _tx_context = None
  ├─ create new session_B, _tx_context = ctx_B
  ├─ execute method with session_B
  ├─ commit/rollback session_B
  └─ restore _tx_context = ctx_A
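
The save/suspend/restore steps above can be sketched with a bare `ContextVar`. The helper `run_requires_new`, the string session names, and the `trace` list are illustrative assumptions. The real implementation works with `AsyncSession` objects and actual commits.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

_tx_context: ContextVar[Optional[str]] = ContextVar("_tx_context", default=None)
trace = []


async def run_requires_new(session_name: str, work):
    outer = _tx_context.get()      # save ctx_A (may be None)
    _tx_context.set(None)          # suspend the outer transaction
    _tx_context.set(session_name)  # install ctx_B for the inner work
    try:
        result = await work()
        trace.append(("commit", session_name))
        return result
    except Exception:
        trace.append(("rollback", session_name))
        raise
    finally:
        _tx_context.set(outer)     # restore ctx_A whatever happened


async def write_audit_log():
    trace.append(("inner sees", _tx_context.get()))
    raise ValueError("audit write failed")


async def outer_service():
    _tx_context.set("session_A")
    try:
        await run_requires_new("session_B", write_audit_log)
    except ValueError:
        pass  # the outer transaction chooses to carry on
    trace.append(("outer sees", _tx_context.get()))


asyncio.run(outer_service())
print(trace)
```

The inner failure rolls back only `session_B`; the outer context is restored in the `finally` block and remains usable.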

Session lifecycle is fully deterministic:

begin → await work → await commit or await rollback → await close

Rollback logic is selective via:

  • rollback_for=(...): exception types that trigger rollback (default: Exception)
  • no_rollback_for=(...): exception types that skip rollback
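
One plausible reading of these two options is sketched here. The function name `should_rollback` is hypothetical, and letting `no_rollback_for` take precedence over `rollback_for` is an assumption (it mirrors Spring's behaviour but is not confirmed for pico-sqlalchemy).

```python
def should_rollback(exc, rollback_for=(Exception,), no_rollback_for=()):
    """Decide whether an exception raised by the method triggers rollback."""
    # Assumed precedence: an explicit exclusion beats any inclusion.
    if isinstance(exc, tuple(no_rollback_for)):
        return False
    return isinstance(exc, tuple(rollback_for))


class InsufficientFunds(Exception):
    """Example business exception."""


# Default: any Exception subclass rolls back.
print(should_rollback(InsufficientFunds()))
# Excluded type: the transaction would still commit.
print(should_rollback(InsufficientFunds(), no_rollback_for=(InsufficientFunds,)))
```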


7. Query Execution Model

The RepositoryQueryInterceptor supports two execution modes:

Expression mode (@query(expr="..."))

Requires @repository(entity=Model). Generates SQL automatically.

@query(expr="username = :username", unique=True)
SELECT * FROM users WHERE username = :username
                ├─ Parameters bound from method signature
                ├─ Dynamic sorting appended (if PageRequest with sorts)
                │  └─ Column names validated against entity.__table__.columns
                └─ unique=True → scalars().first() | default → scalars().all()
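
The sort validation step is the part that keeps dynamic ordering safe, so a small sketch may help. `Sort`, `build_select`, and the plain set of column names are hypothetical stand-ins: in the library the allowed names would come from `entity.__table__.columns`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Sort:
    column: str
    direction: str = "ASC"


def build_select(table, columns, expr, sorts=()):
    """Build SELECT ... WHERE expr, appending only whitelisted sort columns."""
    sql = f"SELECT * FROM {table} WHERE {expr}"
    order_parts = []
    for sort in sorts:
        if sort.column not in columns:
            raise ValueError(f"Unknown sort field: {sort.column!r}")
        direction = sort.direction.upper()
        if direction not in ("ASC", "DESC"):
            raise ValueError(f"Invalid sort direction: {sort.direction!r}")
        order_parts.append(f"{sort.column} {direction}")
    if order_parts:
        sql += " ORDER BY " + ", ".join(order_parts)
    return sql


user_columns = {"id", "username", "active"}
sql = build_select(
    "users", user_columns, "username = :username", [Sort("id", "desc")]
)
print(sql)
```

Because sort fields are checked against a known set before being interpolated, a value such as `"id; DROP TABLE users"` is rejected instead of reaching the database.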

SQL mode (@query(sql="..."))

Full control over the query. Does not require entity binding.

@query(sql="SELECT u.name, count(p.id) FROM users u JOIN posts p ...")
                ├─ Parameters bound from method signature
                ├─ Dynamic sorting NOT supported (ValueError if attempted)
                │  └─ Security: prevents injection in raw SQL
                └─ Returns dict-like mappings (RowMapping)
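
Binding parameters from the method signature can be done with the standard `inspect` module. This is a sketch of the idea only. `bind_params` and `ReportRepository` are invented names, and the library's own binding code may differ.

```python
import inspect


def bind_params(func, *args, **kwargs) -> dict:
    """Map call arguments to parameter names, including defaults."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    params = dict(bound.arguments)
    params.pop("self", None)  # the instance is not a SQL parameter
    return params


class ReportRepository:
    async def posts_per_user(self, min_posts: int, active: bool = True):
        ...  # body is irrelevant for binding


repo = ReportRepository()
params = bind_params(ReportRepository.posts_per_user, repo, 5)
print(params)
```

The resulting dict lines up with named placeholders such as `:min_posts` and `:active` in the SQL text.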

Pagination flow (@query(..., paged=True))

@query(expr="active = true", paged=True)
async def find_active(self, page: PageRequest) -> Page[User]:
1. Extract PageRequest from parameter named "page" (required name)
2. Build base SQL (expr or raw)
3. Execute COUNT(*) subquery → total_elements
4. Append LIMIT :_limit OFFSET :_offset
5. Execute paginated query → content rows
6. Return Page(content, total_elements, page, size)

Page[T] provides computed properties: total_pages, is_first, is_last.
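
A possible shape for `Page[T]` and the limit/offset arithmetic is sketched below. The field names follow the `Page(content, total_elements, page, size)` call above; zero-based page numbering and the helper `limit_offset` are assumptions made for this example.

```python
import math
from dataclasses import dataclass
from typing import Generic, List, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Page(Generic[T]):
    content: List[T]
    total_elements: int
    page: int  # assumed zero-based
    size: int

    @property
    def total_pages(self) -> int:
        if self.size <= 0:
            return 0
        return math.ceil(self.total_elements / self.size)

    @property
    def is_first(self) -> bool:
        return self.page == 0

    @property
    def is_last(self) -> bool:
        return self.page >= self.total_pages - 1


def limit_offset(page: int, size: int) -> dict:
    """Bind values for 'LIMIT :_limit OFFSET :_offset' (zero-based pages)."""
    return {"_limit": size, "_offset": page * size}


# 45 rows in total, 20 per page, third page requested (index 2).
third = Page(content=["u41", "u42", "u43", "u44", "u45"],
             total_elements=45, page=2, size=20)
print(third.total_pages, third.is_first, third.is_last, limit_offset(2, 20))
```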


8. Repository Model

Repositories are plain Python classes declared with @repository. They:

  • Receive dependencies via __init__ (constructor injection)
  • Run all public async methods inside transactional boundaries (implicit Read-Write)
  • Access the active async session using get_session(manager)

@repository(entity=User)
class UserRepository:
    def __init__(self, manager: SessionManager):
        self.manager = manager

    # Implicit Read-Write transaction (from @repository)
    async def save(self, user: User) -> User:
        session = get_session(self.manager)
        session.add(user)
        return user

    # Declarative Read-Only query (from @query)
    @query(expr="username = :username", unique=True)
    async def find_by_username(self, username: str) -> User | None:
        ...  # Body is never executed

No transactional code inside the repository. No global sessions. No shared state.


9. Scoping Model

pico-sqlalchemy does not introduce custom IoC scopes. Instead, it relies on transaction boundaries:

| Scope | Meaning |
|---|---|
| Transaction (via _tx_context) | AsyncSession lifetime, per-async-task |
| Singleton | SessionManager, AppBase, interceptors, factories |
| Request-specific (optional) | Available if combined with pico-fastapi |

Unlike pico-fastapi, there is no middleware layer. The container and interceptors drive the entire lifecycle.


10. Architectural Intent

pico-sqlalchemy exists to:

  • Provide declarative, Spring-style async transaction management for Python
  • Replace ad-hoc async with session... scattered across repositories
  • Centralize AsyncSession creation and lifecycle in a single place
  • Make transactional semantics explicit and testable
  • Ensure business logic is clean and free from persistence boilerplate

It does not attempt to:

  • Replace SQLAlchemy Async ORM or AsyncEngine
  • Change SQLAlchemy's session model
  • Hide transaction boundaries
  • Validate or transform query results (that is Pydantic's job)

11. When to Use

Use pico-sqlalchemy if:

  • Your application uses the SQLAlchemy Async ORM
  • You want clean repository/service layers
  • You prefer declarative transactions and queries
  • You want deterministic AsyncSession lifecycle
  • You value testability and DI patterns

Avoid pico-sqlalchemy if:

  • You are not using asyncio or the SQLAlchemy async extensions
  • You prefer manual session management
  • You only use SQLAlchemy Core with no ORM session lifecycle

12. Diagrams (Mermaid)

Transaction Propagation Decision Flow

flowchart TD
    START["transaction(propagation=?)"] --> CHECK{propagation}

    CHECK -->|REQUIRED| REQ_HAS{Active tx?}
    REQ_HAS -->|Yes| REQ_JOIN["Join existing session"]
    REQ_HAS -->|No| REQ_NEW["Start new transaction"]

    CHECK -->|REQUIRES_NEW| RN_HAS{Active tx?}
    RN_HAS -->|Yes| RN_SUSPEND["Suspend outer tx<br/>(_tx_context = None)"]
    RN_HAS -->|No| RN_SKIP["--"]
    RN_SUSPEND --> RN_NEW["Start new transaction"]
    RN_SKIP --> RN_NEW
    RN_NEW --> RN_RESTORE["Restore outer tx"]

    CHECK -->|MANDATORY| M_HAS{Active tx?}
    M_HAS -->|Yes| M_JOIN["Join existing session"]
    M_HAS -->|No| M_ERR["RuntimeError:<br/>MANDATORY propagation<br/>requires active transaction"]

    CHECK -->|NEVER| N_HAS{Active tx?}
    N_HAS -->|Yes| N_ERR["RuntimeError:<br/>NEVER propagation<br/>forbids active transaction"]
    N_HAS -->|No| N_SESSION["Yield non-transactional session"]

    CHECK -->|NOT_SUPPORTED| NS_HAS{Active tx?}
    NS_HAS -->|Yes| NS_SUSPEND["Suspend outer tx"]
    NS_HAS -->|No| NS_SKIP["--"]
    NS_SUSPEND --> NS_SESSION["Yield non-transactional session"]
    NS_SKIP --> NS_SESSION
    NS_SESSION --> NS_RESTORE["Restore outer tx (if suspended)"]

    CHECK -->|SUPPORTS| S_HAS{Active tx?}
    S_HAS -->|Yes| S_JOIN["Join existing session"]
    S_HAS -->|No| S_SESSION["Yield non-transactional session"]
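
The decision flow above can also be written as a small lookup function. This is a sketch for reading the diagram, not the library's code: the `Propagation` enum and `decide` function are assumed names, and the returned strings are just labels for the branches.

```python
from enum import Enum


class Propagation(Enum):
    REQUIRED = "REQUIRED"
    REQUIRES_NEW = "REQUIRES_NEW"
    SUPPORTS = "SUPPORTS"
    MANDATORY = "MANDATORY"
    NOT_SUPPORTED = "NOT_SUPPORTED"
    NEVER = "NEVER"


def decide(propagation: Propagation, has_active_tx: bool) -> str:
    """Return a label for the branch the flowchart would take."""
    if propagation is Propagation.REQUIRED:
        return "join" if has_active_tx else "begin"
    if propagation is Propagation.REQUIRES_NEW:
        return "suspend, begin, restore" if has_active_tx else "begin"
    if propagation is Propagation.MANDATORY:
        if not has_active_tx:
            raise RuntimeError("MANDATORY propagation requires active transaction")
        return "join"
    if propagation is Propagation.NEVER:
        if has_active_tx:
            raise RuntimeError("NEVER propagation forbids active transaction")
        return "non-transactional"
    if propagation is Propagation.NOT_SUPPORTED:
        if has_active_tx:
            return "suspend, non-transactional, restore"
        return "non-transactional"
    if propagation is Propagation.SUPPORTS:
        return "join" if has_active_tx else "non-transactional"
    raise ValueError(f"Unknown propagation: {propagation!r}")


for level in (Propagation.REQUIRED, Propagation.REQUIRES_NEW, Propagation.SUPPORTS):
    print(level.name, "->", decide(level, True), "|", decide(level, False))
```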

Interceptor Chain for @query

sequenceDiagram
    participant Caller
    participant TI as TransactionalInterceptor
    participant RQI as RepositoryQueryInterceptor
    participant SM as SessionManager
    participant DB as Database

    Caller->>TI: invoke(ctx, call_next)
    TI->>TI: Detect @query meta (read_only=True)
    TI->>SM: transaction(REQUIRED, read_only=True)
    SM-->>TI: AsyncSession (new or joined)

    TI->>RQI: call_next(ctx)
    RQI->>RQI: Detect @query meta
    RQI->>RQI: Bind method params to dict
    RQI->>SM: get_session(manager)
    SM-->>RQI: AsyncSession

    alt expr mode
        RQI->>RQI: Build SELECT * FROM table WHERE expr
        RQI->>RQI: Validate sort fields against entity columns
    else sql mode
        RQI->>RQI: Use raw SQL (reject dynamic sorts)
    end

    alt paged=True
        RQI->>DB: SELECT COUNT(*) FROM (query)
        DB-->>RQI: total_elements
        RQI->>DB: query LIMIT :_limit OFFSET :_offset
        DB-->>RQI: rows
        RQI-->>TI: Page(content, total, page, size)
    else paged=False
        RQI->>DB: Execute query
        DB-->>RQI: rows
        RQI-->>TI: rows or single row (unique)
    end

    TI-->>Caller: result
    Note over TI,SM: Session closed (read_only: no commit)

Startup Sequence

flowchart TD
    A["pico_ioc.init(modules=['pico_sqlalchemy', ...])"] --> B["Container scans modules"]
    B --> C["DatabaseSettings populated<br/>from config prefix 'database'"]
    C --> D["SqlAlchemyFactory instantiated"]
    D --> E["create_session_manager(settings)<br/>creates AsyncEngine + session factory"]
    E --> F["SessionManager registered<br/>as singleton (no @component)"]
    F --> G["TransactionalInterceptor registered<br/>(@component, injects SessionManager)"]
    G --> H["RepositoryQueryInterceptor registered<br/>(@component, injects SessionManager)"]
    H --> I["PicoSqlAlchemyLifecycle.setup_database()"]
    I --> J["Collect all DatabaseConfigurer beans"]
    J --> K["Sort by priority (ascending)"]
    K --> L["Call configure_database(engine) on each"]
    L --> M["Application ready"]