API Reference¶
Complete API reference for Pico-Celery, auto-generated from source code docstrings.
Module Overview¶
| Module | Description |
|---|---|
pico_celery | Package exports and public API |
pico_celery.config | Celery configuration settings |
pico_celery.decorators | @task and @celery decorators |
pico_celery.client | Celery client wrapper |
pico_celery.factory | Container integration factory |
pico_celery.registrar | Task registrar for auto-discovery |
pico_celery¶
pico_celery ¶
Async-first Celery integration for pico-ioc.
pico-celery provides dependency-injected Celery task workers and declarative task clients, bridging pico-ioc's inversion-of-control container with Celery 5 distributed task execution.
Worker side
Use @task on async def methods inside @component classes to define tasks that are automatically discovered and registered by PicoTaskRegistrar.
Client side
Use @celery on a class and @send_task on its methods to create declarative, injectable clients whose calls are transparently converted into celery_app.send_task() invocations by CeleryClientInterceptor.
Example
.. code-block:: python
from pico_ioc import component
from pico_celery import task, celery, send_task
@component(scope="prototype")
class EmailTasks:
def __init__(self, mailer: MailerService):
self.mailer = mailer
@task(name="tasks.send_email")
async def send_email(self, to: str, body: str):
await self.mailer.send(to, body)
@celery
class EmailClient:
@send_task(name="tasks.send_email")
def send_email(self, to: str, body: str):
pass # body is never executed
CeleryClient ¶
Bases: Protocol
Marker protocol for Celery client classes.
Classes decorated with @celery may optionally implement this protocol. It carries no methods -- its sole purpose is to provide a type that can be used for isinstance checks and type-hint annotations.
Source code in src/pico_celery/client.py
CeleryClientInterceptor ¶
Bases: MethodInterceptor
AOP interceptor that converts method calls into send_task invocations.
Registered as a pico-ioc @component and injected with the Celery application. When a @send_task-decorated method is called, this interceptor reads the stored metadata and delegates to celery_app.send_task().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
celery_app | Celery | The | required |
Source code in src/pico_celery/client.py
invoke(ctx, call_next) ¶
Intercept a method call and dispatch it as a Celery task.
If the called method carries @send_task metadata, the interceptor extracts the task name and options and calls celery_app.send_task(). Otherwise the call is forwarded to the next handler in the interceptor chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ctx | MethodCtx | The method invocation context containing the class, method name, positional args, and keyword args. | required |
call_next | Callable[[MethodCtx], Any] | Callable to invoke the next interceptor or the original method. | required |
Returns:
| Type | Description |
|---|---|
Any | A |
Any |
|
Any | non-sender methods. |
Source code in src/pico_celery/client.py
CelerySettings dataclass ¶
Settings required to create a Celery application instance.
Fields are populated from the pico-ioc configuration source under the "celery" prefix. For example, a DictSource entry of {"celery": {"broker_url": "redis://..."}} maps to CelerySettings.broker_url.
Attributes:
| Name | Type | Description |
|---|---|---|
broker_url | str | URL of the message broker (e.g. |
backend_url | str | URL of the result backend (e.g. |
task_track_started | bool | When |
Example
.. code-block:: python
from pico_ioc import configuration, DictSource
config = configuration(DictSource({
"celery": {
"broker_url": "redis://localhost:6379/0",
"backend_url": "redis://localhost:6379/1",
}
}))
Source code in src/pico_celery/config.py
CeleryFactory ¶
IoC factory that provides a singleton Celery application.
Registered with pico-ioc via the @factory decorator. Its create_celery_app method is discovered automatically and called once to produce the shared Celery instance.
Example
The factory is auto-discovered; no manual instantiation is needed. Simply inject Celery wherever required:
.. code-block:: python
from celery import Celery
from pico_ioc import component
@component
class MyService:
def __init__(self, app: Celery):
self._app = app
Source code in src/pico_celery/factory.py
create_celery_app(settings) ¶
Create and configure a Celery application instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
settings | CelerySettings | A | required |
Returns:
| Type | Description |
|---|---|
Celery | A fully configured |
Celery | backend, and tracking options applied. |
Source code in src/pico_celery/factory.py
PicoTaskRegistrar ¶
Discovers @task methods and registers them with the Celery app.
This component is created during container initialisation. Its register_tasks method (annotated with @configure) is called automatically by pico-ioc after all components are registered, ensuring that every @task-decorated async method is made available to the Celery worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
container | PicoContainer | The pico-ioc | required |
celery_app | Celery | The | required |
Source code in src/pico_celery/registrar.py
register_tasks() ¶
Scan all container components and register @task methods.
Iterates over every component registered in the container's locator. For each class that has methods decorated with @task, a synchronous wrapper is created and registered on the Celery application via celery_app.task().
This method is invoked automatically by pico-ioc during the @configure lifecycle phase.
Source code in src/pico_celery/registrar.py
celery(cls=None, *, scope='singleton', **kwargs) ¶
Class decorator that registers a Celery client or worker component.
Scans the decorated class for methods marked with @send_task or @task. For @send_task methods it wires the CeleryClientInterceptor via pico-ioc's @intercepted_by. The class is then registered as a pico-ioc @component.
Can be used with or without parentheses::
@celery
class MyClient: ...
@celery(scope="prototype")
class MyClient: ...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cls | Optional[type] | The class to decorate. Passed automatically when the decorator is used without parentheses. | None |
scope | str | The pico-ioc scope for the component. Defaults to | 'singleton' |
**kwargs | Any | Additional keyword arguments forwarded to | {} |
Returns:
| Type | Description |
|---|---|
| The decorated class registered as a pico-ioc component, or a | |
| decorator if cls is |
Raises:
| Type | Description |
|---|---|
ValueError | If the class contains no |
Example
.. code-block:: python
from pico_celery import celery, send_task
@celery
class OrderClient:
@send_task(name="tasks.place_order")
def place_order(self, order_id: int):
pass
Source code in src/pico_celery/client.py
send_task(name, **celery_options) ¶
Mark a method as a Celery task sender.
When a method decorated with @send_task is called on a class that has been decorated with @celery, the CeleryClientInterceptor intercepts the call and converts it into celery_app.send_task(). The method body is never executed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | The Celery task name to send (e.g. | required |
**celery_options | Any | Additional keyword arguments forwarded to | {} |
Returns:
| Type | Description |
|---|---|
Callable[[Callable[..., Any]], Callable[..., Any]] | A decorator that attaches sender metadata to the function and |
Callable[[Callable[..., Any]], Callable[..., Any]] | returns the original function unchanged. |
Raises:
| Type | Description |
|---|---|
TypeError | If the target is not a function or method. |
Example
.. code-block:: python
from pico_celery import celery, send_task
@celery
class NotificationClient:
@send_task(name="tasks.notify", queue="high")
def notify(self, user_id: int, msg: str):
pass # body is never executed
Source code in src/pico_celery/client.py
task(name, **celery_options) ¶
Mark an async method as a Celery task.
The decorated method must be an async def coroutine. The decorator attaches metadata (task name and Celery options) to the function so that PicoTaskRegistrar can discover and register it at startup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | The Celery task name used for routing (e.g. | required |
**celery_options | Any | Additional keyword arguments forwarded to | {} |
Returns:
| Type | Description |
|---|---|
Callable[[Callable], Callable] | A decorator that attaches task metadata to the function and |
Callable[[Callable], Callable] | returns the original function unchanged. |
Raises:
| Type | Description |
|---|---|
TypeError | If the decorated function is not an |
Example
.. code-block:: python
from pico_ioc import component
from pico_celery import task
@component(scope="prototype")
class NotificationTasks:
@task(name="tasks.notify", queue="high")
async def notify(self, user_id: int, msg: str):
...
Source code in src/pico_celery/decorators.py
Configuration¶
pico_celery.config ¶
Configuration dataclass for the Celery application.
CelerySettings is populated automatically from the pico-ioc configuration tree under the celery prefix using the @configured decorator.
CelerySettings dataclass ¶
Settings required to create a Celery application instance.
Fields are populated from the pico-ioc configuration source under the "celery" prefix. For example, a DictSource entry of {"celery": {"broker_url": "redis://..."}} maps to CelerySettings.broker_url.
Attributes:
| Name | Type | Description |
|---|---|---|
broker_url | str | URL of the message broker (e.g. |
backend_url | str | URL of the result backend (e.g. |
task_track_started | bool | When |
Example
.. code-block:: python
from pico_ioc import configuration, DictSource
config = configuration(DictSource({
"celery": {
"broker_url": "redis://localhost:6379/0",
"backend_url": "redis://localhost:6379/1",
}
}))
Source code in src/pico_celery/config.py
Decorators¶
pico_celery.decorators ¶
Worker-side @task decorator for marking async methods as Celery tasks.
This module provides the @task decorator used on async def methods inside pico-ioc @component classes. Decorated methods are later discovered by PicoTaskRegistrar and registered with the Celery application.
PICO_CELERY_METHOD_META = '_pico_celery_method_meta' module-attribute ¶
str: Attribute name used to store task metadata on decorated methods.
task(name, **celery_options) ¶
Mark an async method as a Celery task.
The decorated method must be an async def coroutine. The decorator attaches metadata (task name and Celery options) to the function so that PicoTaskRegistrar can discover and register it at startup.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | The Celery task name used for routing (e.g. | required |
**celery_options | Any | Additional keyword arguments forwarded to | {} |
Returns:
| Type | Description |
|---|---|
Callable[[Callable], Callable] | A decorator that attaches task metadata to the function and |
Callable[[Callable], Callable] | returns the original function unchanged. |
Raises:
| Type | Description |
|---|---|
TypeError | If the decorated function is not an |
Example
.. code-block:: python
from pico_ioc import component
from pico_celery import task
@component(scope="prototype")
class NotificationTasks:
@task(name="tasks.notify", queue="high")
async def notify(self, user_id: int, msg: str):
...
Source code in src/pico_celery/decorators.py
Client¶
pico_celery.client ¶
Client-side decorators and interceptor for sending Celery tasks.
This module provides:
CeleryClient-- a runtime-checkableProtocolused as a marker interface for client classes.@send_task-- method decorator that marks a method as a task sender.@celery-- class decorator that registers a class as a pico-ioc component and wiresCeleryClientInterceptorto its@send_taskmethods.CeleryClientInterceptor-- aMethodInterceptorthat intercepts calls to@send_taskmethods and converts them intocelery_app.send_task()invocations.
PICO_CELERY_SENDER_META = '_pico_celery_sender_meta' module-attribute ¶
str: Attribute name used to store sender metadata on decorated methods.
CeleryClient ¶
Bases: Protocol
Marker protocol for Celery client classes.
Classes decorated with @celery may optionally implement this protocol. It carries no methods -- its sole purpose is to provide a type that can be used for isinstance checks and type-hint annotations.
Source code in src/pico_celery/client.py
CeleryClientInterceptor ¶
Bases: MethodInterceptor
AOP interceptor that converts method calls into send_task invocations.
Registered as a pico-ioc @component and injected with the Celery application. When a @send_task-decorated method is called, this interceptor reads the stored metadata and delegates to celery_app.send_task().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
celery_app | Celery | The | required |
Source code in src/pico_celery/client.py
invoke(ctx, call_next) ¶
Intercept a method call and dispatch it as a Celery task.
If the called method carries @send_task metadata, the interceptor extracts the task name and options and calls celery_app.send_task(). Otherwise the call is forwarded to the next handler in the interceptor chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ctx | MethodCtx | The method invocation context containing the class, method name, positional args, and keyword args. | required |
call_next | Callable[[MethodCtx], Any] | Callable to invoke the next interceptor or the original method. | required |
Returns:
| Type | Description |
|---|---|
Any | A |
Any |
|
Any | non-sender methods. |
Source code in src/pico_celery/client.py
send_task(name, **celery_options) ¶
Mark a method as a Celery task sender.
When a method decorated with @send_task is called on a class that has been decorated with @celery, the CeleryClientInterceptor intercepts the call and converts it into celery_app.send_task(). The method body is never executed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name | str | The Celery task name to send (e.g. | required |
**celery_options | Any | Additional keyword arguments forwarded to | {} |
Returns:
| Type | Description |
|---|---|
Callable[[Callable[..., Any]], Callable[..., Any]] | A decorator that attaches sender metadata to the function and |
Callable[[Callable[..., Any]], Callable[..., Any]] | returns the original function unchanged. |
Raises:
| Type | Description |
|---|---|
TypeError | If the target is not a function or method. |
Example
.. code-block:: python
from pico_celery import celery, send_task
@celery
class NotificationClient:
@send_task(name="tasks.notify", queue="high")
def notify(self, user_id: int, msg: str):
pass # body is never executed
Source code in src/pico_celery/client.py
celery(cls=None, *, scope='singleton', **kwargs) ¶
Class decorator that registers a Celery client or worker component.
Scans the decorated class for methods marked with @send_task or @task. For @send_task methods it wires the CeleryClientInterceptor via pico-ioc's @intercepted_by. The class is then registered as a pico-ioc @component.
Can be used with or without parentheses::
@celery
class MyClient: ...
@celery(scope="prototype")
class MyClient: ...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cls | Optional[type] | The class to decorate. Passed automatically when the decorator is used without parentheses. | None |
scope | str | The pico-ioc scope for the component. Defaults to | 'singleton' |
**kwargs | Any | Additional keyword arguments forwarded to | {} |
Returns:
| Type | Description |
|---|---|
| The decorated class registered as a pico-ioc component, or a | |
| decorator if cls is |
Raises:
| Type | Description |
|---|---|
ValueError | If the class contains no |
Example
.. code-block:: python
from pico_celery import celery, send_task
@celery
class OrderClient:
@send_task(name="tasks.place_order")
def place_order(self, order_id: int):
pass
Source code in src/pico_celery/client.py
Factory¶
pico_celery.factory ¶
Factory that creates a singleton Celery application from CelerySettings.
The CeleryFactory is a pico-ioc @factory whose @provides method builds and configures a celery.Celery instance, making it available for injection throughout the application.
CeleryFactory ¶
IoC factory that provides a singleton Celery application.
Registered with pico-ioc via the @factory decorator. Its create_celery_app method is discovered automatically and called once to produce the shared Celery instance.
Example
The factory is auto-discovered; no manual instantiation is needed. Simply inject Celery wherever required:
.. code-block:: python
from celery import Celery
from pico_ioc import component
@component
class MyService:
def __init__(self, app: Celery):
self._app = app
Source code in src/pico_celery/factory.py
create_celery_app(settings) ¶
Create and configure a Celery application instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
settings | CelerySettings | A | required |
Returns:
| Type | Description |
|---|---|
Celery | A fully configured |
Celery | backend, and tracking options applied. |
Source code in src/pico_celery/factory.py
Registrar¶
pico_celery.registrar ¶
Auto-discovery and registration of @task-decorated methods with Celery.
PicoTaskRegistrar is a pico-ioc @component that runs during the container's @configure phase. It scans all registered components for methods carrying @task metadata and registers a synchronous wrapper with the Celery application so that the worker can execute them.
PicoTaskRegistrar ¶
Discovers @task methods and registers them with the Celery app.
This component is created during container initialisation. Its register_tasks method (annotated with @configure) is called automatically by pico-ioc after all components are registered, ensuring that every @task-decorated async method is made available to the Celery worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
container | PicoContainer | The pico-ioc | required |
celery_app | Celery | The | required |
Source code in src/pico_celery/registrar.py
register_tasks() ¶
Scan all container components and register @task methods.
Iterates over every component registered in the container's locator. For each class that has methods decorated with @task, a synchronous wrapper is created and registered on the Celery application via celery_app.task().
This method is invoked automatically by pico-ioc during the @configure lifecycle phase.