How to Test Celery Tasks Without a Broker¶
This guide covers strategies for unit-testing pico-celery worker tasks and client senders without running a live message broker.
Testing Worker Tasks¶
Because pico-celery tasks are regular async def methods on pico-ioc @component classes, you can test them by instantiating the component directly and calling the method.
Direct Instantiation¶
import pytest
from unittest.mock import AsyncMock
from myapp.tasks import EmailTasks
@pytest.mark.asyncio
async def test_send_email():
# Mock the dependency
mock_mailer = AsyncMock()
mock_mailer.send.return_value = {"status": "sent"}
# Instantiate the task component directly -- no container needed
tasks = EmailTasks(mailer=mock_mailer)
result = await tasks.send_email("user@example.com", "Hello", "Body")
mock_mailer.send.assert_called_once_with(
"user@example.com", "Hello", "Body"
)
No broker, no Celery app, no container -- the task method is just an async function.
Using the pico-ioc Container¶
For integration-style tests you can wire the container with mock dependencies:
import pytest
from unittest.mock import AsyncMock
from pico_ioc import init, configuration, DictSource
from myapp.tasks import EmailTasks
@pytest.mark.asyncio
async def test_send_email_with_container():
config = configuration(DictSource({
"celery": {
"broker_url": "memory://",
"backend_url": "cache+memory://",
}
}))
container = init(modules=["myapp.tasks"], config=config)
tasks = await container.aget(EmailTasks)
result = await tasks.send_email("user@example.com", "Hello", "Body")
assert result is not None
Testing Client Senders¶
Client classes decorated with @celery and @send_task delegate calls to celery_app.send_task() via the CeleryClientInterceptor. To test them without a broker, mock the Celery app.
Mocking the Celery App¶
import pytest
from unittest.mock import MagicMock, patch
@pytest.mark.asyncio
async def test_client_sends_task():
mock_celery = MagicMock()
mock_celery.send_task.return_value = MagicMock(id="abc-123")
# Patch the Celery app in the interceptor
from myapp.clients import NotificationClient
from pico_celery import CeleryClientInterceptor
interceptor = CeleryClientInterceptor(celery_app=mock_celery)
# Verify send_task is called with correct arguments
mock_celery.send_task.assert_not_called()
Using Celery's Built-in Test Helpers¶
Celery provides app.conf.update(task_always_eager=True) which executes tasks synchronously in the same process:
import pytest
from celery import Celery
@pytest.fixture
def celery_app():
app = Celery("test")
app.conf.update(
task_always_eager=True,
task_eager_propagates=True,
broker_url="memory://",
result_backend="cache+memory://",
)
return app
With task_always_eager=True, calling send_task() or delay() executes the task immediately and returns the result, with no broker required.
Testing Task Registration¶
To verify that PicoTaskRegistrar correctly discovers and registers tasks:
import pytest
from unittest.mock import MagicMock
from celery import Celery
from pico_ioc import init, configuration, DictSource
@pytest.mark.asyncio
async def test_task_registration():
config = configuration(DictSource({
"celery": {
"broker_url": "memory://",
"backend_url": "cache+memory://",
}
}))
container = init(modules=["myapp"], config=config)
celery_app = container.get(Celery)
# Verify tasks are registered
assert "tasks.send_email" in celery_app.tasks
Testing Client Interception End-to-End¶
For a full integration test of the client path:
import pytest
from unittest.mock import MagicMock
from pico_ioc import init, configuration, DictSource
from myapp.clients import NotificationClient
@pytest.mark.asyncio
async def test_client_integration():
config = configuration(DictSource({
"celery": {
"broker_url": "memory://",
"backend_url": "cache+memory://",
}
}))
container = init(modules=["myapp"], config=config)
client = await container.aget(NotificationClient)
result = client.notify(user_id=42, msg="Hello")
# result is an AsyncResult from celery_app.send_task()
assert result is not None
Summary¶
| What to test | Strategy | Broker needed |
|---|---|---|
| Task logic | Instantiate component, call await method() | No |
| Task with DI | Use container.aget() with memory:// broker | No |
| Client sends task | Mock Celery app, assert send_task() | No |
| Task registration | Init container, check celery_app.tasks | No |
| Full round-trip | task_always_eager=True | No |