Frequently Asked Questions¶
General¶
What is Pico-Celery?¶
Pico-Celery integrates pico-ioc with Celery 5, enabling dependency injection for both task workers and task clients. It has two sides:
- Worker: @task marks async methods on @component classes as Celery tasks
- Client: @celery + @send_task provides declarative, injectable task clients
What Python versions are supported?¶
Pico-Celery requires Python 3.11 or later.
Worker Side¶
How do I define a task?¶
Use @task on an async def method inside a @component class:
from pico_ioc import component
from pico_celery import task
@component(scope="prototype")
class MyTasks:
    def __init__(self, service: MyService):
        self.service = service

    @task(name="tasks.process")
    async def process(self, data: dict):
        return await self.service.process(data)
Why must tasks be async?¶
Pico-Celery is async-native. The @task decorator enforces async def and raises TypeError for sync methods. Run the worker with an eventlet or gevent pool so that coroutine-based tasks can execute.
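The enforcement idea can be sketched in plain Python (the `require_async` helper below is illustrative, not pico-celery's actual code): the decorator inspects the function at decoration time and rejects anything that is not a coroutine function.

```python
import inspect

def require_async(fn):
    # Reject sync functions at decoration time, mirroring @task's check
    if not inspect.iscoroutinefunction(fn):
        raise TypeError(
            f"@task decorator can only be applied to async methods, got: {fn.__name__}"
        )
    return fn

@require_async
async def process(data: dict):  # accepted: coroutine function
    return data

try:
    @require_async
    def sync_process(data: dict):  # sync -- rejected at import time
        return data
except TypeError as exc:
    print(exc)
```

Checking at decoration time means the mistake surfaces when the module is imported, not when the first task runs.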
What scope should task components use?¶
Use prototype scope for task components. Each task execution gets a fresh instance, ensuring state isolation between concurrent tasks.
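Why the fresh instance matters can be seen in a plain-Python sketch (no pico-ioc here; prototype behaviour is emulated simply by constructing a new instance per execution):

```python
class Tasks:
    def __init__(self):
        self.seen: list = []  # per-instance state

    def process(self, item):
        self.seen.append(item)
        return len(self.seen)

# Singleton-style reuse: state leaks across executions
shared = Tasks()
shared.process("a")   # returns 1
shared.process("b")   # returns 2 -- stale state from the previous run

# Prototype-style: a fresh instance per execution, no leakage
Tasks().process("a")  # returns 1
Tasks().process("b")  # returns 1
```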
Client Side¶
How do I send a task?¶
Define a client class with @celery and @send_task:
from pico_celery import celery, send_task
@celery
class MyClient:
    @send_task(name="tasks.process")
    def process(self, data: dict):
        pass  # Body is never executed
The CeleryClientInterceptor intercepts method calls and converts them into celery_app.send_task() calls. The method body is never executed.
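The interception pattern can be sketched in plain Python (the names `send_task_stub` and `InterceptingProxy` are illustrative, not pico-celery internals): marked methods are replaced by stubs that turn the call into a dispatch of `(name, args, kwargs)`, so the original body never runs.

```python
import functools

def send_task_stub(name):
    # Mark a method as a task sender; its body is never executed
    def decorate(fn):
        fn._task_name = name
        return fn
    return decorate

class InterceptingProxy:
    # Wrap a client so marked methods dispatch (name, args, kwargs)
    def __init__(self, target, dispatch):
        self._target = target
        self._dispatch = dispatch

    def __getattr__(self, attr):
        fn = getattr(type(self._target), attr)
        task_name = getattr(fn, "_task_name", None)
        if task_name is None:
            return getattr(self._target, attr)

        @functools.wraps(fn)
        def sender(*args, **kwargs):
            return self._dispatch(task_name, args, kwargs)
        return sender

class MyClient:
    @send_task_stub(name="tasks.process")
    def process(self, data: dict):
        raise AssertionError("never runs")

sent = []
client = InterceptingProxy(MyClient(), lambda n, a, k: sent.append((n, a, k)))
client.process({"x": 1})  # recorded as a dispatch; the stub body never ran
```

In pico-celery the dispatch target is `celery_app.send_task()` rather than a list, but the proxy mechanics are the same idea.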
Can I pass Celery options to send_task?¶
Yes, pass them as keyword arguments:
@send_task(name="tasks.process", queue="high_priority", countdown=10)
def process(self, data: dict):
    pass
Configuration¶
How do I configure the Celery broker?¶
Use pico-ioc configuration with the "celery" prefix:
from pico_ioc import configuration, DictSource
config = configuration(DictSource({
    "celery": {
        "broker_url": "redis://localhost:6379/0",
        "backend_url": "redis://localhost:6379/1",
    }
}))
Can I use different configurations for different environments?¶
Yes, use environment-specific configuration sources:
from pico_ioc import configuration, EnvSource, DictSource
config = configuration(
    EnvSource(prefix="CELERY_"),
    DictSource({"celery": {"broker_url": "redis://localhost:6379/0"}})
)
Troubleshooting¶
This section documents every error message emitted by pico-celery with the exact text, its cause, and how to fix it.
TypeError: @task decorator can only be applied to async methods, got: <method_name>¶
Source: pico_celery.decorators.task
Cause: The @task decorator was applied to a synchronous function or method. pico-celery only supports async def task handlers.
Fix: Convert the method to an async coroutine:
# Wrong
@task(name="tasks.process")
def process(self, data: dict):  # sync -- raises TypeError
    ...

# Correct
@task(name="tasks.process")
async def process(self, data: dict):  # async
    ...
TypeError: send_task can only decorate methods or functions¶
Source: pico_celery.client.send_task
Cause: The @send_task decorator was applied to something that is not a function or method (e.g. a class or a property).
Fix: Apply @send_task only to regular methods or functions:
# Wrong
@send_task(name="tasks.process")
class NotAMethod:  # raises TypeError
    pass

# Correct
@send_task(name="tasks.process")
def process(self, data: dict):
    pass
ValueError: No @send_task or @task methods found on <ClassName>¶
Source: pico_celery.client.celery
Cause: A class was decorated with @celery but none of its methods are decorated with @send_task or @task. The @celery decorator requires at least one task-related method.
Fix: Add at least one @send_task or @task method to the class:
# Wrong -- no task methods
@celery
class EmptyClient:
    def regular_method(self):  # raises ValueError
        pass

# Correct
@celery
class MyClient:
    @send_task(name="tasks.process")
    def process(self, data: dict):
        pass
Tasks are not being discovered¶
Cause: The modules containing your @component classes with @task methods are not being scanned by pico-ioc.
Fix: Ensure your task modules are included in the modules list when initialising the pico-ioc container. If you use pico-boot, verify that your package is listed in the pico_boot.modules entry point in pyproject.toml.
Dependencies not injected into task components¶
Cause: The services required by your task component are not registered as pico-ioc components.
Fix: Decorate every dependency with @component:
from pico_ioc import component

@component
class MyService:
    pass

@component(scope="prototype")
class MyTasks:
    def __init__(self, service: MyService):  # MyService must be a @component
        self.service = service
Task executes but the component has stale state¶
Cause: The task component is using singleton scope, so the same instance is reused across all task executions.
Fix: Use prototype scope for task components to get a fresh instance per execution:
@component(scope="prototype")  # not "singleton"
class MyTasks:
    def __init__(self, service: MyService):
        self.service = service
RuntimeError or event loop errors in worker¶
Cause: The Celery worker already has a running event loop (e.g. with eventlet or gevent pool), and asyncio.run() cannot create a nested loop.
How pico-celery handles it: PicoTaskRegistrar._create_task_wrapper detects a running loop and offloads the async task to a ThreadPoolExecutor automatically. If you see this error, it may indicate a conflict with another library that patches the event loop.
Fix: Ensure you are using a compatible worker pool, such as eventlet or gevent.
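The offloading strategy described above can be sketched in plain Python (the `run_coroutine` helper is illustrative, not pico-celery's actual implementation): if the current thread already has a running loop, the coroutine is run to completion on a worker thread that creates its own loop.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)

def run_coroutine(coro):
    # Run a coroutine from sync code, with or without a running loop
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: safe to create one
        return asyncio.run(coro)
    # A loop is already running: asyncio.run() here would raise,
    # so offload to a worker thread with its own event loop
    return _pool.submit(asyncio.run, coro).result()

async def work():
    await asyncio.sleep(0)
    return "done"

print(run_coroutine(work()))  # called with no running loop

async def caller():
    # Called from inside a running loop: executed on a worker thread
    return run_coroutine(work())

print(asyncio.run(caller()))
```

Note that `.result()` blocks the calling thread until the coroutine finishes, which is acceptable in a green-thread pool but would stall a shared asyncio loop.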
Client method body is executed instead of sending a task¶
Cause: The class is not decorated with @celery, so the CeleryClientInterceptor is not wired and the method body runs directly.
Fix: Add the @celery decorator to the class:
@celery
class MyClient:
    @send_task(name="tasks.process")
    def process(self, data: dict):
        pass