EventBus API Reference 📨¶
The EventBus provides a lightweight, asynchronous pub–sub mechanism for decoupling components. It is provided by default when scanning the pico_ioc.event_bus module.
Class: EventBus¶
subscribe(event_type, fn, *, priority=0, policy=ExecPolicy.INLINE, once=False)¶
Registers a callback function (handler) for a specific event type.
- event_type: Type[Event]: The event class to listen for.
- fn: Callable: The function or async coroutine to call.
- priority: int = 0: Handlers with higher priority run first.
- policy: ExecPolicy = ExecPolicy.INLINE:
  - INLINE: (Default) The handler is awaited by publish.
  - TASK: Runs as a fire-and-forget asyncio.Task.
  - THREADPOOL: Runs a sync handler in a thread pool.
- once: bool = False: If True, the handler is removed after one execution.
Notes:
- Handlers can be sync or async. A sync handler subscribed with INLINE runs in place on the event loop; use THREADPOOL to run sync handlers off the main loop.
- Priority ordering applies within the same event type; higher numbers execute first.
- Registering the same (event_type, fn) pair multiple times results in multiple executions (one per registration).
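The ordering and once rules can be illustrated with a small standalone sketch. MiniBus below is a hypothetical, synchronous toy written only for this illustration; it is not pico_ioc's implementation and it ignores execution policies entirely.

```python
from collections import defaultdict
from typing import Callable


class MiniBus:
    """Hypothetical toy registry; shows priority ordering and once only."""

    def __init__(self) -> None:
        # event type -> list of (priority, once, handler) registrations
        self._subs = defaultdict(list)

    def subscribe(self, event_type: type, fn: Callable, *,
                  priority: int = 0, once: bool = False) -> None:
        self._subs[event_type].append((priority, once, fn))
        # Stable sort: higher priority first, registration order breaks ties.
        self._subs[event_type].sort(key=lambda entry: -entry[0])

    def publish(self, event: object) -> None:
        # Iterate over a copy so once-handlers can be removed while looping.
        for entry in list(self._subs[type(event)]):
            entry[2](event)
            if entry[1]:  # once=True: drop this registration after it ran
                self._subs[type(event)].remove(entry)


class Ping:
    pass


calls = []
bus = MiniBus()
bus.subscribe(Ping, lambda e: calls.append("low"), priority=1)
bus.subscribe(Ping, lambda e: calls.append("high"), priority=10)
bus.subscribe(Ping, lambda e: calls.append("once"), priority=5, once=True)
bus.publish(Ping())
bus.publish(Ping())
print(calls)  # ['high', 'once', 'low', 'high', 'low']
```

The second publish skips the once handler because its registration was removed after the first run.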
unsubscribe(event_type, fn)¶
Removes a specific handler for an event type. If the handler is not registered, this is a no-op.
async publish(event: Event)¶
Asynchronously publishes an event, dispatching it to all registered subscribers.
- Behavior:
  - Immediately finds all subscribers for type(event).
  - Executes handlers based on their ExecPolicy.
  - Awaits the completion of all INLINE handlers before returning.
- Exceptions raised by handlers are surfaced as EventBusHandlerError unless otherwise handled by the policy.
- Usage: Await it from async code (an async def function).
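How the three policies differ can be sketched with plain asyncio. Policy and dispatch below are hypothetical stand-ins written for this illustration, not the library's code. In particular, whether the real bus waits for THREADPOOL handlers is not stated here; this sketch does not wait for them inside dispatch.

```python
import asyncio
import enum
import threading


class Policy(enum.Enum):  # hypothetical stand-in for ExecPolicy
    INLINE = "inline"
    TASK = "task"
    THREADPOOL = "threadpool"


async def dispatch(handler, event, policy, background):
    loop = asyncio.get_running_loop()
    if policy is Policy.TASK:
        # Fire-and-forget: schedule the coroutine and return without awaiting.
        background.append(asyncio.create_task(handler(event)))
    elif policy is Policy.THREADPOOL:
        # Run a sync handler in the default executor, off the loop thread.
        background.append(loop.run_in_executor(None, handler, event))
    else:
        # INLINE: call in place; await the result if it is a coroutine.
        result = handler(event)
        if asyncio.iscoroutine(result):
            await result


async def main():
    log, background = [], []
    loop_thread = threading.get_ident()

    async def inline_handler(event):
        log.append(("inline", event))

    async def task_handler(event):
        log.append(("task", event))

    def blocking_handler(event):
        # Record whether this ran on a thread other than the loop thread.
        log.append(("threadpool", threading.get_ident() != loop_thread))

    await dispatch(inline_handler, "e1", Policy.INLINE, background)
    after_inline = list(log)  # the INLINE handler has already finished here
    await dispatch(task_handler, "e1", Policy.TASK, background)
    await dispatch(blocking_handler, "e1", Policy.THREADPOOL, background)
    await asyncio.gather(*background)  # demo only: wait for background work
    return after_inline, log


after_inline, log = asyncio.run(main())
```

The snapshot taken right after the INLINE dispatch already contains that handler's entry, while the TASK and THREADPOOL entries only appear once the background work has been awaited.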
publish_sync(event: Event)¶
Synchronously publishes an event.
- Behavior:
  - If an event loop is running, it creates a task for publish(event).
  - If no loop is running, it calls asyncio.run(self.publish(event)).
- Usage: Use when you must publish from a synchronous (def) function.
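The two branches described above follow a common asyncio pattern, sketched here with module-level stand-in functions. The publish coroutine, the received list, and the choice to return the created task are assumptions made for the illustration; the real method's return value is not documented here.

```python
import asyncio

received = []


async def publish(event):
    # Stand-in for the async publish: just record the event.
    received.append(event)


def publish_sync(event):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: run publish to completion.
        asyncio.run(publish(event))
        return None
    # A loop is already running: schedule the coroutine and return at once.
    return loop.create_task(publish(event))


publish_sync("from-sync-code")  # blocks until the event is delivered


async def main():
    task = publish_sync("from-async-code")  # returns before delivery
    pending = "from-async-code" not in received
    await task
    return pending


was_pending = asyncio.run(main())
```

One consequence of this pattern: when a loop is already running, the event has not been delivered by the time publish_sync returns, so callers that need completion should await publish directly.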
post(event: Event)¶
Posts an event to an internal queue for processing by a background worker.
- Behavior: Non-blocking. Places the event in an asyncio.Queue.
- Requires: The background worker must be started via await event_bus.start_worker() for queued events to be processed.
- Thread Safety: Thread-safe and can be called from non-async threads.
- Queue Capacity: If a maximum queue size was configured, posting to a full queue raises EventBusQueueFullError.
- Usage: Advanced use for fire-and-forget queuing from any context, provided the worker is running.
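The queue-capacity rule can be sketched with a bounded asyncio.Queue. QueueFullError and the free function post below are hypothetical stand-ins for this illustration; the sketch only shows how a non-blocking put on a full queue can be turned into a domain-specific error.

```python
import asyncio


class QueueFullError(Exception):
    """Hypothetical stand-in for EventBusQueueFullError."""


def post(queue: asyncio.Queue, event) -> None:
    try:
        queue.put_nowait(event)  # never blocks; raises if the queue is full
    except asyncio.QueueFull as exc:
        raise QueueFullError("event queue is full") from exc


async def main():
    queue = asyncio.Queue(maxsize=2)
    post(queue, "a")
    post(queue, "b")
    try:
        post(queue, "c")  # third event exceeds maxsize=2
    except QueueFullError:
        return queue.qsize(), True
    return queue.qsize(), False


size, rejected = asyncio.run(main())
print(size, rejected)  # 2 True
```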
async start_worker()¶
Starts an asyncio.Task that continuously processes events from the internal queue (fed by post()). The task runs on the same event loop that start_worker was awaited on. Calling this when the worker is already running is a no-op.
async stop_worker()¶
Gracefully stops the background worker task by queuing a None signal and waiting for the queue to be processed. Safe to call if the worker is not running (no-op).
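The worker lifecycle (start, cross-thread posting, stop via a None signal) can be sketched with plain asyncio. This is a minimal sketch under stated assumptions, not the library's implementation: the worker coroutine and producer function are hypothetical, and cross-thread posting is done with loop.call_soon_threadsafe because asyncio.Queue itself is not thread-safe.

```python
import asyncio


async def worker(queue, handled):
    # Process events until the None stop signal arrives.
    while True:
        event = await queue.get()
        if event is None:
            break
        handled.append(event)


async def main():
    queue = asyncio.Queue()
    handled = []
    loop = asyncio.get_running_loop()
    task = asyncio.create_task(worker(queue, handled))  # "start_worker"

    def producer():
        # Runs in another thread: hand each put over to the loop thread.
        for i in range(3):
            loop.call_soon_threadsafe(queue.put_nowait, i)

    await loop.run_in_executor(None, producer)

    # "stop_worker": queue the stop signal, then wait for the worker to exit.
    await queue.put(None)
    await task
    return handled


handled = asyncio.run(main())
print(handled)  # [0, 1, 2]
```

Because the stop signal travels through the same FIFO queue as the events, everything posted before it is processed before the worker exits.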
async aclose()¶
Stops the worker (if running) and cleans up all resources, clearing all subscribers. After closing, any call to publish or post raises EventBusClosedError. This is called automatically by @cleanup if the PicoEventBusProvider is used.
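The closed-state contract can be sketched with a toy class. TinyBus and ClosedError are hypothetical names used only for this illustration; the sketch shows subscribers being cleared and later publishes being rejected, and nothing about how the real bus manages its worker.

```python
import asyncio


class ClosedError(Exception):
    """Hypothetical stand-in for EventBusClosedError."""


class TinyBus:
    def __init__(self):
        self._handlers = []
        self._closed = False

    def subscribe(self, fn):
        self._handlers.append(fn)

    async def publish(self, event):
        if self._closed:
            raise ClosedError("bus is closed")
        for fn in list(self._handlers):
            await fn(event)

    async def aclose(self):
        # Drop all subscribers and reject any further publishes.
        self._handlers.clear()
        self._closed = True


async def main():
    seen = []

    async def handler(event):
        seen.append(event)

    bus = TinyBus()
    bus.subscribe(handler)
    await bus.publish("before")
    await bus.aclose()
    try:
        await bus.publish("after")
    except ClosedError:
        return seen, True
    return seen, False


seen, raised = asyncio.run(main())
```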
Decorator: @subscribe(...)¶
A decorator to mark methods as event handlers. Used with AutoSubscriberMixin.

    from pico_ioc import component, subscribe
    from pico_ioc.event_bus import AutoSubscriberMixin, Event, ExecPolicy

    class MyEvent(Event):
        ...

    @component
    class MyListener(AutoSubscriberMixin):
        @subscribe(MyEvent, policy=ExecPolicy.TASK, priority=5, once=False)
        async def on_my_event(self, event: MyEvent):
            print("Got event in the background!")

- Supported options: priority=0, policy=ExecPolicy.INLINE, once=False.
- Methods can be sync or async; choose policy accordingly.
- AutoSubscriberMixin scans decorated methods and registers/unregisters them automatically during component lifecycle.
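The general mechanism behind a metadata decorator plus a scanning mixin can be sketched as follows. This is an illustrative toy, not pico_ioc's code: the attribute name _subscriptions, the ScanMixin class, and its find_handlers method are all assumptions made for the example.

```python
import inspect

_META = "_subscriptions"  # hypothetical attribute name


def subscribe(event_type, *, priority=0, once=False):
    def decorator(fn):
        # Attach metadata to the function; its behavior is unchanged.
        meta = getattr(fn, _META, [])
        meta.append({"event_type": event_type, "priority": priority, "once": once})
        setattr(fn, _META, meta)
        return fn
    return decorator


class ScanMixin:
    def find_handlers(self):
        # Collect every bound method that carries subscription metadata.
        found = []
        for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
            for meta in getattr(method, _META, []):
                found.append((meta["event_type"], name, meta["priority"]))
        return found


class OrderPlaced:
    pass


class Listener(ScanMixin):
    @subscribe(OrderPlaced, priority=5)
    def on_order(self, event):
        pass

    def helper(self):  # undecorated, so the scan ignores it
        pass


handlers = Listener().find_handlers()
```

A real mixin would pass each discovered method to the bus's subscribe call instead of returning a list.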
Exceptions¶
| Exception | Raised When |
|---|---|
| EventBusClosedError | publish or post is called after aclose(). |
| EventBusQueueFullError | post() is called on a full queue (if max_queue_size was set). |
| EventBusHandlerError | A subscriber function raises an unhandled exception. |
| EventBusError | post() is called without the worker running. |
Notes¶
- Threading and concurrency:
  - post() is safe to call from any thread; events are processed on the loop where start_worker() was called.
  - Use THREADPOOL for CPU-bound or blocking sync handlers to avoid blocking the event loop.
- Integration:
  - The default EventBus instance is provided by scanning pico_ioc.event_bus.
  - Cleanup is automatic when using PicoEventBusProvider together with @cleanup.
Auto-generated API¶
pico_ioc.event_bus ¶
Asynchronous in-process event bus.
Provides EventBus for publishing and subscribing to typed events, subscribe as a declarative decorator, and AutoSubscriberMixin for automatic wiring of @subscribe-decorated methods during container startup.
ExecPolicy ¶
Bases: Enum
Execution policy for event handlers.
Attributes:
| Name | Type | Description |
|---|---|---|
| INLINE | | Execute the handler synchronously in the current coroutine. |
| THREADPOOL | | Run sync handlers in a thread-pool executor. |
| TASK | | Schedule async handlers as fire-and-forget asyncio.Task objects. |
Source code in src/pico_ioc/event_bus.py
ErrorPolicy ¶
Bases: Enum
Error handling policy for event handlers.
Attributes:
| Name | Type | Description |
|---|---|---|
| LOG | | Log handler errors and continue dispatching. |
| RAISE | | Re-raise the first handler error immediately. |
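The difference between the two policies can be sketched with a synchronous toy loop. OnError and run_handlers are hypothetical names for this illustration, and the sketch re-raises the original exception as-is; how the real bus wraps or reports errors is not shown here.

```python
import enum
import logging


class OnError(enum.Enum):  # hypothetical stand-in for ErrorPolicy
    LOG = "log"
    RAISE = "raise"


def run_handlers(handlers, event, on_error):
    completed = []
    for handler in handlers:
        try:
            handler(event)
        except Exception:
            if on_error is OnError.RAISE:
                raise  # stop at the first failure
            # LOG: record the failure and keep dispatching.
            logging.getLogger("toy_bus").exception("handler %r failed", handler)
            continue
        completed.append(handler.__name__)
    return completed


def ok_first(event):
    pass


def boom(event):
    raise ValueError("bad event")


def ok_last(event):
    pass


logging.disable(logging.CRITICAL)  # silence the toy logger for this demo

log_result = run_handlers([ok_first, boom, ok_last], "e", OnError.LOG)

try:
    run_handlers([ok_first, boom, ok_last], "e", OnError.RAISE)
    raised = False
except ValueError:
    raised = True
```

With LOG, the handler after the failing one still runs; with RAISE, dispatch stops at the failure and the caller sees the exception.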
Event ¶
Base class for application events.
Subclass this to define typed events:

    class OrderPlaced(Event):
        def __init__(self, order_id: str):
            self.order_id = order_id

EventBus ¶
Asynchronous, typed, in-process event bus.
Supports both sync and async handlers with configurable execution and error policies. Optionally runs a background worker for queued dispatch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| default_exec_policy | ExecPolicy | Default execution policy for handlers that do not specify their own. | INLINE |
| error_policy | ErrorPolicy | How handler errors are treated (log or raise). | LOG |
| max_queue_size | int | Maximum size of the background worker queue. | 0 |
Example

    bus = EventBus()
    bus.subscribe(OrderPlaced, lambda e: print(e.order_id))
    await bus.publish(OrderPlaced(order_id="123"))

AutoSubscriberMixin ¶
Mixin that auto-subscribes @subscribe-decorated methods to the EventBus.
Add this mixin to a @component class. During @configure, all methods carrying @subscribe metadata are registered with the EventBus.
subscribe(event_type, *, priority=0, policy=ExecPolicy.INLINE, once=False) ¶
Decorator that marks a method for auto-subscription via AutoSubscriberMixin.
Apply to methods on a @component class that mixes in AutoSubscriberMixin:

    @component
    class OrderHandler(AutoSubscriberMixin):
        @subscribe(OrderPlaced, priority=10)
        def on_order(self, event: OrderPlaced):
            print(f"Order {event.order_id} placed")

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_type | Type[Event] | The Event subclass to listen for. | required |
| priority | int | Higher-priority handlers run first. | 0 |
| policy | ExecPolicy | Execution policy for this handler. | INLINE |
| once | bool | If True, the handler is removed after one execution. | False |
Returns:
| Type | Description |
|---|---|
| | A decorator that attaches subscription metadata to the function. |