Transaction Management

Pico-SQLAlchemy provides a robust transaction management system inspired by enterprise frameworks (like Spring Data), but adapted for Python's asyncio ecosystem.

It supports Implicit Transactions (Zero-Boilerplate) and Explicit Transactions (Fine-grained control).


1. Implicit Transactions (Zero-Boilerplate)

To reduce code verbosity, pico-sqlalchemy applies transactional behavior automatically based on component types.

Repositories (@repository)

By default, any public async method within a class decorated with @repository runs inside a Read-Write transaction.

  • Propagation: REQUIRED (Joins existing or creates new).
  • Mode: Read-Write.
  • Use Case: Standard CRUD operations (save, update, delete).
@repository
class UserRepository:
    # Implicitly transactional (Read-Write)
    async def save(self, user: User):
        ...
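
To make the "public async method" rule concrete, here is a minimal sketch of how such methods could be picked out with the standard inspect module. It assumes that "public" means a name without a leading underscore. The helper implicit_transactional_methods and the DemoRepository class are hypothetical and are not part of pico-sqlalchemy.

```python
import inspect


def implicit_transactional_methods(cls):
    """Return the names of public coroutine methods found on cls.

    Assumption: "public" means the name has no leading underscore.
    """
    return [
        name
        for name, _ in inspect.getmembers(cls, inspect.iscoroutinefunction)
        if not name.startswith("_")
    ]


class DemoRepository:  # hypothetical class, no decorator applied
    async def save(self, user): ...

    async def delete(self, user_id): ...

    async def _flush(self): ...  # leading underscore: skipped

    def table_name(self):  # synchronous: skipped
        return "users"


print(implicit_transactional_methods(DemoRepository))
```

Under these assumptions only save and delete would receive the implicit Read-Write transaction.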

Declarative Queries (@query)

Methods decorated with @query are automatically wrapped in a Read-Only transaction.

  • Propagation: REQUIRED (Joins existing or creates new).
  • Mode: Read-Only.
  • Use Case: Fetching data efficiently.
@repository(entity=User)
class UserRepository:
    # Implicitly transactional (Read-Only)
    @query(expr="active = true")
    async def find_active(self):
        ...

2. Configuration Priority

Since multiple rules can apply to a single method (e.g., a method in a repository that also has @transactional), the library follows a strict priority order (highest wins).

| Priority | Decorator | Default Behavior | Description |
|----------|-----------|------------------|-------------|
| 1 (High) | @transactional | User Defined | Explicit configuration always overrides implicit rules. |
| 2 | @query | read_only=True | Specific query definition implies read-only intent. |
| 3 (Low) | @repository | read_only=False | General repository methods assume write capability by default. |
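
The priority order can be condensed into a small standalone function. This is a sketch that mirrors the rules listed above, not the library's API: resolve_settings and IMPLICIT_DEFAULTS are hypothetical names, and the default values are taken from the interceptor source shown later on this page.

```python
# Hypothetical defaults shared by the two implicit rules.
IMPLICIT_DEFAULTS = {
    "propagation": "REQUIRED",
    "isolation_level": None,
    "rollback_for": (Exception,),
    "no_rollback_for": (),
}


def resolve_settings(transactional_meta=None, *, is_query=False, is_repository=False):
    """Pick the transaction settings for a method, highest priority first."""
    if transactional_meta:  # 1. explicit @transactional always wins
        return dict(transactional_meta)
    if is_query:  # 2. @query implies read-only
        return {**IMPLICIT_DEFAULTS, "read_only": True}
    if is_repository:  # 3. @repository implies read-write
        return {**IMPLICIT_DEFAULTS, "read_only": False}
    return None  # no marker: no transaction is opened


# A @query method inside a @repository resolves to read-only.
print(resolve_settings(is_query=True, is_repository=True)["read_only"])
```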

Examples

Scenario A: Overriding Repository Default

You want a complex reporting method inside a repository to be Read-Only for performance, but it doesn't use @query.

@repository
class ReportRepository:
    # Default is Read-Write...

    # Override to Read-Only!
    @transactional(read_only=True)
    async def generate_complex_stats(self):
        # ... logic ...
        ...

Scenario B: Overriding Query Default

A rare case where a @query (usually read-only) involves a stored procedure that writes data.

@query(sql="CALL update_stats()")
@transactional(read_only=False)  # Force Read-Write
async def update_statistics(self):
    ...

3. The @transactional Decorator

Use @transactional when you need to define boundaries in your Service Layer or when you need to override defaults in Repositories.

from pico_sqlalchemy import transactional

class UserService:
    @transactional(propagation="REQUIRES_NEW")
    async def create_user(self, name: str):
        # ...
        ...

Configuration Options

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| propagation | str | "REQUIRED" | Defines behavior regarding existing transactions. |
| read_only | bool | False | If True, avoids explicit commit (optimization). |
| isolation_level | str | None | Sets DB isolation (e.g., "SERIALIZABLE"). |
| rollback_for | tuple | (Exception,) | Exception types that trigger rollback. |
| no_rollback_for | tuple | () | Exception types that ignore rollback. |

4. Propagation Levels

The library implements strict propagation logic:

  • REQUIRED (Default): Joins an existing transaction or creates a new one.
  • REQUIRES_NEW: Suspends the current transaction (if any) and starts a fresh, independent transaction.
  • SUPPORTS: Joins an existing transaction if available; otherwise, executes without a transaction context.
  • MANDATORY: Requires an existing transaction; raises RuntimeError otherwise.
  • NEVER: Requires no active transaction; raises RuntimeError if one exists.
  • NOT_SUPPORTED: Suspends the current transaction and executes non-transactionally.
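
The six levels above can be read as a decision table over two inputs: the propagation level and whether a transaction is already active. The sketch below encodes that table in plain Python. The function plan_propagation and its string results are hypothetical labels used for illustration only; they are not how the library represents these decisions internally.

```python
def plan_propagation(propagation: str, active: bool) -> str:
    """Describe what should happen for a propagation level.

    `active` says whether a transaction already exists in the caller's context.
    """
    if propagation == "REQUIRED":
        return "join" if active else "begin"
    if propagation == "REQUIRES_NEW":
        return "suspend+begin" if active else "begin"
    if propagation == "SUPPORTS":
        return "join" if active else "no-transaction"
    if propagation == "MANDATORY":
        if not active:
            raise RuntimeError("MANDATORY requires an existing transaction")
        return "join"
    if propagation == "NEVER":
        if active:
            raise RuntimeError("NEVER forbids an active transaction")
        return "no-transaction"
    if propagation == "NOT_SUPPORTED":
        return "suspend+no-transaction" if active else "no-transaction"
    raise ValueError(f"unknown propagation: {propagation!r}")


for level in ("REQUIRED", "REQUIRES_NEW", "SUPPORTS", "NOT_SUPPORTED"):
    print(level, plan_propagation(level, active=True), plan_propagation(level, active=False))
```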

5. Rollback Rules

By default, any exception (inheriting from Exception) triggers a rollback().

Customizing Rollback

@transactional(
    rollback_for=(MyCriticalError,),
    no_rollback_for=(ValidationWarning,)
)
async def business_logic():
    # ...
    ...

Note: If an exception matches no_rollback_for, the library skips the explicit rollback(), but the commit() is also skipped (the session closes naturally).
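
The commit and rollback outcomes described in this section can be sketched with a small async context manager and a fake session that records calls. Everything here is hypothetical illustration: FakeSession stands in for AsyncSession, transaction is not the library's SessionManager.transaction(), and checking no_rollback_for before rollback_for is an assumption about precedence that the text above does not confirm.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records what was called."""

    def __init__(self):
        self.calls = []

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


@asynccontextmanager
async def transaction(session, *, read_only=False,
                      rollback_for=(Exception,), no_rollback_for=()):
    try:
        yield session
    except Exception as exc:
        # Assumption: no_rollback_for takes precedence over rollback_for.
        if not isinstance(exc, no_rollback_for) and isinstance(exc, rollback_for):
            await session.rollback()
        raise  # commit is skipped either way
    else:
        if not read_only:  # read-only work avoids the explicit commit
            await session.commit()
    finally:
        await session.close()


async def calls_after(exc=None, **options):
    """Run an empty unit of work and report which session calls happened."""
    session = FakeSession()
    try:
        async with transaction(session, **options):
            if exc is not None:
                raise exc
    except Exception:
        pass  # swallowed only so the demo can inspect the recorded calls
    return session.calls


print(asyncio.run(calls_after(ValueError("boom"), no_rollback_for=(ValueError,))))
```

In this sketch an exception listed in no_rollback_for leaves only the close call on the record, which matches the note above: neither rollback() nor commit() is issued.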


6. Accessing the Session

Within any transactional method (Implicit or Explicit), the AsyncSession is bound to the current context.

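from pico_ioc import component  # assumption: @component comes from the pico_ioc container package
from pico_sqlalchemy import SessionManager, get_session, transactional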

@component
class MyService:
    def __init__(self, sm: SessionManager):
        self.sm = sm

    @transactional
    async def do_work(self):
        # Safely retrieve the current session
        session = get_session(self.sm)
        # ... use session ...

Auto-generated API

pico_sqlalchemy.interceptor

AOP interceptor that manages transaction boundaries.

TransactionalInterceptor is the first link in the interceptor chain for every @transactional, @repository, and @query method. It inspects decorator metadata to determine the correct propagation mode and opens (or joins) a transaction via SessionManager.transaction() before delegating to the next interceptor or the original method body.

Priority resolution order (highest wins):

  1. @transactional metadata (explicit, user-defined).
  2. @query metadata (implicit read_only=True).
  3. @repository metadata (implicit read_only=False).
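
The "first link in the chain" behavior described above can be modeled in a few lines. This is a self-contained sketch of the general interceptor-chain pattern, not the container's real machinery: Ctx, build_chain, and both interceptor classes are hypothetical, and only the invoke(ctx, call_next) shape and the inspect.isawaitable check are taken from the source shown on this page.

```python
import asyncio
import inspect
from dataclasses import dataclass, field


@dataclass
class Ctx:
    """Hypothetical, reduced method context."""
    name: str
    args: tuple = ()
    log: list = field(default_factory=list)


class TxInterceptor:
    async def invoke(self, ctx, call_next):
        ctx.log.append("begin")  # transaction opens before delegating
        result = call_next(ctx)
        if inspect.isawaitable(result):
            result = await result
        ctx.log.append("commit")  # and closes after the rest of the chain
        return result


class LoggingInterceptor:
    async def invoke(self, ctx, call_next):
        ctx.log.append(f"call {ctx.name}")
        result = call_next(ctx)
        if inspect.isawaitable(result):
            result = await result
        return result


def build_chain(interceptors, target):
    """Fold the interceptors around target so the first one runs outermost."""
    async def terminal(ctx):
        return await target(*ctx.args)

    def link(interceptor, nxt):
        return lambda ctx: interceptor.invoke(ctx, nxt)

    call_next = terminal
    for interceptor in reversed(interceptors):
        call_next = link(interceptor, call_next)
    return call_next


async def save(name):
    return f"saved {name}"


async def main():
    ctx = Ctx("save", ("ada",))
    chain = build_chain([TxInterceptor(), LoggingInterceptor()], save)
    return await chain(ctx), ctx.log


print(asyncio.run(main()))
```

Because the transactional interceptor is outermost, every later interceptor and the method body itself run inside the transaction boundary.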

TransactionalInterceptor

Bases: MethodInterceptor

Opens or joins a transaction for intercepted methods.

Registered as a @component and injected with a SessionManager. Applied to methods via @intercepted_by(TransactionalInterceptor) (which is done automatically by the @transactional, @repository, and @query decorators).

The interceptor determines the transaction configuration from decorator metadata in the following priority order:

  1. @transactional (highest) -- user-defined propagation/read_only.
  2. @query -- REQUIRED propagation, read_only=True.
  3. @repository -- REQUIRED propagation, read_only=False.

If none of these markers are present, the method is invoked directly without opening a transaction.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| session_manager | SessionManager | The SessionManager singleton used to open or join transactions. | required |
Source code in src/pico_sqlalchemy/interceptor.py
@component
class TransactionalInterceptor(MethodInterceptor):
    """Opens or joins a transaction for intercepted methods.

    Registered as a ``@component`` and injected with a ``SessionManager``.
    Applied to methods via ``@intercepted_by(TransactionalInterceptor)``
    (which is done automatically by the ``@transactional``,
    ``@repository``, and ``@query`` decorators).

    The interceptor determines the transaction configuration from
    decorator metadata in the following priority order:

    1. ``@transactional`` (highest) -- user-defined propagation/read_only.
    2. ``@query`` -- ``REQUIRED`` propagation, ``read_only=True``.
    3. ``@repository`` -- ``REQUIRED`` propagation, ``read_only=False``.

    If none of these markers are present, the method is invoked directly
    without opening a transaction.

    Args:
        session_manager: The ``SessionManager`` singleton used to open
            or join transactions.
    """

    def __init__(self, session_manager: SessionManager):
        self.sm = session_manager

    async def invoke(self, ctx: MethodCtx, call_next: Callable[[MethodCtx], Any]) -> Any:
        """Intercept the method call and wrap it in a transaction if needed.

        Args:
            ctx: The AOP method context containing target class, method
                name, and arguments.
            call_next: Callback to invoke the next interceptor or the
                original method.

        Returns:
            The return value of the intercepted method.
        """
        func = getattr(ctx.cls, ctx.name, None)
        meta = getattr(func, TRANSACTIONAL_META, None)

        if not meta:
            is_query = getattr(func, QUERY_META, None) is not None
            repo_meta = getattr(ctx.cls, REPOSITORY_META, None)
            is_repository = repo_meta is not None

            if is_query:
                meta = {
                    "propagation": "REQUIRED",
                    "read_only": True,
                    "isolation_level": None,
                    "rollback_for": (Exception,),
                    "no_rollback_for": (),
                }
            elif is_repository:
                meta = {
                    "propagation": "REQUIRED",
                    "read_only": False,
                    "isolation_level": None,
                    "rollback_for": (Exception,),
                    "no_rollback_for": (),
                }

        if not meta:
            result = call_next(ctx)
            if inspect.isawaitable(result):
                return await result
            return result

        propagation = meta["propagation"]
        read_only = meta["read_only"]
        isolation = meta["isolation_level"]
        rollback_for = meta["rollback_for"]
        no_rollback_for = meta["no_rollback_for"]

        async with self.sm.transaction(
            propagation=propagation,
            read_only=read_only,
            isolation_level=isolation,
            rollback_for=rollback_for,
            no_rollback_for=no_rollback_for,
        ):
            result = call_next(ctx)
            if inspect.isawaitable(result):
                result = await result
            return result

invoke(ctx, call_next) async

Intercept the method call and wrap it in a transaction if needed.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| ctx | MethodCtx | The AOP method context containing target class, method name, and arguments. | required |
| call_next | Callable[[MethodCtx], Any] | Callback to invoke the next interceptor or the original method. | required |

Returns:

| Type | Description |
|------|-------------|
| Any | The return value of the intercepted method. |
