
API Reference

Complete API reference for Pico-Celery, auto-generated from source code docstrings.

Module Overview

Module                  Description
pico_celery             Package exports and public API
pico_celery.config      Celery configuration settings
pico_celery.decorators  Worker-side @task decorator
pico_celery.client      Client-side @celery/@send_task decorators and interceptor
pico_celery.factory     Container integration factory

pico_celery

Async-first Celery integration for pico-ioc.

pico-celery provides dependency-injected Celery task workers and declarative task clients, bridging pico-ioc's inversion-of-control container with Celery 5 distributed task execution.

Worker side

Use @task on async def methods inside @component classes to define tasks that are automatically discovered and registered by PicoTaskRegistrar.

Client side

Use @celery on a class and @send_task on its methods to create declarative, injectable clients whose calls are transparently converted into celery_app.send_task() invocations by CeleryClientInterceptor.

Example

.. code-block:: python

from pico_ioc import component
from pico_celery import task, celery, send_task

@component(scope="prototype")
class EmailTasks:
    def __init__(self, mailer: MailerService):
        self.mailer = mailer

    @task(name="tasks.send_email")
    async def send_email(self, to: str, body: str):
        await self.mailer.send(to, body)

@celery
class EmailClient:
    @send_task(name="tasks.send_email")
    def send_email(self, to: str, body: str):
        pass  # body is never executed

CeleryClient

Bases: Protocol

Marker protocol for Celery client classes.

Classes decorated with @celery may optionally implement this protocol. It carries no methods -- its sole purpose is to provide a type that can be used for isinstance checks and type-hint annotations.
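Because the protocol is runtime-checkable but member-less, every object passes an isinstance check against it. A minimal standalone sketch (not the library's code; `Marker` and `SomeClient` are stand-ins analogous to CeleryClient and a client class):

```python
# Sketch: how an empty runtime-checkable Protocol behaves with isinstance.
from typing import Protocol, runtime_checkable

@runtime_checkable
class Marker(Protocol):
    """Member-less marker protocol, analogous to CeleryClient."""
    pass

class SomeClient:  # no inheritance from Marker required
    pass

# With no members to check, any object satisfies the protocol at runtime:
print(isinstance(SomeClient(), Marker))  # True
```

This is why the protocol is useful only as a nominal type for annotations; an isinstance check against it cannot distinguish clients from other objects.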

Source code in src/pico_celery/client.py
@runtime_checkable
class CeleryClient(Protocol):
    """Marker protocol for Celery client classes.

    Classes decorated with ``@celery`` may optionally implement this
    protocol. It carries no methods -- its sole purpose is to provide a
    type that can be used for ``isinstance`` checks and type-hint
    annotations.
    """

    pass

CeleryClientInterceptor

Bases: MethodInterceptor

AOP interceptor that converts method calls into send_task invocations.

Registered as a pico-ioc @component and injected with the Celery application. When a @send_task-decorated method is called, this interceptor reads the stored metadata and delegates to celery_app.send_task().

Parameters:

celery_app (Celery, required) -- The Celery application instance provided by CeleryFactory.
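The dispatch decision the interceptor makes can be sketched in isolation. Everything here is a simplified stand-in (FakeCelery, Ctx, and the attribute name are assumptions based on this page, not the real pico-ioc/Celery types):

```python
# Hypothetical sketch of the interceptor's dispatch decision.
META = "_pico_celery_sender_meta"  # attribute name as documented on this page

class FakeCelery:
    def send_task(self, name, args=(), kwargs=None, **options):
        return ("sent", name, tuple(args), options)

class Ctx:
    """Stand-in for MethodCtx: class, method name, args, kwargs."""
    def __init__(self, cls, name, args, kwargs):
        self.cls, self.name, self.args, self.kwargs = cls, name, args, kwargs

def invoke(celery, ctx, call_next):
    # Read the sender metadata stored by @send_task, if any.
    meta = getattr(getattr(ctx.cls, ctx.name, None), META, None)
    if not meta:
        return call_next(ctx)  # not a sender: forward down the chain
    return celery.send_task(meta["name"], args=ctx.args,
                            kwargs=ctx.kwargs, **meta.get("options", {}))

class Client:
    def notify(self, user_id): pass
    def plain(self): pass

# Simulate what @send_task attaches at decoration time:
setattr(Client.notify, META, {"name": "tasks.notify", "options": {"queue": "high"}})

result = invoke(FakeCelery(), Ctx(Client, "notify", (7,), {}), lambda c: "local")
print(result)  # ('sent', 'tasks.notify', (7,), {'queue': 'high'})
```

A call to a method without the metadata attribute (like `plain` above) falls through to `call_next`, so ordinary methods on the same class behave normally.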
Source code in src/pico_celery/client.py
@component
class CeleryClientInterceptor(MethodInterceptor):
    """AOP interceptor that converts method calls into ``send_task`` invocations.

    Registered as a pico-ioc ``@component`` and injected with the
    ``Celery`` application. When a ``@send_task``-decorated method is
    called, this interceptor reads the stored metadata and delegates to
    ``celery_app.send_task()``.

    Args:
        celery_app: The ``Celery`` application instance provided by
            ``CeleryFactory``.
    """

    def __init__(self, celery_app: Celery):
        self._celery = celery_app

    def invoke(self, ctx: MethodCtx, call_next: Callable[[MethodCtx], Any]) -> Any:
        """Intercept a method call and dispatch it as a Celery task.

        If the called method carries ``@send_task`` metadata, the
        interceptor extracts the task name and options and calls
        ``celery_app.send_task()``. Otherwise the call is forwarded to
        the next handler in the interceptor chain.

        Args:
            ctx: The method invocation context containing the class,
                method name, positional args, and keyword args.
            call_next: Callable to invoke the next interceptor or the
                original method.

        Returns:
            A ``celery.result.AsyncResult`` when the method is a
            ``@send_task`` sender, or the result of *call_next* for
            non-sender methods.
        """
        try:
            original_func = getattr(ctx.cls, ctx.name)
            meta = getattr(original_func, PICO_CELERY_SENDER_META, None)
        except AttributeError:
            meta = None

        if not meta:
            return call_next(ctx)

        task_name = meta["name"]
        options = meta.get("options", {})

        return self._celery.send_task(task_name, args=ctx.args, kwargs=ctx.kwargs, **options)

invoke(ctx, call_next)

Intercept a method call and dispatch it as a Celery task.

If the called method carries @send_task metadata, the interceptor extracts the task name and options and calls celery_app.send_task(). Otherwise the call is forwarded to the next handler in the interceptor chain.

Parameters:

ctx (MethodCtx, required) -- The method invocation context containing the class, method name, positional args, and keyword args.
call_next (Callable[[MethodCtx], Any], required) -- Callable to invoke the next interceptor or the original method.

Returns:

Any -- A celery.result.AsyncResult when the method is a @send_task sender, or the result of call_next for non-sender methods.

Source code in src/pico_celery/client.py
def invoke(self, ctx: MethodCtx, call_next: Callable[[MethodCtx], Any]) -> Any:
    """Intercept a method call and dispatch it as a Celery task.

    If the called method carries ``@send_task`` metadata, the
    interceptor extracts the task name and options and calls
    ``celery_app.send_task()``. Otherwise the call is forwarded to
    the next handler in the interceptor chain.

    Args:
        ctx: The method invocation context containing the class,
            method name, positional args, and keyword args.
        call_next: Callable to invoke the next interceptor or the
            original method.

    Returns:
        A ``celery.result.AsyncResult`` when the method is a
        ``@send_task`` sender, or the result of *call_next* for
        non-sender methods.
    """
    try:
        original_func = getattr(ctx.cls, ctx.name)
        meta = getattr(original_func, PICO_CELERY_SENDER_META, None)
    except AttributeError:
        meta = None

    if not meta:
        return call_next(ctx)

    task_name = meta["name"]
    options = meta.get("options", {})

    return self._celery.send_task(task_name, args=ctx.args, kwargs=ctx.kwargs, **options)

CelerySettings dataclass

Settings required to create a Celery application instance.

Fields are populated from the pico-ioc configuration source under the "celery" prefix. For example, a DictSource entry of {"celery": {"broker_url": "redis://..."}} maps to CelerySettings.broker_url.

Attributes:

broker_url (str) -- URL of the message broker (e.g. "redis://localhost:6379/0").
backend_url (str) -- URL of the result backend (e.g. "redis://localhost:6379/1").
task_track_started (bool) -- When True, Celery reports a STARTED state when a worker begins executing a task. Defaults to True.

Example

.. code-block:: python

from pico_ioc import configuration, DictSource

config = configuration(DictSource({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1",
    }
}))
Source code in src/pico_celery/config.py
@configured(target="self", prefix="celery", mapping="tree")
@dataclass
class CelerySettings:
    """Settings required to create a Celery application instance.

    Fields are populated from the pico-ioc configuration source under
    the ``"celery"`` prefix. For example, a ``DictSource`` entry of
    ``{"celery": {"broker_url": "redis://..."}}`` maps to
    ``CelerySettings.broker_url``.

    Attributes:
        broker_url: URL of the message broker
            (e.g. ``"redis://localhost:6379/0"``).
        backend_url: URL of the result backend
            (e.g. ``"redis://localhost:6379/1"``).
        task_track_started: When ``True``, Celery reports a ``STARTED``
            state when a worker begins executing a task. Defaults to
            ``True``.

    Example:
        .. code-block:: python

            from pico_ioc import configuration, DictSource

            config = configuration(DictSource({
                "celery": {
                    "broker_url": "redis://localhost:6379/0",
                    "backend_url": "redis://localhost:6379/1",
                }
            }))
    """

    broker_url: str
    backend_url: str

    task_track_started: bool = True

CeleryFactory

IoC factory that provides a singleton Celery application.

Registered with pico-ioc via the @factory decorator. Its create_celery_app method is discovered automatically and called once to produce the shared Celery instance.

Example

The factory is auto-discovered; no manual instantiation is needed. Simply inject Celery wherever required:

.. code-block:: python

from celery import Celery
from pico_ioc import component

@component
class MyService:
    def __init__(self, app: Celery):
        self._app = app
Source code in src/pico_celery/factory.py
@factory
class CeleryFactory:
    """IoC factory that provides a singleton ``Celery`` application.

    Registered with pico-ioc via the ``@factory`` decorator. Its
    ``create_celery_app`` method is discovered automatically and called
    once to produce the shared ``Celery`` instance.

    Example:
        The factory is auto-discovered; no manual instantiation is needed.
        Simply inject ``Celery`` wherever required:

        .. code-block:: python

            from celery import Celery
            from pico_ioc import component

            @component
            class MyService:
                def __init__(self, app: Celery):
                    self._app = app
    """

    @provides(Celery, scope="singleton")
    def create_celery_app(self, settings: CelerySettings) -> Celery:
        """Create and configure a ``Celery`` application instance.

        Args:
            settings: A ``CelerySettings`` instance populated from the
                pico-ioc configuration tree.

        Returns:
            A fully configured ``Celery`` application with the broker,
            backend, and tracking options applied.
        """
        celery_app = Celery(
            "pico_celery_tasks",
            broker=settings.broker_url,
            backend=settings.backend_url,
        )
        celery_app.conf.update(
            task_track_started=settings.task_track_started,
        )
        return celery_app

create_celery_app(settings)

Create and configure a Celery application instance.

Parameters:

settings (CelerySettings, required) -- A CelerySettings instance populated from the pico-ioc configuration tree.

Returns:

Celery -- A fully configured Celery application with the broker, backend, and tracking options applied.

Source code in src/pico_celery/factory.py
@provides(Celery, scope="singleton")
def create_celery_app(self, settings: CelerySettings) -> Celery:
    """Create and configure a ``Celery`` application instance.

    Args:
        settings: A ``CelerySettings`` instance populated from the
            pico-ioc configuration tree.

    Returns:
        A fully configured ``Celery`` application with the broker,
        backend, and tracking options applied.
    """
    celery_app = Celery(
        "pico_celery_tasks",
        broker=settings.broker_url,
        backend=settings.backend_url,
    )
    celery_app.conf.update(
        task_track_started=settings.task_track_started,
    )
    return celery_app

PicoTaskRegistrar

Discovers @task methods and registers them with the Celery app.

This component is created during container initialisation. Its register_tasks method (annotated with @configure) is called automatically by pico-ioc after all components are registered, ensuring that every @task-decorated async method is made available to the Celery worker.

Parameters:

container (PicoContainer, required) -- The pico-ioc PicoContainer used for component resolution at task execution time.
celery_app (Celery, required) -- The Celery application instance (provided by CeleryFactory) on which tasks are registered.
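The discovery step amounts to scanning each component class for methods that carry the task-metadata attribute. A standalone sketch (the attribute name is taken from this page; the class and metadata here are illustrative):

```python
# Sketch of the discovery pattern used during task registration.
import inspect

META = "_pico_celery_method_meta"  # attribute name as documented on this page

class EmailTasks:
    async def send_email(self, to: str) -> None: ...
    def helper(self) -> None: ...

# Simulate what @task attaches at decoration time:
setattr(EmailTasks.send_email, META, {"name": "tasks.send_email", "options": {}})

# Collect every method that carries task metadata:
found = {
    name: getattr(fn, META)
    for name, fn in inspect.getmembers(EmailTasks, inspect.isfunction)
    if hasattr(fn, META)
}
print(found)  # {'send_email': {'name': 'tasks.send_email', 'options': {}}}
```

Methods without the attribute (like `helper`) are skipped, so only explicitly decorated coroutines are registered with Celery.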
Source code in src/pico_celery/registrar.py
@component
class PicoTaskRegistrar:
    """Discovers ``@task`` methods and registers them with the Celery app.

    This component is created during container initialisation. Its
    ``register_tasks`` method (annotated with ``@configure``) is called
    automatically by pico-ioc after all components are registered,
    ensuring that every ``@task``-decorated async method is made available
    to the Celery worker.

    Args:
        container: The pico-ioc ``PicoContainer`` used for component
            resolution at task execution time.
        celery_app: The ``Celery`` application instance (provided by
            ``CeleryFactory``) on which tasks are registered.
    """

    def __init__(self, container: PicoContainer, celery_app: Celery):
        self._container = container
        self._celery_app = celery_app

    @configure
    def register_tasks(self) -> None:
        """Scan all container components and register ``@task`` methods.

        Iterates over every component registered in the container's
        locator. For each class that has methods decorated with
        ``@task``, a synchronous wrapper is created and registered on the
        Celery application via ``celery_app.task()``.

        This method is invoked automatically by pico-ioc during the
        ``@configure`` lifecycle phase.
        """
        locator = getattr(self._container, "_locator", None)
        if locator is None:
            return
        metadata_map = getattr(locator, "_metadata", {})
        for md in metadata_map.values():
            component_cls = getattr(md, "concrete_class", None)
            if not inspect.isclass(component_cls):
                continue
            for method_name, method_func in inspect.getmembers(component_cls, inspect.isfunction):
                if not hasattr(method_func, PICO_CELERY_METHOD_META):
                    continue
                meta = getattr(method_func, PICO_CELERY_METHOD_META)
                task_name = meta.get("name")
                celery_options = meta.get("options", {})
                wrapper = self._create_task_wrapper(component_cls, method_name, self._container)
                self._celery_app.task(name=task_name, **celery_options)(wrapper)

    def _create_task_wrapper(
        self, component_cls: Type, method_name: str, container: PicoContainer
    ) -> Callable[..., Any]:
        """Build a sync wrapper that resolves the component and runs the async task.

        The wrapper handles event-loop detection: if no loop is running it
        uses ``asyncio.run()``; if a loop is already active (e.g. inside
        an ``eventlet``/``gevent`` worker) it offloads execution to a
        ``ThreadPoolExecutor`` to avoid blocking.

        Args:
            component_cls: The ``@component`` class that owns the task
                method.
            method_name: Name of the ``@task``-decorated method on
                *component_cls*.
            container: The ``PicoContainer`` used to resolve a fresh
                instance of *component_cls* for each execution.

        Returns:
            A synchronous callable suitable for registration with
            ``celery_app.task()``.
        """

        def sync_task_executor(*args: Any, **kwargs: Any) -> Any:
            async def run_task_logic() -> Any:
                component_instance = await container.aget(component_cls)
                task_method = getattr(component_instance, method_name)
                return await task_method(*args, **kwargs)

            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop and loop.is_running():
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor() as pool:
                    return pool.submit(asyncio.run, run_task_logic()).result()

            return asyncio.run(run_task_logic())

        return sync_task_executor
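The event-loop handling inside the generated wrapper can be sketched in isolation. This is a simplified standalone version of the pattern (not the library's code; `work` stands in for the resolved task coroutine):

```python
# Sketch: run a coroutine from synchronous code, whether or not an
# event loop is already running in the current thread.
import asyncio
import concurrent.futures

async def work() -> int:
    return 42

def run_sync() -> int:
    # Detect whether an event loop is already running here.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        # A loop is active (e.g. an eventlet/gevent worker): run the
        # coroutine on a fresh loop in a worker thread instead of blocking.
        with concurrent.futures.ThreadPoolExecutor() as pool:
            return pool.submit(asyncio.run, work()).result()

    # No loop running: asyncio.run() creates and tears one down.
    return asyncio.run(work())

print(run_sync())  # 42
```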

register_tasks()

Scan all container components and register @task methods.

Iterates over every component registered in the container's locator. For each class that has methods decorated with @task, a synchronous wrapper is created and registered on the Celery application via celery_app.task().

This method is invoked automatically by pico-ioc during the @configure lifecycle phase.

Source code in src/pico_celery/registrar.py
@configure
def register_tasks(self) -> None:
    """Scan all container components and register ``@task`` methods.

    Iterates over every component registered in the container's
    locator. For each class that has methods decorated with
    ``@task``, a synchronous wrapper is created and registered on the
    Celery application via ``celery_app.task()``.

    This method is invoked automatically by pico-ioc during the
    ``@configure`` lifecycle phase.
    """
    locator = getattr(self._container, "_locator", None)
    if locator is None:
        return
    metadata_map = getattr(locator, "_metadata", {})
    for md in metadata_map.values():
        component_cls = getattr(md, "concrete_class", None)
        if not inspect.isclass(component_cls):
            continue
        for method_name, method_func in inspect.getmembers(component_cls, inspect.isfunction):
            if not hasattr(method_func, PICO_CELERY_METHOD_META):
                continue
            meta = getattr(method_func, PICO_CELERY_METHOD_META)
            task_name = meta.get("name")
            celery_options = meta.get("options", {})
            wrapper = self._create_task_wrapper(component_cls, method_name, self._container)
            self._celery_app.task(name=task_name, **celery_options)(wrapper)

celery(cls=None, *, scope='singleton', **kwargs)

Class decorator that registers a Celery client or worker component.

Scans the decorated class for methods marked with @send_task or @task. For @send_task methods it wires the CeleryClientInterceptor via pico-ioc's @intercepted_by. The class is then registered as a pico-ioc @component.

Can be used with or without parentheses::

@celery
class MyClient: ...

@celery(scope="prototype")
class MyClient: ...

Parameters:

cls (Optional[type]) -- The class to decorate. Passed automatically when the decorator is used without parentheses. Defaults to None.
scope (str) -- The pico-ioc scope for the component. Defaults to "singleton".
**kwargs (Any) -- Additional keyword arguments forwarded to pico_ioc.component().

Returns:

The decorated class registered as a pico-ioc component, or a decorator if cls is None.

Raises:

ValueError -- If the class contains no @send_task or @task methods (exact message: "No @send_task or @task methods found on <ClassName>").

Example

.. code-block:: python

from pico_celery import celery, send_task

@celery
class OrderClient:
    @send_task(name="tasks.place_order")
    def place_order(self, order_id: int):
        pass
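The parentheses-optional dispatch can be sketched on its own; here component registration is replaced by a stand-in attribute (`_scope` is illustrative, not part of the real API):

```python
# Sketch of the optional-parentheses decorator pattern; the real @celery
# additionally scans methods and registers the class with pico-ioc.
def celery(cls=None, *, scope="singleton", **kwargs):
    def decorate(c):
        c._scope = scope  # stand-in for pico_ioc.component(c, scope=scope)
        return c
    if cls is not None:
        return decorate(cls)   # bare @celery: cls is the class itself
    return decorate            # @celery(...): return the real decorator

@celery
class A: pass

@celery(scope="prototype")
class B: pass

print(A._scope, B._scope)  # singleton prototype
```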
Source code in src/pico_celery/client.py
def celery(cls: Optional[type] = None, *, scope: str = "singleton", **kwargs: Any):
    """Class decorator that registers a Celery client or worker component.

    Scans the decorated class for methods marked with ``@send_task`` or
    ``@task``. For ``@send_task`` methods it wires the
    ``CeleryClientInterceptor`` via pico-ioc's ``@intercepted_by``.
    The class is then registered as a pico-ioc ``@component``.

    Can be used with or without parentheses::

        @celery
        class MyClient: ...

        @celery(scope="prototype")
        class MyClient: ...

    Args:
        cls: The class to decorate. Passed automatically when the
            decorator is used without parentheses.
        scope: The pico-ioc scope for the component. Defaults to
            ``"singleton"``.
        **kwargs: Additional keyword arguments forwarded to
            ``pico_ioc.component()``.

    Returns:
        The decorated class registered as a pico-ioc component, or a
        decorator if *cls* is ``None``.

    Raises:
        ValueError: If the class contains no ``@send_task`` or ``@task``
            methods (exact message:
            ``"No @send_task or @task methods found on <ClassName>"``).

    Example:
        .. code-block:: python

            from pico_celery import celery, send_task

            @celery
            class OrderClient:
                @send_task(name="tasks.place_order")
                def place_order(self, order_id: int):
                    pass
    """

    def decorate(c: type) -> type:
        has_send_tasks = False
        has_worker_tasks = False

        for name, method in inspect.getmembers(c, inspect.isfunction):
            if hasattr(method, PICO_CELERY_SENDER_META):
                has_send_tasks = True
                setattr(method, "_needs_interception", True)

            if hasattr(method, PICO_CELERY_METHOD_META):
                has_worker_tasks = True

        if not has_send_tasks and not has_worker_tasks:
            raise ValueError(f"No @send_task or @task methods found on {c.__name__}")

        if has_send_tasks:
            for name, method in inspect.getmembers(c, inspect.isfunction):
                if getattr(method, "_needs_interception", False):
                    intercepted_method = intercepted_by(CeleryClientInterceptor)(method)
                    setattr(c, name, intercepted_method)

        return component(c, scope=scope, **kwargs)

    if cls is not None:
        return decorate(cls)
    return decorate

send_task(name, **celery_options)

Mark a method as a Celery task sender.

When a method decorated with @send_task is called on a class that has been decorated with @celery, the CeleryClientInterceptor intercepts the call and converts it into celery_app.send_task(). The method body is never executed.

Parameters:

name (str, required) -- The Celery task name to send (e.g. "tasks.send_email").
**celery_options (Any) -- Additional keyword arguments forwarded to celery_app.send_task() (e.g. queue, countdown, eta, expires).

Returns:

Callable[[Callable[..., Any]], Callable[..., Any]] -- A decorator that attaches sender metadata to the function and returns the original function unchanged.

Raises:

TypeError -- If the target is not a function or method.

Example

.. code-block:: python

from pico_celery import celery, send_task

@celery
class NotificationClient:
    @send_task(name="tasks.notify", queue="high")
    def notify(self, user_id: int, msg: str):
        pass  # body is never executed
Source code in src/pico_celery/client.py
def send_task(name: str, **celery_options: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Mark a method as a Celery task sender.

    When a method decorated with ``@send_task`` is called on a class
    that has been decorated with ``@celery``, the
    ``CeleryClientInterceptor`` intercepts the call and converts it into
    ``celery_app.send_task()``. The method body is **never executed**.

    Args:
        name: The Celery task name to send
            (e.g. ``"tasks.send_email"``).
        **celery_options: Additional keyword arguments forwarded to
            ``celery_app.send_task()`` (e.g. ``queue``, ``countdown``,
            ``eta``, ``expires``).

    Returns:
        A decorator that attaches sender metadata to the function and
        returns the original function unchanged.

    Raises:
        TypeError: If the target is not a function or method.

    Example:
        .. code-block:: python

            from pico_celery import celery, send_task

            @celery
            class NotificationClient:
                @send_task(name="tasks.notify", queue="high")
                def notify(self, user_id: int, msg: str):
                    pass  # body is never executed
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if not inspect.isfunction(func) and not inspect.ismethod(func):
            raise TypeError("send_task can only decorate methods or functions")
        metadata = {"name": name, "options": dict(celery_options)}
        setattr(func, PICO_CELERY_SENDER_META, metadata)
        return func

    return decorator

task(name, **celery_options)

Mark an async method as a Celery task.

The decorated method must be an async def coroutine. The decorator attaches metadata (task name and Celery options) to the function so that PicoTaskRegistrar can discover and register it at startup.

Parameters:

name (str, required) -- The Celery task name used for routing (e.g. "tasks.send_email").
**celery_options (Any) -- Additional keyword arguments forwarded to celery_app.task() during registration (e.g. queue, max_retries, default_retry_delay).

Returns:

Callable[[Callable], Callable] -- A decorator that attaches task metadata to the function and returns the original function unchanged.

Raises:

TypeError -- If the decorated function is not an async def coroutine function.

Example

.. code-block:: python

from pico_ioc import component
from pico_celery import task

@component(scope="prototype")
class NotificationTasks:
    @task(name="tasks.notify", queue="high")
    async def notify(self, user_id: int, msg: str):
        ...
Source code in src/pico_celery/decorators.py
def task(name: str, **celery_options: Any) -> Callable[[Callable], Callable]:
    """Mark an async method as a Celery task.

    The decorated method must be an ``async def`` coroutine. The decorator
    attaches metadata (task name and Celery options) to the function so
    that ``PicoTaskRegistrar`` can discover and register it at startup.

    Args:
        name: The Celery task name used for routing (e.g. ``"tasks.send_email"``).
        **celery_options: Additional keyword arguments forwarded to
            ``celery_app.task()`` during registration (e.g. ``queue``,
            ``max_retries``, ``default_retry_delay``).

    Returns:
        A decorator that attaches task metadata to the function and
        returns the original function unchanged.

    Raises:
        TypeError: If the decorated function is not an ``async def``
            coroutine function.

    Example:
        .. code-block:: python

            from pico_ioc import component
            from pico_celery import task

            @component(scope="prototype")
            class NotificationTasks:
                @task(name="tasks.notify", queue="high")
                async def notify(self, user_id: int, msg: str):
                    ...
    """

    def decorator(func: Callable) -> Callable:
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"@task decorator can only be applied to async methods, got: {func.__name__}")
        metadata = {"name": name, "options": celery_options}
        setattr(func, PICO_CELERY_METHOD_META, metadata)
        return func

    return decorator

Configuration

pico_celery.config

Configuration dataclass for the Celery application.

CelerySettings is populated automatically from the pico-ioc configuration tree under the celery prefix using the @configured decorator.


Decorators

pico_celery.decorators

Worker-side @task decorator for marking async methods as Celery tasks.

This module provides the @task decorator used on async def methods inside pico-ioc @component classes. Decorated methods are later discovered by PicoTaskRegistrar and registered with the Celery application.

PICO_CELERY_METHOD_META = '_pico_celery_method_meta' module-attribute

str: Attribute name used to store task metadata on decorated methods.

task(name, **celery_options)

Mark an async method as a Celery task.

The decorated method must be an async def coroutine. The decorator attaches metadata (task name and Celery options) to the function so that PicoTaskRegistrar can discover and register it at startup.

Parameters:

Name Type Description Default
name str

The Celery task name used for routing (e.g. "tasks.send_email").

required
**celery_options Any

Additional keyword arguments forwarded to celery_app.task() during registration (e.g. queue, max_retries, default_retry_delay).

{}

Returns:

Type Description
Callable[[Callable], Callable]

A decorator that attaches task metadata to the function and

Callable[[Callable], Callable]

returns the original function unchanged.

Raises:

Type Description
TypeError

If the decorated function is not an async def coroutine function.

Example

.. code-block:: python

from pico_ioc import component
from pico_celery import task

@component(scope="prototype")
class NotificationTasks:
    @task(name="tasks.notify", queue="high")
    async def notify(self, user_id: int, msg: str):
        ...
Source code in src/pico_celery/decorators.py
def task(name: str, **celery_options: Any) -> Callable[[Callable], Callable]:
    """Mark an async method as a Celery task.

    The decorated method must be an ``async def`` coroutine. The decorator
    attaches metadata (task name and Celery options) to the function so
    that ``PicoTaskRegistrar`` can discover and register it at startup.

    Args:
        name: The Celery task name used for routing (e.g. ``"tasks.send_email"``).
        **celery_options: Additional keyword arguments forwarded to
            ``celery_app.task()`` during registration (e.g. ``queue``,
            ``max_retries``, ``default_retry_delay``).

    Returns:
        A decorator that attaches task metadata to the function and
        returns the original function unchanged.

    Raises:
        TypeError: If the decorated function is not an ``async def``
            coroutine function.

    Example:
        .. code-block:: python

            from pico_ioc import component
            from pico_celery import task

            @component(scope="prototype")
            class NotificationTasks:
                @task(name="tasks.notify", queue="high")
                async def notify(self, user_id: int, msg: str):
                    ...
    """

    def decorator(func: Callable) -> Callable:
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"@task decorator can only be applied to async methods, got: {func.__name__}")
        metadata = {"name": name, "options": celery_options}
        setattr(func, PICO_CELERY_METHOD_META, metadata)
        return func

    return decorator
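The metadata round-trip is easy to verify in isolation. The following stdlib-only sketch re-implements a simplified `task` (no Celery or pico-ioc required; `demo` and the simplified decorator are illustrative only) and reads the attached metadata back the way `PicoTaskRegistrar` does:

```python
import inspect

# Mirrors the attribute name used by pico-celery.
PICO_CELERY_METHOD_META = "_pico_celery_method_meta"

def task(name, **celery_options):
    """Simplified re-implementation of @task, for illustration only."""
    def decorator(func):
        # Reject plain functions up front, as the real decorator does.
        if not inspect.iscoroutinefunction(func):
            raise TypeError(
                f"@task decorator can only be applied to async methods, got: {func.__name__}"
            )
        setattr(func, PICO_CELERY_METHOD_META, {"name": name, "options": celery_options})
        return func
    return decorator

@task(name="tasks.demo", queue="high")
async def demo():
    ...

# Registrar-side discovery is a plain getattr on the function object.
meta = getattr(demo, PICO_CELERY_METHOD_META)
print(meta)  # {'name': 'tasks.demo', 'options': {'queue': 'high'}}
```

Because the decorator returns the original function unchanged, the method still behaves as a normal coroutine when called directly; only the extra attribute marks it for discovery.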

Client

pico_celery.client

Client-side decorators and interceptor for sending Celery tasks.

This module provides:

  • CeleryClient -- a runtime-checkable Protocol used as a marker interface for client classes.
  • @send_task -- method decorator that marks a method as a task sender.
  • @celery -- class decorator that registers a class as a pico-ioc component and wires CeleryClientInterceptor to its @send_task methods.
  • CeleryClientInterceptor -- a MethodInterceptor that intercepts calls to @send_task methods and converts them into celery_app.send_task() invocations.

PICO_CELERY_SENDER_META = '_pico_celery_sender_meta' module-attribute

str: Attribute name used to store sender metadata on decorated methods.

CeleryClient

Bases: Protocol

Marker protocol for Celery client classes.

Classes decorated with @celery may optionally implement this protocol. It carries no methods -- its sole purpose is to provide a type that can be used for isinstance checks and type-hint annotations.

Source code in src/pico_celery/client.py
@runtime_checkable
class CeleryClient(Protocol):
    """Marker protocol for Celery client classes.

    Classes decorated with ``@celery`` may optionally implement this
    protocol. It carries no methods -- its sole purpose is to provide a
    type that can be used for ``isinstance`` checks and type-hint
    annotations.
    """

    pass

CeleryClientInterceptor

Bases: MethodInterceptor

AOP interceptor that converts method calls into send_task invocations.

Registered as a pico-ioc @component and injected with the Celery application. When a @send_task-decorated method is called, this interceptor reads the stored metadata and delegates to celery_app.send_task().

Parameters:

  celery_app (Celery, required):
      The Celery application instance provided by CeleryFactory.
Source code in src/pico_celery/client.py
@component
class CeleryClientInterceptor(MethodInterceptor):
    """AOP interceptor that converts method calls into ``send_task`` invocations.

    Registered as a pico-ioc ``@component`` and injected with the
    ``Celery`` application. When a ``@send_task``-decorated method is
    called, this interceptor reads the stored metadata and delegates to
    ``celery_app.send_task()``.

    Args:
        celery_app: The ``Celery`` application instance provided by
            ``CeleryFactory``.
    """

    def __init__(self, celery_app: Celery):
        self._celery = celery_app

    def invoke(self, ctx: MethodCtx, call_next: Callable[[MethodCtx], Any]) -> Any:
        """Intercept a method call and dispatch it as a Celery task.

        If the called method carries ``@send_task`` metadata, the
        interceptor extracts the task name and options and calls
        ``celery_app.send_task()``. Otherwise the call is forwarded to
        the next handler in the interceptor chain.

        Args:
            ctx: The method invocation context containing the class,
                method name, positional args, and keyword args.
            call_next: Callable to invoke the next interceptor or the
                original method.

        Returns:
            A ``celery.result.AsyncResult`` when the method is a
            ``@send_task`` sender, or the result of *call_next* for
            non-sender methods.
        """
        try:
            original_func = getattr(ctx.cls, ctx.name)
            meta = getattr(original_func, PICO_CELERY_SENDER_META, None)
        except AttributeError:
            meta = None

        if not meta:
            return call_next(ctx)

        task_name = meta["name"]
        options = meta.get("options", {})

        return self._celery.send_task(task_name, args=ctx.args, kwargs=ctx.kwargs, **options)

invoke(ctx, call_next)

Intercept a method call and dispatch it as a Celery task.

If the called method carries @send_task metadata, the interceptor extracts the task name and options and calls celery_app.send_task(). Otherwise the call is forwarded to the next handler in the interceptor chain.

Parameters:

  ctx (MethodCtx, required):
      The method invocation context containing the class, method name, positional args, and keyword args.
  call_next (Callable[[MethodCtx], Any], required):
      Callable to invoke the next interceptor or the original method.

Returns:

  Any:
      A celery.result.AsyncResult when the method is a @send_task sender, or the result of call_next for non-sender methods.

Source code in src/pico_celery/client.py
def invoke(self, ctx: MethodCtx, call_next: Callable[[MethodCtx], Any]) -> Any:
    """Intercept a method call and dispatch it as a Celery task.

    If the called method carries ``@send_task`` metadata, the
    interceptor extracts the task name and options and calls
    ``celery_app.send_task()``. Otherwise the call is forwarded to
    the next handler in the interceptor chain.

    Args:
        ctx: The method invocation context containing the class,
            method name, positional args, and keyword args.
        call_next: Callable to invoke the next interceptor or the
            original method.

    Returns:
        A ``celery.result.AsyncResult`` when the method is a
        ``@send_task`` sender, or the result of *call_next* for
        non-sender methods.
    """
    try:
        original_func = getattr(ctx.cls, ctx.name)
        meta = getattr(original_func, PICO_CELERY_SENDER_META, None)
    except AttributeError:
        meta = None

    if not meta:
        return call_next(ctx)

    task_name = meta["name"]
    options = meta.get("options", {})

    return self._celery.send_task(task_name, args=ctx.args, kwargs=ctx.kwargs, **options)
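The dispatch logic can be exercised without a broker. This sketch substitutes stand-ins for the real types (`FakeCelery`, the simplified `MethodCtx` dataclass, and the trimmed-down `Interceptor` are all illustrative, not part of the pico-celery API) to show how sender metadata turns a method call into a `send_task` invocation:

```python
from dataclasses import dataclass, field

PICO_CELERY_SENDER_META = "_pico_celery_sender_meta"

@dataclass
class MethodCtx:
    """Simplified stand-in for pico-ioc's method invocation context."""
    cls: type
    name: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

class FakeCelery:
    """Records send_task calls instead of talking to a broker."""
    def __init__(self):
        self.sent = []
    def send_task(self, name, args=(), kwargs=None, **options):
        self.sent.append((name, args, kwargs or {}, options))
        return f"async-result:{name}"

class Interceptor:
    """Trimmed-down version of CeleryClientInterceptor.invoke."""
    def __init__(self, celery_app):
        self._celery = celery_app
    def invoke(self, ctx, call_next):
        meta = getattr(getattr(ctx.cls, ctx.name, None), PICO_CELERY_SENDER_META, None)
        if not meta:
            return call_next(ctx)  # not a sender: pass through
        return self._celery.send_task(
            meta["name"], args=ctx.args, kwargs=ctx.kwargs, **meta.get("options", {})
        )

class Client:
    def notify(self, user_id, msg):
        pass  # body is never executed

# Attach sender metadata the way @send_task would.
Client.notify._pico_celery_sender_meta = {"name": "tasks.notify", "options": {"queue": "high"}}

app = FakeCelery()
result = Interceptor(app).invoke(MethodCtx(Client, "notify", (7, "hi")), call_next=lambda c: None)
print(app.sent)  # [('tasks.notify', (7, 'hi'), {}, {'queue': 'high'})]
```

Note that the method body is never reached: the interceptor short-circuits the call as soon as it finds the metadata attribute.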

send_task(name, **celery_options)

Mark a method as a Celery task sender.

When a method decorated with @send_task is called on a class that has been decorated with @celery, the CeleryClientInterceptor intercepts the call and converts it into celery_app.send_task(). The method body is never executed.

Parameters:

  name (str, required):
      The Celery task name to send (e.g. "tasks.send_email").
  **celery_options (Any, default {}):
      Additional keyword arguments forwarded to celery_app.send_task() (e.g. queue, countdown, eta, expires).

Returns:

  Callable[[Callable[..., Any]], Callable[..., Any]]:
      A decorator that attaches sender metadata to the function and returns the original function unchanged.

Raises:

  TypeError:
      If the target is not a function or method.

Example

.. code-block:: python

from pico_celery import celery, send_task

@celery
class NotificationClient:
    @send_task(name="tasks.notify", queue="high")
    def notify(self, user_id: int, msg: str):
        pass  # body is never executed
Source code in src/pico_celery/client.py
def send_task(name: str, **celery_options: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Mark a method as a Celery task sender.

    When a method decorated with ``@send_task`` is called on a class
    that has been decorated with ``@celery``, the
    ``CeleryClientInterceptor`` intercepts the call and converts it into
    ``celery_app.send_task()``. The method body is **never executed**.

    Args:
        name: The Celery task name to send
            (e.g. ``"tasks.send_email"``).
        **celery_options: Additional keyword arguments forwarded to
            ``celery_app.send_task()`` (e.g. ``queue``, ``countdown``,
            ``eta``, ``expires``).

    Returns:
        A decorator that attaches sender metadata to the function and
        returns the original function unchanged.

    Raises:
        TypeError: If the target is not a function or method.

    Example:
        .. code-block:: python

            from pico_celery import celery, send_task

            @celery
            class NotificationClient:
                @send_task(name="tasks.notify", queue="high")
                def notify(self, user_id: int, msg: str):
                    pass  # body is never executed
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if not inspect.isfunction(func) and not inspect.ismethod(func):
            raise TypeError("send_task can only decorate methods or functions")
        metadata = {"name": name, "options": dict(celery_options)}
        setattr(func, PICO_CELERY_SENDER_META, metadata)
        return func

    return decorator

celery(cls=None, *, scope='singleton', **kwargs)

Class decorator that registers a Celery client or worker component.

Scans the decorated class for methods marked with @send_task or @task. For @send_task methods it wires the CeleryClientInterceptor via pico-ioc's @intercepted_by. The class is then registered as a pico-ioc @component.

Can be used with or without parentheses::

@celery
class MyClient: ...

@celery(scope="prototype")
class MyClient: ...

Parameters:

  cls (Optional[type], default None):
      The class to decorate. Passed automatically when the decorator is used without parentheses.
  scope (str, default 'singleton'):
      The pico-ioc scope for the component. Defaults to "singleton".
  **kwargs (Any, default {}):
      Additional keyword arguments forwarded to pico_ioc.component().

Returns:

  The decorated class registered as a pico-ioc component, or a decorator if cls is None.

Raises:

  ValueError:
      If the class contains no @send_task or @task methods (exact message: "No @send_task or @task methods found on <ClassName>").

Example

.. code-block:: python

from pico_celery import celery, send_task

@celery
class OrderClient:
    @send_task(name="tasks.place_order")
    def place_order(self, order_id: int):
        pass
Source code in src/pico_celery/client.py
def celery(cls: Optional[type] = None, *, scope: str = "singleton", **kwargs: Any):
    """Class decorator that registers a Celery client or worker component.

    Scans the decorated class for methods marked with ``@send_task`` or
    ``@task``. For ``@send_task`` methods it wires the
    ``CeleryClientInterceptor`` via pico-ioc's ``@intercepted_by``.
    The class is then registered as a pico-ioc ``@component``.

    Can be used with or without parentheses::

        @celery
        class MyClient: ...

        @celery(scope="prototype")
        class MyClient: ...

    Args:
        cls: The class to decorate. Passed automatically when the
            decorator is used without parentheses.
        scope: The pico-ioc scope for the component. Defaults to
            ``"singleton"``.
        **kwargs: Additional keyword arguments forwarded to
            ``pico_ioc.component()``.

    Returns:
        The decorated class registered as a pico-ioc component, or a
        decorator if *cls* is ``None``.

    Raises:
        ValueError: If the class contains no ``@send_task`` or ``@task``
            methods (exact message:
            ``"No @send_task or @task methods found on <ClassName>"``).

    Example:
        .. code-block:: python

            from pico_celery import celery, send_task

            @celery
            class OrderClient:
                @send_task(name="tasks.place_order")
                def place_order(self, order_id: int):
                    pass
    """

    def decorate(c: type) -> type:
        has_send_tasks = False
        has_worker_tasks = False

        for name, method in inspect.getmembers(c, inspect.isfunction):
            if hasattr(method, PICO_CELERY_SENDER_META):
                has_send_tasks = True
                setattr(method, "_needs_interception", True)

            if hasattr(method, PICO_CELERY_METHOD_META):
                has_worker_tasks = True

        if not has_send_tasks and not has_worker_tasks:
            raise ValueError(f"No @send_task or @task methods found on {c.__name__}")

        if has_send_tasks:
            for name, method in inspect.getmembers(c, inspect.isfunction):
                if getattr(method, "_needs_interception", False):
                    intercepted_method = intercepted_by(CeleryClientInterceptor)(method)
                    setattr(c, name, intercepted_method)

        return component(c, scope=scope, **kwargs)

    if cls is not None:
        return decorate(cls)
    return decorate
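The with/without-parentheses dual mode comes from checking whether the class was passed positionally. This stdlib-only sketch isolates that dispatch (`celery_like` and the `_pico_scope` marker are hypothetical names for illustration, not pico-celery API):

```python
from typing import Any, Optional

def celery_like(cls: Optional[type] = None, *, scope: str = "singleton", **kwargs: Any):
    """Illustrates the bare vs. parenthesized dispatch used by @celery."""
    def decorate(c: type) -> type:
        c._pico_scope = scope  # hypothetical marker, stands in for component registration
        return c
    if cls is not None:
        # Bare usage (@celery_like): Python passed the class directly.
        return decorate(cls)
    # Parenthesized usage (@celery_like(scope=...)): return the decorator.
    return decorate

@celery_like
class A: ...

@celery_like(scope="prototype")
class B: ...

print(A._pico_scope, B._pico_scope)  # singleton prototype
```

Keyword-only parameters after `*` are what keep the two call forms unambiguous: the class can only ever arrive as the single positional argument.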

Factory

pico_celery.factory

Factory that creates a singleton Celery application from CelerySettings.

The CeleryFactory is a pico-ioc @factory whose @provides method builds and configures a celery.Celery instance, making it available for injection throughout the application.

CeleryFactory

IoC factory that provides a singleton Celery application.

Registered with pico-ioc via the @factory decorator. Its create_celery_app method is discovered automatically and called once to produce the shared Celery instance.

Example

The factory is auto-discovered; no manual instantiation is needed. Simply inject Celery wherever required:

.. code-block:: python

from celery import Celery
from pico_ioc import component

@component
class MyService:
    def __init__(self, app: Celery):
        self._app = app
Source code in src/pico_celery/factory.py
@factory
class CeleryFactory:
    """IoC factory that provides a singleton ``Celery`` application.

    Registered with pico-ioc via the ``@factory`` decorator. Its
    ``create_celery_app`` method is discovered automatically and called
    once to produce the shared ``Celery`` instance.

    Example:
        The factory is auto-discovered; no manual instantiation is needed.
        Simply inject ``Celery`` wherever required:

        .. code-block:: python

            from celery import Celery
            from pico_ioc import component

            @component
            class MyService:
                def __init__(self, app: Celery):
                    self._app = app
    """

    @provides(Celery, scope="singleton")
    def create_celery_app(self, settings: CelerySettings) -> Celery:
        """Create and configure a ``Celery`` application instance.

        Args:
            settings: A ``CelerySettings`` instance populated from the
                pico-ioc configuration tree.

        Returns:
            A fully configured ``Celery`` application with the broker,
            backend, and tracking options applied.
        """
        celery_app = Celery(
            "pico_celery_tasks",
            broker=settings.broker_url,
            backend=settings.backend_url,
        )
        celery_app.conf.update(
            task_track_started=settings.task_track_started,
        )
        return celery_app

create_celery_app(settings)

Create and configure a Celery application instance.

Parameters:

  settings (CelerySettings, required):
      A CelerySettings instance populated from the pico-ioc configuration tree.

Returns:

  Celery:
      A fully configured Celery application with the broker, backend, and tracking options applied.

Source code in src/pico_celery/factory.py
@provides(Celery, scope="singleton")
def create_celery_app(self, settings: CelerySettings) -> Celery:
    """Create and configure a ``Celery`` application instance.

    Args:
        settings: A ``CelerySettings`` instance populated from the
            pico-ioc configuration tree.

    Returns:
        A fully configured ``Celery`` application with the broker,
        backend, and tracking options applied.
    """
    celery_app = Celery(
        "pico_celery_tasks",
        broker=settings.broker_url,
        backend=settings.backend_url,
    )
    celery_app.conf.update(
        task_track_started=settings.task_track_started,
    )
    return celery_app

Registrar

pico_celery.registrar

Auto-discovery and registration of @task-decorated methods with Celery.

PicoTaskRegistrar is a pico-ioc @component that runs during the container's @configure phase. It scans all registered components for methods carrying @task metadata and registers a synchronous wrapper with the Celery application so that the worker can execute them.

PicoTaskRegistrar

Discovers @task methods and registers them with the Celery app.

This component is created during container initialisation. Its register_tasks method (annotated with @configure) is called automatically by pico-ioc after all components are registered, ensuring that every @task-decorated async method is made available to the Celery worker.

Parameters:

  container (PicoContainer, required):
      The pico-ioc PicoContainer used for component resolution at task execution time.
  celery_app (Celery, required):
      The Celery application instance (provided by CeleryFactory) on which tasks are registered.
Source code in src/pico_celery/registrar.py
@component
class PicoTaskRegistrar:
    """Discovers ``@task`` methods and registers them with the Celery app.

    This component is created during container initialisation. Its
    ``register_tasks`` method (annotated with ``@configure``) is called
    automatically by pico-ioc after all components are registered,
    ensuring that every ``@task``-decorated async method is made available
    to the Celery worker.

    Args:
        container: The pico-ioc ``PicoContainer`` used for component
            resolution at task execution time.
        celery_app: The ``Celery`` application instance (provided by
            ``CeleryFactory``) on which tasks are registered.
    """

    def __init__(self, container: PicoContainer, celery_app: Celery):
        self._container = container
        self._celery_app = celery_app

    @configure
    def register_tasks(self) -> None:
        """Scan all container components and register ``@task`` methods.

        Iterates over every component registered in the container's
        locator. For each class that has methods decorated with
        ``@task``, a synchronous wrapper is created and registered on the
        Celery application via ``celery_app.task()``.

        This method is invoked automatically by pico-ioc during the
        ``@configure`` lifecycle phase.
        """
        locator = getattr(self._container, "_locator", None)
        if locator is None:
            return
        metadata_map = getattr(locator, "_metadata", {})
        for md in metadata_map.values():
            component_cls = getattr(md, "concrete_class", None)
            if not inspect.isclass(component_cls):
                continue
            for method_name, method_func in inspect.getmembers(component_cls, inspect.isfunction):
                if not hasattr(method_func, PICO_CELERY_METHOD_META):
                    continue
                meta = getattr(method_func, PICO_CELERY_METHOD_META)
                task_name = meta.get("name")
                celery_options = meta.get("options", {})
                wrapper = self._create_task_wrapper(component_cls, method_name, self._container)
                self._celery_app.task(name=task_name, **celery_options)(wrapper)

    def _create_task_wrapper(
        self, component_cls: Type, method_name: str, container: PicoContainer
    ) -> Callable[..., Any]:
        """Build a sync wrapper that resolves the component and runs the async task.

        The wrapper handles event-loop detection: if no loop is running it
        uses ``asyncio.run()``; if a loop is already active (e.g. inside
        an ``eventlet``/``gevent`` worker) it offloads execution to a
        ``ThreadPoolExecutor`` to avoid blocking.

        Args:
            component_cls: The ``@component`` class that owns the task
                method.
            method_name: Name of the ``@task``-decorated method on
                *component_cls*.
            container: The ``PicoContainer`` used to resolve a fresh
                instance of *component_cls* for each execution.

        Returns:
            A synchronous callable suitable for registration with
            ``celery_app.task()``.
        """

        def sync_task_executor(*args: Any, **kwargs: Any) -> Any:
            async def run_task_logic() -> Any:
                component_instance = await container.aget(component_cls)
                task_method = getattr(component_instance, method_name)
                return await task_method(*args, **kwargs)

            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop and loop.is_running():
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor() as pool:
                    return pool.submit(asyncio.run, run_task_logic()).result()

            return asyncio.run(run_task_logic())

        return sync_task_executor
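The event-loop bridging in ``_create_task_wrapper`` can be tried in isolation. This stdlib-only sketch (``make_sync_wrapper`` is a hypothetical name; the real wrapper additionally resolves the component from the container) reproduces the same branching:

```python
import asyncio
import concurrent.futures
from typing import Any, Callable, Coroutine

def make_sync_wrapper(coro_fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]:
    """Run an async callable from sync code, mirroring the registrar's strategy."""
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop and loop.is_running():
            # A loop is already active (e.g. an eventlet/gevent worker):
            # run the coroutine on a fresh loop in a helper thread.
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                return pool.submit(asyncio.run, coro_fn(*args, **kwargs)).result()
        # No loop running: asyncio.run creates one and tears it down.
        return asyncio.run(coro_fn(*args, **kwargs))
    return sync_wrapper

async def add(a: int, b: int) -> int:
    return a + b

wrapped = make_sync_wrapper(add)
print(wrapped(2, 3))  # 5
```

Celery's prefork workers take the `asyncio.run()` branch; the thread-pool branch only matters when the worker process already owns a running loop.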

register_tasks()

Scan all container components and register @task methods.

Iterates over every component registered in the container's locator. For each class that has methods decorated with @task, a synchronous wrapper is created and registered on the Celery application via celery_app.task().

This method is invoked automatically by pico-ioc during the @configure lifecycle phase.

Source code in src/pico_celery/registrar.py
@configure
def register_tasks(self) -> None:
    """Scan all container components and register ``@task`` methods.

    Iterates over every component registered in the container's
    locator. For each class that has methods decorated with
    ``@task``, a synchronous wrapper is created and registered on the
    Celery application via ``celery_app.task()``.

    This method is invoked automatically by pico-ioc during the
    ``@configure`` lifecycle phase.
    """
    locator = getattr(self._container, "_locator", None)
    if locator is None:
        return
    metadata_map = getattr(locator, "_metadata", {})
    for md in metadata_map.values():
        component_cls = getattr(md, "concrete_class", None)
        if not inspect.isclass(component_cls):
            continue
        for method_name, method_func in inspect.getmembers(component_cls, inspect.isfunction):
            if not hasattr(method_func, PICO_CELERY_METHOD_META):
                continue
            meta = getattr(method_func, PICO_CELERY_METHOD_META)
            task_name = meta.get("name")
            celery_options = meta.get("options", {})
            wrapper = self._create_task_wrapper(component_cls, method_name, self._container)
            self._celery_app.task(name=task_name, **celery_options)(wrapper)