Advanced: The Event Bus
As your application grows, services often become tightly coupled.
Problem: Your UserService needs to know about many other services. When a user is created, it must directly call all of them:
@component
class UserService:
    def __init__(
        self,
        db: Database,
        email_service: EmailService,
        analytics: AnalyticsService,
        audit_log: AuditLogService
    ):
        self.db = db
        self.email_service = email_service
        self.analytics = analytics
        self.audit_log = audit_log

    async def create_user(self, email: str):
        # 1. Business Logic
        user = self.db.save(email)
        # 2. Tightly Coupled Calls
        # What if EmailService fails?
        # What if we add a new service? We have to edit this file.
        await self.email_service.send_welcome_email(user)
        await self.analytics.track_event("user_created", user.id)
        await self.audit_log.record(f"User {user.id} created")
        return user
This is brittle and hard to maintain. UserService shouldn't have to know about all these other concerns.
Solution: pico-ioc provides a built-in asynchronous event bus. This allows you to decouple your services using a Publish/Subscribe pattern.
Instead of calling other services, UserService simply publishes an Event. Other services can then "subscribe" to that event and react to it independently.
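To make the pattern concrete, here is a minimal, standard-library-only sketch of publish/subscribe. ToyBus, UserCreated, and the two handler functions are hypothetical names invented for this illustration; this is not pico-ioc's EventBus, only the general idea behind it.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class UserCreated:
    """Hypothetical event type, used only in this sketch."""
    user_id: int
    email: str


class ToyBus:
    """Illustrative stand-in; not pico-ioc's EventBus."""

    def __init__(self):
        # Maps an event class to the async handlers registered for it.
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event):
        # Await each handler registered for this exact event type, in order.
        for handler in self._handlers[type(event)]:
            await handler(event)


log = []


async def send_welcome_email(event):
    log.append(f"email:{event.email}")


async def track_signup(event):
    log.append(f"analytics:{event.user_id}")


async def main():
    bus = ToyBus()
    bus.subscribe(UserCreated, send_welcome_email)
    bus.subscribe(UserCreated, track_signup)
    # The publisher only knows the bus and the event, not the handlers.
    await bus.publish(UserCreated(user_id=1, email="alice@example.com"))


asyncio.run(main())
print(log)
```

Adding a third reaction to UserCreated means registering one more handler; the publishing code does not change.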
1. The Core Concepts
Event
An Event is a simple class (like a dataclass) that holds information about what happened.
from dataclasses import dataclass
from pico_ioc.event_bus import Event

@dataclass
class UserCreatedEvent(Event):
    user_id: int
    email: str
EventBus
The EventBus is a component provided by pico-ioc that you can inject. It has two key methods:
- await bus.publish(event): Publishes an event and waits for all INLINE subscribers to finish.
- bus.post(event): Puts an event on a background queue to be processed later (requires starting the worker).
subscribe
The @subscribe decorator allows a method to listen for a specific event type. Subscribers are discovered and registered automatically when their component instances are created.
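One common way to build this kind of discovery is for the decorator to tag the method and for a base class to scan for tagged methods at construction time. The sketch below shows that general technique with hypothetical names (toy_subscribe, ToyAutoSubscriber, _registry); it is an assumption about how such a mechanism can work, not a description of pico-ioc's internals.

```python
import inspect

# Toy global registry: event type -> list of bound handler methods.
_registry = {}


def toy_subscribe(event_type):
    """Hypothetical decorator: tags a function with the event type it handles."""
    def decorator(func):
        func._toy_event_type = event_type
        return func
    return decorator


class ToyAutoSubscriber:
    """Hypothetical mixin: registers tagged methods when an instance is created."""

    def __init__(self):
        for _, method in inspect.getmembers(self, predicate=inspect.ismethod):
            # Bound methods expose attributes set on the underlying function.
            event_type = getattr(method, "_toy_event_type", None)
            if event_type is not None:
                _registry.setdefault(event_type, []).append(method)


class UserCreated:
    """Hypothetical event type, used only in this sketch."""


class Mailer(ToyAutoSubscriber):
    @toy_subscribe(UserCreated)
    def on_user_created(self, event):
        return "welcome sent"


# Defining the class registers nothing; only creating an instance does.
before = len(_registry.get(UserCreated, []))
mailer = Mailer()
after = len(_registry.get(UserCreated, []))
results = [handler(UserCreated()) for handler in _registry[UserCreated]]
print(before, after, results)
```

The consequence to keep in mind is that a subscriber class that is never instantiated never receives events.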
2. Step-by-Step Example
Let's refactor our UserService to use the event bus.
Step 1: Define the Event
First, we define the event that will be published.
# events.py
from dataclasses import dataclass
from pico_ioc.event_bus import Event

@dataclass
class UserCreatedEvent(Event):
    user_id: int
    email: str
Step 2: Refactor UserService to Publish
Now, UserService only needs to know about the EventBus. Its dependencies are drastically reduced.
# services/user_service.py
from pico_ioc import component
from pico_ioc.event_bus import EventBus
from ..events import UserCreatedEvent  # events.py sits one package level above services/

# Placeholder definitions; in a real project Database and User live elsewhere
class Database:
    def save(self, email: str) -> 'User': ...  # Mock

class User:
    id: int
    email: str

@component
class UserService:
    def __init__(self, db: Database, bus: EventBus):
        self.db = db
        self.bus = bus  # Just inject the bus

    async def create_user(self, email: str):
        # 1. Business Logic
        user = self.db.save(email)  # Assume save returns a User object with id and email
        # 2. Publish Event
        # We just shout "this happened!" and don't care who is listening.
        event = UserCreatedEvent(user_id=user.id, email=user.email)
        await self.bus.publish(event)
        return user
Important: UserService is now completely decoupled from the email, analytics, and audit services.
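One practical benefit of this decoupling is that the service can be unit-tested with a recording test double in place of the bus. The following sketch uses only the standard library: RecordingBus and FakeDatabase are hypothetical helpers, and UserService, User, and UserCreatedEvent are plain re-declarations with the same shape as above (no @component, no pico-ioc import) so the snippet runs on its own.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class UserCreatedEvent:
    user_id: int
    email: str


@dataclass
class User:
    id: int
    email: str


class FakeDatabase:
    """Hypothetical stand-in that returns a fixed id."""

    def save(self, email: str) -> User:
        return User(id=42, email=email)


class RecordingBus:
    """Hypothetical test double: records events instead of dispatching them."""

    def __init__(self):
        self.published = []

    async def publish(self, event):
        self.published.append(event)


class UserService:
    # Same shape as the refactored service, minus the @component decorator.
    def __init__(self, db, bus):
        self.db = db
        self.bus = bus

    async def create_user(self, email: str) -> User:
        user = self.db.save(email)
        await self.bus.publish(UserCreatedEvent(user_id=user.id, email=user.email))
        return user


bus = RecordingBus()
service = UserService(FakeDatabase(), bus)
user = asyncio.run(service.create_user("alice@example.com"))
print(bus.published)
```

The test only needs to check which event was published; no email, analytics, or audit service has to exist.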
Step 3: Create Subscribers
Next, we create our listener components. Use AutoSubscriberMixin to automatically find and register @subscribe methods when the component instance is created.
# services/email_service.py
import asyncio  # For simulation
from pico_ioc import component
from pico_ioc.event_bus import AutoSubscriberMixin, subscribe
from ..events import UserCreatedEvent

@component
class EmailService(AutoSubscriberMixin):
    @subscribe(UserCreatedEvent)
    async def on_user_created(self, event: UserCreatedEvent):
        # This method is automatically called when UserCreatedEvent is published
        print(f"EMAIL: Sending welcome email to {event.email}")
        await self.send_email(event.email, "Welcome!")

    async def send_email(self, to, body):
        await asyncio.sleep(0.01)  # Simulate sending
# services/analytics_service.py
import asyncio  # For simulation
from pico_ioc import component
from pico_ioc.event_bus import AutoSubscriberMixin, subscribe, ExecPolicy
from ..events import UserCreatedEvent

@component
class AnalyticsService(AutoSubscriberMixin):
    @subscribe(UserCreatedEvent, policy=ExecPolicy.TASK)
    async def on_user_created(self, event: UserCreatedEvent):
        # This handler is non-critical, so we run it as a
        # "fire and forget" task that doesn't block the publisher.
        print(f"ANALYTICS: Tracking event for user {event.user_id}")
        await self.track(event.user_id)

    async def track(self, user_id):
        await asyncio.sleep(0.02)  # Simulate tracking
Step 4: Run It
You must include pico_ioc.event_bus in your init() call's modules list to register the EventBus component itself.
# main.py
import asyncio
import pico_ioc.event_bus  # Import the module to be scanned
from pico_ioc import init, component
# Adjust imports based on your actual project structure
# Example assumes services are in ./app/services/ and events in ./app/events.py
from app.services.user_service import UserService
from app.services.email_service import EmailService  # Need these for scanning
from app.services.analytics_service import AnalyticsService
from app.events import UserCreatedEvent

# Mock Database for runnable example
class MockUser:
    id: int = 1
    email: str = "test@example.com"

@component
class MockDatabase:
    def save(self, email: str) -> MockUser:
        print(f"DB: Saving {email}")
        return MockUser()

container = init(
    modules=[
        "app.events",  # Contains UserCreatedEvent definition
        "app.services.user_service",
        "app.services.email_service",
        "app.services.analytics_service",
        pico_ioc.event_bus,  # Don't forget this!
        __name__,  # Include current module for MockDatabase
    ]
)

async def run_example():
    user_service = await container.aget(UserService)
    print("Creating user...")
    await user_service.create_user("alice@example.com")
    print("User creation initiated (event published).")
    # Give background tasks (like analytics) a moment to run in this example
    await asyncio.sleep(0.1)
    print("Cleaning up...")
    await container.cleanup_all_async()

if __name__ == "__main__":
    asyncio.run(run_example())

# --- Example Output ---
# Creating user...
# DB: Saving alice@example.com
# EMAIL: Sending welcome email to test@example.com
# ANALYTICS: Tracking event for user 1
# User creation initiated (event published).
# Cleaning up...
Note: Subscribers are discovered when their component instance is created. Ensure the container scans the modules where your subscriber components live, or explicitly resolve them so their @subscribe methods are registered.
3. Execution Policies (ExecPolicy)
By default, await bus.publish(event) waits for all INLINE subscribers to complete.
You can control this behavior using the policy argument in @subscribe:
- ExecPolicy.INLINE (Default): The publisher awaits this handler (if it's async). Use this for critical, blocking tasks (like sending the welcome email).
- ExecPolicy.TASK: The bus starts this handler as a fire-and-forget asyncio.Task and does not wait for it to complete. Use this for non-critical background tasks (like analytics or logging).
- ExecPolicy.THREADPOOL: For sync (non-async) handlers. Runs the handler in a separate thread so it doesn't block the asyncio event loop. Use this for blocking I/O in sync handlers within an async application.
Behavioral notes:
- Exceptions in INLINE handlers propagate to the publisher.
- Exceptions in TASK or THREADPOOL handlers are caught and logged; they do not block the publisher.
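The three policies correspond to three familiar asyncio techniques: awaiting a coroutine directly, scheduling it with asyncio.create_task, and offloading a blocking function to a thread. The sketch below illustrates those techniques with plain asyncio (Python 3.9+ for asyncio.to_thread). toy_publish, _guarded, and the handler names are hypothetical, and this is not pico-ioc's dispatch code.

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("toy_bus")

order = []      # records when each step finishes
failures = []   # exceptions swallowed from background handlers


async def inline_handler(event):
    await asyncio.sleep(0.01)
    order.append("inline done")


async def task_handler(event):
    await asyncio.sleep(0.1)
    order.append("task done")


def blocking_handler(event):
    time.sleep(0.05)  # blocking call; it would stall the event loop if run inline
    order.append("thread done")


async def failing_task_handler(event):
    raise RuntimeError("analytics backend unavailable")


async def _guarded(awaitable):
    # Background work: log failures instead of raising into the publisher.
    try:
        await awaitable
    except Exception as exc:
        logger.error("background handler failed: %r", exc)
        failures.append(exc)


async def toy_publish(event):
    background = [
        asyncio.create_task(_guarded(task_handler(event))),          # TASK-style
        asyncio.create_task(_guarded(failing_task_handler(event))),  # TASK-style, fails
        asyncio.create_task(_guarded(asyncio.to_thread(blocking_handler, event))),  # THREADPOOL-style
    ]
    await inline_handler(event)  # INLINE-style: the publisher waits here
    order.append("publish returned")
    return background


async def main():
    background = await toy_publish("user_created")
    await asyncio.gather(*background)  # demo only: let background work finish


asyncio.run(main())
print(order)
print(failures)
```

In this sketch the publisher returns as soon as the inline handler finishes, and the failing background handler is logged without affecting it. An exception raised inside inline_handler would instead surface from toy_publish.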
4. publish() vs. post()
Method comparison:
- await bus.publish(event)
  - Execution: Synchronous (in-process)
  - How it works: Immediately finds and awaits all INLINE subscribers. Runs TASK subscribers concurrently without waiting. Executes THREADPOOL subscribers in threads.
  - Use Case: The default choice in nearly all cases. You want the event handled now or initiated immediately.
  - Blocking (Publisher): Waits only for INLINE handlers.
- bus.post(event)
  - Execution: Asynchronous (queued)
  - How it works: Puts the event on a background queue. A separate worker task (started via await bus.start_worker()) processes the queue later by calling publish internally.
  - Use Case: Fire-and-forget from a sync context, or when you need queuing behavior (max_queue_size). Requires managing the worker lifecycle (start_worker/stop_worker).
  - Blocking (Publisher): Does not wait (non-blocking).
Rule of Thumb: Always use await bus.publish() unless you specifically need the queuing behavior and are prepared to manage the worker task lifecycle.
Worker lifecycle for post()
If you use bus.post(...), you must manage the worker:
from pico_ioc.event_bus import EventBus
bus = await container.aget(EventBus)
await bus.start_worker() # starts the background task to drain the queue
# ... now bus.post(event) will enqueue and be processed in the background ...
await bus.stop_worker() # gracefully stops the worker (e.g., on shutdown)
You can optionally configure the queue size when initializing the bus (see your container configuration or EventBus options in your project).
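To show why the worker matters, here is a toy queued bus built on asyncio.Queue. ToyQueuedBus is a hypothetical stand-in whose method names mirror the ones above. Its behavior is an assumption made for this sketch, in particular that stop_worker drains the queue before stopping; check pico-ioc's own documentation for what the real EventBus does.

```python
import asyncio


class ToyQueuedBus:
    """Illustrative stand-in; not pico-ioc's EventBus."""

    def __init__(self, max_queue_size: int = 0):
        self._queue = asyncio.Queue(max_queue_size)  # 0 means unbounded
        self._worker = None
        self.handled = []

    async def publish(self, event):
        self.handled.append(event)

    def post(self, event):
        # Non-blocking enqueue; raises asyncio.QueueFull if a bounded queue is full.
        self._queue.put_nowait(event)

    async def start_worker(self):
        self._worker = asyncio.create_task(self._drain())

    async def stop_worker(self):
        # Sketch-level choice: process everything already queued, then stop.
        await self._queue.join()
        self._worker.cancel()
        try:
            await self._worker
        except asyncio.CancelledError:
            pass
        self._worker = None

    async def _drain(self):
        while True:
            event = await self._queue.get()
            try:
                await self.publish(event)
            finally:
                self._queue.task_done()


async def main():
    bus = ToyQueuedBus(max_queue_size=10)
    bus.post("a")  # queued, but no worker is running yet
    handled_before_start = list(bus.handled)
    await bus.start_worker()
    try:
        bus.post("b")
    finally:
        await bus.stop_worker()  # always stop the worker, even on errors
    return handled_before_start, bus.handled


handled_before_start, handled_after_stop = asyncio.run(main())
print(handled_before_start, handled_after_stop)
```

Events posted before the worker starts simply wait in the queue, which is why a forgotten start_worker() looks like events silently disappearing.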
Next Steps
You've seen how to decouple services using the event bus. The next guide covers how to control which services are even registered in the first place, based on your environment.
- Conditional Binding: Learn how to use primary=True, on_missing_selector, and conditional_* parameters to control your container's setup.