Declarative Repositories & Queries

Pico-SQLAlchemy provides a repository pattern that combines implicit transactions with declarative query execution.

This allows you to write data access layers with minimal boilerplate, focusing only on the query logic or the business operation.


1. The @repository Decorator

The @repository decorator marks a class as a Pico-IoC component (singleton by default).

Automatic Transaction Management

Crucially, any public async method defined in a @repository class is automatically wrapped in a Read-Write Transaction (propagation="REQUIRED").

This means you do not need to manually add @transactional to your CRUD methods.

from pico_sqlalchemy import repository, SessionManager, get_session

# User is assumed to be a SQLAlchemy model defined elsewhere in your application.

@repository
class UserRepository:
    def __init__(self, manager: SessionManager):
        self.manager = manager

    # Implicitly transactional (Read-Write)
    async def save(self, user: User) -> User:
        session = get_session(self.manager)
        session.add(user)
        return user

Entity Binding

You can optionally bind a repository to a specific SQLAlchemy model using the entity parameter. This is required if you want to use Expression Mode in @query.

@repository(entity=User)
class UserRepository:
    ...

2. Declarative Queries (@query)

The @query decorator allows you to define database queries declaratively.

  • No Implementation Needed: The method body is ignored. The library intercepts the call, binds arguments, executes the query, and maps the result.
  • Implicit Read-Only Transaction: All @query methods run automatically in a Read-Only transaction, for both performance and safety.
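To make the "method body is ignored" behavior concrete, here is a minimal standard-library sketch of the general technique: a decorator that only records metadata, and a dispatcher that consults that metadata before deciding whether to call the body. It is not the library's implementation. The attribute name stored in QUERY_META, the dispatch function, and fake_run_sql are assumptions made up for this illustration.

```python
import asyncio

QUERY_META = "_query_meta"  # attribute name chosen for this sketch only


def query(*, sql: str):
    """Record the SQL on the function object; the function is returned unchanged."""
    def decorator(func):
        setattr(func, QUERY_META, {"mode": "sql", "sql": sql})
        return func
    return decorator


class StatsRepository:
    @query(sql="SELECT count(*) AS total FROM users")
    async def count_users(self):
        raise AssertionError("the body of a declarative method is never executed")

    async def describe(self):
        return "plain method"


executed = []


def fake_run_sql(sql: str):
    """Hypothetical stand-in for a database call: records the SQL, returns a canned row."""
    executed.append(sql)
    return [{"total": 3}]


async def dispatch(repo, name: str):
    """Stand-in interceptor: run the declared SQL if metadata exists, else call the body."""
    func = getattr(type(repo), name)
    meta = getattr(func, QUERY_META, None)
    if meta is None:
        return await func(repo)
    return fake_run_sql(meta["sql"])


async def main():
    repo = StatsRepository()
    return await dispatch(repo, "count_users"), await dispatch(repo, "describe")


declared_result, plain_result = asyncio.run(main())
```

The decorated method raises if its body ever runs, so the sketch doubles as a check that the dispatcher really bypasses it, while the undecorated method is called normally.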

Mode A: Expression Mode (expr)

This is the most concise mode. It requires @repository(entity=Model). You only provide the WHERE clause.

Features:

  • Automatic SELECT * FROM table generation.
  • Supports Dynamic Sorting via PageRequest.

from pico_sqlalchemy import query, repository

@repository(entity=User)
class UserRepository:

    # Effectively runs: SELECT * FROM users WHERE username = :username
    @query(expr="username = :username", unique=True)
    async def find_by_username(self, username: str) -> User | None:
        ...

    # Effectively runs: SELECT * FROM users WHERE active = true
    @query(expr="active = true")
    async def find_all_active(self) -> list[User]:
        ...
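The "Effectively runs" comments above can be checked with a small standalone sketch. The string assembly mirrors the _build_base_sql helper shown in the API source further down this page; the in-memory SQLite database, the users table layout, and the sample rows are assumptions added purely so the generated statement can be executed.

```python
import sqlite3
from typing import Optional


def build_base_sql(table_name: str, expr: Optional[str]) -> str:
    """Assemble SELECT * FROM <table> [WHERE <expr>], as the expression mode does."""
    base_sql = f"SELECT * FROM {table_name}"
    if expr:
        base_sql += f" WHERE {expr}"
    return base_sql


# Throwaway database used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO users (username, active) VALUES (?, ?)",
    [("alice", 1), ("bob", 0)],
)

# The method argument `username` becomes the :username bind parameter.
sql = build_base_sql("users", "username = :username")
row = conn.execute(sql, {"username": "alice"}).fetchone()

# Without an expression, no WHERE clause is appended.
all_sql = build_base_sql("users", None)
```

Note that only the WHERE fragment comes from the decorator; values always travel as bind parameters rather than being formatted into the string.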

Mode B: SQL Mode (sql)

Use this for full control, joins, raw SQL, or complex projections. It does not require an entity binding on the repository.

Features:

  • Full control over the SQL string.
  • Does NOT support Dynamic Sorting via PageRequest. You must write the ORDER BY clause yourself.

Usage:

You can use the standard @query(sql="...") syntax or the @query.sql("...") shortcut.

@repository
class StatsRepository:

    # Option 1: Standard syntax
    @query(sql="SELECT count(*) as total FROM users")
    async def count_users(self) -> int:
        ...

    # Option 2: Shortcut syntax (Cleaner)
    @query.sql(
        "SELECT u.name, p.title FROM users u "
        "JOIN posts p ON u.id = p.user_id "
        "WHERE u.id = :uid "
        "ORDER BY u.name"
    )
    async def get_user_posts(self, uid: int) -> list[dict]:
        ...

3. Pagination & Sorting

Pico-SQLAlchemy provides built-in support for offset-based pagination and dynamic sorting.

Basic Pagination

  1. Set paged=True in the @query decorator.
  2. Add a parameter explicitly named page of type PageRequest to your method signature.
  3. Set the return type to Page[T].

The library automatically generates the count query and applies limits/offsets.

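Important: The argument must be named page. With any other name (e.g., req, pagination), the interceptor cannot find the PageRequest; for paged=True queries it is documented to raise TypeError: "Paged query requires a 'page: PageRequest' parameter".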

from pico_sqlalchemy import Page, PageRequest

@repository(entity=User)
class UserRepository:

    @query(expr="active = true", paged=True)
    async def find_active_paged(self, page: PageRequest) -> Page[User]:
        ...

# Usage: Get page 0, size 10
# await repo.find_active_paged(PageRequest(page=0, size=10))
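The following sketch illustrates what "generates the count query and applies limits/offsets" typically means for offset-based pagination. It is an assumption-laden illustration, not the library's code: the PageRequest and Page classes here are simplified stand-ins (the total_elements and content field names are invented), the count is obtained by wrapping the base query in a subquery, and the offset is computed as page * size with zero-based pages.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class PageRequest:  # simplified stand-in, not the library class
    page: int
    size: int


@dataclass
class Page:  # simplified stand-in; field names are assumptions
    content: list
    total_elements: int
    page: int
    size: int


def fetch_page(conn, base_sql: str, params: dict, req: PageRequest) -> Page:
    # 1. Count all rows matching the base query.
    count_sql = f"SELECT COUNT(*) FROM ({base_sql}) AS sub"
    total = conn.execute(count_sql, params).fetchone()[0]

    # 2. Fetch only the requested window.
    page_sql = f"{base_sql} LIMIT :page_limit OFFSET :page_offset"
    page_params = {**params, "page_limit": req.size, "page_offset": req.page * req.size}
    rows = conn.execute(page_sql, page_params).fetchall()
    return Page(content=rows, total_elements=total, page=req.page, size=req.size)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO users (username, active) VALUES (?, ?)",
    [(f"u{i}", 1) for i in range(1, 6)],
)

base_sql = "SELECT id, username FROM users WHERE active = :active ORDER BY id"
page = fetch_page(conn, base_sql, {"active": 1}, PageRequest(page=1, size=2))
```

With five matching rows and a page size of 2, page index 1 is the second window, so the total stays at 5 while the content holds rows 3 and 4.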

Dynamic Sorting

You can request dynamic sorting by passing a list of Sort objects within the PageRequest.

⚠️ Warning: Expression Mode Only

Dynamic sorting (injecting ORDER BY clauses based on PageRequest) works only in Expression Mode (expr).

If you use SQL Mode (sql) and provide a PageRequest with sorts, the library will raise a ValueError. This is a security measure to prevent ambiguity and injection risks in raw SQL.

Security Note: To prevent SQL Injection, dynamic sorting in expr mode validates that the requested sort fields exist in the underlying SQLAlchemy model columns. If a field is invalid, a ValueError is raised.
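The validation described in the security note is a whitelist check: a sort field is only ever placed into the SQL string if it matches a known column name. The sketch below shows one way such a check could look. The Sort class is a simplified stand-in, build_order_by_clause is a hypothetical helper written for this example, and the normalization of unknown directions to ASC is an assumption; only the "Invalid sort field" error message is taken from the interceptor's documentation.

```python
from dataclasses import dataclass


@dataclass
class Sort:  # simplified stand-in, not the library class
    field: str
    direction: str = "ASC"


def build_order_by_clause(sorts, valid_columns) -> str:
    """Return ' ORDER BY ...' built only from whitelisted column names."""
    parts = []
    for sort in sorts:
        if sort.field not in valid_columns:
            # Reject anything that is not a known column, including injection attempts.
            raise ValueError(f"Invalid sort field: {sort.field}")
        # Only two literal keywords can ever reach the SQL string.
        direction = "DESC" if sort.direction.upper() == "DESC" else "ASC"
        parts.append(f"{sort.field} {direction}")
    if not parts:
        return ""
    return " ORDER BY " + ", ".join(parts)


valid_columns = {"id", "name", "age"}
clause = build_order_by_clause([Sort("name"), Sort("age", "DESC")], valid_columns)

try:
    build_order_by_clause([Sort("name; DROP TABLE users")], valid_columns)
    rejected = False
except ValueError:
    rejected = True
```

Because column names cannot be sent as bind parameters, checking them against a fixed set is what keeps user-supplied sort fields from becoming arbitrary SQL.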

from pico_sqlalchemy import PageRequest, Sort

# Usage: Sort by name ASC, then age DESC
request = PageRequest(
    page=0, 
    size=20, 
    sorts=[
        Sort(field="name", direction="ASC"),
        Sort(field="age", direction="DESC")
    ]
)

# Works automatically because this method uses @query(expr=...)
await repo.find_active_paged(request)

Parameter | Description
--------- | -----------
field     | The name of the column in the model (e.g., "username").
direction | "ASC" (default) or "DESC".

4. Return Types & Mapping

The @query interceptor shapes its result according to the query options:

  • unique=True: Returns a single scalar/object or None (uses result.scalars().first() or mappings).
  • Default (List): Returns a list of scalars/objects (uses result.scalars().all() or mappings).
  • Raw SQL: If the query returns multiple columns (not a mapped entity), it returns dictionary-like mappings.

5. Summary of Behavior

Decorator                  | Transaction Mode | Execution Logic
-------------------------- | ---------------- | ---------------
@repository (plain method) | Read-Write       | Executes your Python code body.
@query                     | Read-Only        | Ignores body; executes SQL/Expr automatically.
@transactional             | Explicit         | Executes your Python code body with custom config.

Auto-generated API

pico_sqlalchemy.repository_interceptor

AOP interceptor that executes declarative @query methods.

RepositoryQueryInterceptor is the second link in the interceptor chain for @query-decorated methods. By the time it runs, the TransactionalInterceptor has already ensured that a transaction (and therefore an AsyncSession) is active.

Two execution modes are supported:

  • Expression mode (@query(expr="...")) -- generates SELECT * FROM <table> WHERE <expr> using the entity's __tablename__. Dynamic sorting via PageRequest.sorts is validated against the entity's column set to prevent injection.

  • SQL mode (@query(sql="...")) -- executes the raw SQL string verbatim. Dynamic sorting is not supported in this mode to prevent SQL injection.

Both modes support pagination (paged=True) with automatic COUNT(*) and LIMIT/OFFSET handling.
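The ordering described above (transaction first, query execution second, original method last) can be pictured with a toy interceptor chain. This is a sketch of the general AOP pattern only: the dict-based context, the log entries, and build_chain are assumptions invented for the example, and the real MethodCtx and TransactionalInterceptor are not modeled. The awaitable check follows the _call_next_async helper shown in the source below.

```python
import asyncio
import inspect


async def call_next_async(ctx, call_next):
    """Call the next link and await the result only if it is awaitable."""
    result = call_next(ctx)
    if inspect.isawaitable(result):
        result = await result
    return result


async def transactional_interceptor(ctx, call_next):
    ctx["log"].append("begin")
    try:
        result = await call_next_async(ctx, call_next)
    except Exception:
        ctx["log"].append("rollback")
        raise
    ctx["log"].append("commit")
    return result


async def query_interceptor(ctx, call_next):
    meta = ctx.get("query_meta")
    if meta is None:
        # Not a declarative method: fall through to the real body.
        return await call_next_async(ctx, call_next)
    ctx["log"].append(f"execute {meta['sql']}")
    return ["row"]


async def method_body(ctx):
    ctx["log"].append("body")
    return "body result"


def build_chain(interceptors, target):
    """Link interceptors so each one receives the next as call_next."""
    def link(index):
        if index == len(interceptors):
            return target
        return lambda ctx: interceptors[index](ctx, link(index + 1))
    return link(0)


chain = build_chain([transactional_interceptor, query_interceptor], method_body)

query_ctx = {"log": [], "query_meta": {"sql": "SELECT 1"}}
query_result = asyncio.run(chain(query_ctx))

plain_ctx = {"log": [], "query_meta": None}
plain_result = asyncio.run(chain(plain_ctx))
```

In the declarative case the body never appears in the log, yet the query still runs between begin and commit, which is why the query interceptor can rely on a transaction already being active.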

RepositoryQueryInterceptor

Bases: MethodInterceptor

Executes declarative queries for @query-decorated methods.

This interceptor runs after TransactionalInterceptor in the AOP chain. It checks for QUERY_META on the target method; if absent, it simply delegates to call_next. When present, the method body is never executed -- instead the interceptor builds and runs the SQL query, binding method parameters automatically.

Two execution modes:

  • expr -- generates SELECT * FROM <table> WHERE <expr> using the entity from @repository(entity=...).
  • sql -- executes the provided raw SQL verbatim.

Both modes support pagination (paged=True) and unique result (unique=True).

Parameters:

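Name            | Type           | Description                                                                              | Default
--------------- | -------------- | ---------------------------------------------------------------------------------------- | --------
session_manager | SessionManager | The SessionManager singleton, used to obtain the current AsyncSession via get_session(). | required
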
Source code in src/pico_sqlalchemy/repository_interceptor.py
@component
class RepositoryQueryInterceptor(MethodInterceptor):
    """Executes declarative queries for ``@query``-decorated methods.

    This interceptor runs **after** ``TransactionalInterceptor`` in the
    AOP chain.  It checks for ``QUERY_META`` on the target method; if
    absent, it simply delegates to ``call_next``.  When present, the
    method body is **never executed** -- instead the interceptor builds
    and runs the SQL query, binding method parameters automatically.

    Two execution modes:

    * ``expr`` -- generates ``SELECT * FROM <table> WHERE <expr>`` using
      the entity from ``@repository(entity=...)``.
    * ``sql`` -- executes the provided raw SQL verbatim.

    Both modes support pagination (``paged=True``) and unique result
    (``unique=True``).

    Args:
        session_manager: The ``SessionManager`` singleton, used to
            obtain the current ``AsyncSession`` via ``get_session()``.
    """

    def __init__(self, session_manager: SessionManager):
        self.session_manager = session_manager

    async def invoke(self, ctx: MethodCtx, call_next: Callable[[MethodCtx], Any]) -> Any:
        """Intercept the method and execute the declarative query if applicable.

        Args:
            ctx: The AOP method context.
            call_next: Callback to the next interceptor or the original
                method.

        Returns:
            Query results -- a ``Page``, a list of ``RowMapping``, or a
            single ``RowMapping`` / ``None`` depending on query options.

        Raises:
            RuntimeError: ``"@query with expr requires @repository(entity=...)"``
                if expression mode is used without an entity.
            TypeError: ``"Paged query requires a 'page: PageRequest' parameter"``
                if ``paged=True`` but no ``PageRequest`` argument is found.
            ValueError: ``"Dynamic sorting via PageRequest is not supported
                in SQL mode"`` if sorts are provided in SQL mode.
            ValueError: ``"Invalid sort field: <field>"`` if a sort field
                is not a valid column on the entity.
        """
        func = getattr(ctx.cls, ctx.name, None)
        meta = getattr(func, QUERY_META, None)

        if meta is None:
            return await self._call_next_async(ctx, call_next)

        session = get_session(self.session_manager)
        params = self._bind_params(func, ctx.args, ctx.kwargs)
        repo_meta = getattr(ctx.cls, REPOSITORY_META, {}) or {}
        entity = repo_meta.get("entity")
        mode = meta.get("mode")

        if mode == "sql":
            return await self._execute_sql(session, meta, params)
        if mode == "expr":
            return await self._execute_expr(session, meta, params, entity)
        raise RuntimeError(f"Unsupported query mode: {mode!r}")

    async def _call_next_async(self, ctx: MethodCtx, call_next: Callable) -> Any:
        """Invoke the next interceptor or method, awaiting if needed."""
        result = call_next(ctx)
        if inspect.isawaitable(result):
            result = await result
        return result

    def _bind_params(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...],
        kwargs: Mapping[str, Any],
    ) -> dict[str, Any]:
        """Bind positional and keyword arguments to a named parameter dict.

        Uses ``inspect.signature`` to map *args* and *kwargs* onto the
        method's declared parameters (excluding ``self``).

        Args:
            func: The original method (unbound).
            args: Positional arguments from the invocation.
            kwargs: Keyword arguments from the invocation.

        Returns:
            A ``dict`` mapping parameter names to their values, suitable
            for use as SQLAlchemy bind parameters.
        """
        sig = inspect.signature(func)
        bound = sig.bind_partial(None, *args, **kwargs)
        bound.apply_defaults()
        arguments = dict(bound.arguments)
        arguments.pop("self", None)
        return arguments

    async def _execute_sql(
        self,
        session: Any,
        meta: dict[str, Any],
        params: dict[str, Any],
    ) -> Any:
        """Execute a raw-SQL-mode query.

        Dynamic sorting via ``PageRequest.sorts`` is rejected in this
        mode to prevent SQL injection.

        Raises:
            ValueError: If ``PageRequest.sorts`` is non-empty.
        """
        sql = meta.get("sql")
        unique = meta.get("unique", False)
        paged = meta.get("paged", False)

        page_req = _extract_page_request(params, paged)
        if page_req and page_req.sorts:
            raise ValueError(
                "Dynamic sorting via PageRequest is not supported in SQL mode. "
                "Please add ORDER BY clause to your SQL string directly."
            )

        if paged:
            return await _execute_paginated_query(session, sql, params, page_req)
        return await _execute_simple_query(session, sql, params, unique)

    async def _execute_expr(
        self,
        session: Any,
        meta: dict[str, Any],
        params: dict[str, Any],
        entity: Any,
    ) -> Any:
        """Execute an expression-mode query.

        Builds ``SELECT * FROM <table> WHERE <expr>`` using the entity's
        ``__tablename__``.  Dynamic sorting is supported and validated
        against the entity's column set.

        Raises:
            RuntimeError: If *entity* is ``None`` or lacks
                ``__tablename__``.
            ValueError: If a sort field is not a valid column.
        """
        expr = meta.get("expr")
        unique = meta.get("unique", False)
        paged = meta.get("paged", False)

        self._validate_entity(entity)
        base_sql = self._build_base_sql(entity, expr)

        page_req = _extract_page_request(params, paged)
        if page_req and page_req.sorts:
            valid_columns = {c.name for c in entity.__table__.columns}
            base_sql += _build_order_by_clause(page_req, valid_columns)

        if paged:
            return await _execute_paginated_query(session, base_sql, params, page_req)
        return await _execute_simple_query(session, base_sql, params, unique)

    def _validate_entity(self, entity: Any) -> None:
        """Ensure *entity* is set and has a ``__tablename__`` attribute.

        Raises:
            RuntimeError: ``"@query with expr requires @repository(entity=...)
                and an entity with __tablename__"``
        """
        if entity is None or not hasattr(entity, "__tablename__"):
            raise RuntimeError("@query with expr requires @repository(entity=...) and an entity with __tablename__")

    def _build_base_sql(self, entity: Any, expr: str | None) -> str:
        """Build the base ``SELECT`` query from the entity and optional expression.

        Args:
            entity: The SQLAlchemy model class (must have ``__tablename__``).
            expr: Optional WHERE-clause expression (e.g. ``"name = :name"``).

        Returns:
            A SQL string like ``"SELECT * FROM users WHERE name = :name"``.
        """
        table_name = entity.__tablename__
        base_sql = f"SELECT * FROM {table_name}"
        if expr:
            base_sql += f" WHERE {expr}"
        return base_sql
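The argument binding done by _bind_params above can be tried in isolation with nothing but the standard library. The function body below is copied from the source; the UserRepository.find method is a hypothetical example added only to have a signature to bind against.

```python
import inspect


class UserRepository:
    # Hypothetical method used only to provide a signature.
    async def find(self, username: str, active: bool = True):
        ...


def bind_params(func, args, kwargs):
    """Map call arguments onto parameter names, dropping self."""
    sig = inspect.signature(func)
    # None fills the `self` slot because func is the unbound function.
    bound = sig.bind_partial(None, *args, **kwargs)
    bound.apply_defaults()
    arguments = dict(bound.arguments)
    arguments.pop("self", None)
    return arguments


# Positional call: the default for `active` is filled in.
positional = bind_params(UserRepository.find, ("alice",), {})

# Keyword call: explicit values override defaults.
keyword = bind_params(UserRepository.find, (), {"username": "bob", "active": False})
```

The resulting dict keys are the method's parameter names, which is why a placeholder such as :username in expr or sql must match the parameter name exactly.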

invoke(ctx, call_next) async

Intercept the method and execute the declarative query if applicable.

Parameters:

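Name      | Type                      | Description                                              | Default
--------- | ------------------------- | -------------------------------------------------------- | --------
ctx       | MethodCtx                 | The AOP method context.                                  | required
call_next | Callable[[MethodCtx], Any] | Callback to the next interceptor or the original method. | required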

Returns:

Type | Description
---- | -----------
Any  | Query results -- a Page, a list of RowMapping, or a single RowMapping / None depending on query options.

Raises:

Type         | Description
------------ | -----------
RuntimeError | "@query with expr requires @repository(entity=...)" if expression mode is used without an entity.
TypeError    | "Paged query requires a 'page: PageRequest' parameter" if paged=True but no PageRequest argument is found.
ValueError   | "Dynamic sorting via PageRequest is not supported in SQL mode" if sorts are provided in SQL mode.
ValueError   | "Invalid sort field: <field>" if a sort field is not a valid column on the entity.
