
EventBus API Reference 📨

The EventBus provides a lightweight, asynchronous pub–sub mechanism for decoupling components. It is provided by default when scanning the pico_ioc.event_bus module.


Class: EventBus

subscribe(event_type, fn, *, priority=0, policy=None, once=False)

Registers a callback function (handler) for a specific event type.

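  • event_type: Type[Event]: The event class to listen for.
  • fn: Callable: The sync function or coroutine function to call with the event.
  • priority: int = 0: Handlers with higher priority run first.
  • policy: Optional[ExecPolicy] = None: How the handler is executed. When omitted, the bus's default_exec_policy applies (INLINE unless configured otherwise).
      • INLINE: The handler runs inside publish, one after another; async handlers are awaited.
      • TASK: Schedules an async handler as an asyncio.Task so it can run concurrently with other handlers. Per the source listing below, publish still awaits these tasks before it returns.
      • THREADPOOL: Runs a sync handler in a thread pool.
  • once: bool = False: If True, the handler is removed after one execution.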

Notes:

  • Handlers can be sync or async. A sync handler registered with INLINE runs in place on the event loop; use THREADPOOL to run a blocking sync handler off the loop.
  • Priority ordering applies within the same event type; higher numbers execute first.
  • Registering the same callable object again for the same event type is ignored: per the source listing below, subscribe returns early when the callback is already present, so the handler still runs once per event.
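The ordering and duplicate rules can be illustrated with a small stand-alone registry. This is a sketch under assumptions, not pico_ioc's implementation: the names Sub, Registry, and Ping are hypothetical, and the negated sort key is just one way to make higher priorities sort first.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass(order=True)
class Sub:
    # Negated priority, so an ascending sort puts high priorities first.
    sort_key: int
    callback: Callable = field(compare=False)


class Registry:
    """Hypothetical stand-in for the bus's subscriber table."""

    def __init__(self) -> None:
        self._subs: Dict[type, List[Sub]] = {}

    def subscribe(self, event_type: type, fn: Callable, *, priority: int = 0) -> None:
        subs = self._subs.setdefault(event_type, [])
        if any(s.callback is fn for s in subs):
            return  # same callable object already registered: ignore
        subs.append(Sub(-priority, fn))
        subs.sort()  # stable sort: equal priorities keep registration order

    def handlers(self, event_type: type) -> List[Callable]:
        return [s.callback for s in self._subs.get(event_type, [])]


class Ping:
    pass


def low(event):
    return "low"


def high(event):
    return "high"


registry = Registry()
registry.subscribe(Ping, low, priority=1)
registry.subscribe(Ping, high, priority=10)
registry.subscribe(Ping, low, priority=1)  # duplicate, ignored

order = [fn.__name__ for fn in registry.handlers(Ping)]
print(order)  # ['high', 'low']
```

The duplicate check compares by identity, so two distinct callables that happen to do the same thing would both be registered.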


unsubscribe(event_type, fn)

Removes a specific handler for an event type. If the handler is not registered, this is a no-op.


async publish(event: Event)

Asynchronously publishes an event, dispatching it to all registered subscribers.

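  • Behavior:
      • Takes a snapshot of the subscribers registered for type(event). Per the source listing below, the lookup uses the exact class, so handlers subscribed to a base class are not matched.
      • Executes each handler according to its ExecPolicy.
      • Returns after INLINE and THREADPOOL handlers have finished and any TASK handlers have been awaited.
      • A handler exception is wrapped in EventBusHandlerError, then logged (ErrorPolicy.LOG, the default) or re-raised (ErrorPolicy.RAISE).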

  • Usage:

    await event_bus.publish(UserCreatedEvent(user_id=123))
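The dispatch rules described for publish can be sketched with asyncio alone. The Policy enum and the dispatch function below are hypothetical stand-ins that mirror the branching in the source listing further down; they are not the library's API.

```python
import asyncio
import inspect
from enum import Enum, auto


class Policy(Enum):  # hypothetical stand-in for ExecPolicy
    INLINE = auto()
    THREADPOOL = auto()
    TASK = auto()


async def dispatch(event, subscribers):
    """Call each (callback, policy) pair; return once all of them have finished."""
    pending = []
    for callback, policy in subscribers:
        if inspect.iscoroutinefunction(callback):
            if policy is Policy.TASK:
                # Scheduled to run concurrently; collected at the end.
                pending.append(asyncio.create_task(callback(event)))
            else:
                await callback(event)
        elif policy is Policy.THREADPOOL:
            # Blocking sync handler runs in the default executor.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, callback, event)
        else:
            callback(event)  # sync handler runs in place on the loop
    if pending:
        await asyncio.gather(*pending)


calls = []


async def async_inline(event):
    calls.append(("async_inline", event))


async def async_task(event):
    await asyncio.sleep(0)
    calls.append(("async_task", event))


def sync_blocking(event):
    calls.append(("sync_blocking", event))


asyncio.run(dispatch("evt", [
    (async_inline, Policy.INLINE),
    (async_task, Policy.TASK),
    (sync_blocking, Policy.THREADPOOL),
]))
print(sorted(name for name, _ in calls))
```

The inline handler always completes first here; the relative order of the task and the thread-pool handler is not guaranteed, which is why the example sorts the names before printing.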
    


publish_sync(event: Event)

Synchronously publishes an event.

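  • Behavior:
      • If an event loop is running in the current thread, it schedules publish(event) as a task and returns immediately, before any handler has run.
      • If no loop is running, it calls asyncio.run(self.publish(event)) and returns once dispatch is complete.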

  • Usage: Use when you must publish from a def function.
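The two branches behave quite differently, which a stand-alone sketch makes visible. The publish and publish_sync functions here are hypothetical module-level stand-ins; unlike the method in the source listing, this publish_sync returns the created task so the example can observe when the handler actually runs.

```python
import asyncio

results = []


async def publish(event):
    results.append(event)


def publish_sync(event):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: run to completion before returning.
        asyncio.run(publish(event))
        return None
    # Inside a running loop: schedule only; the work happens later.
    return loop.create_task(publish(event))


# Called from plain synchronous code: dispatch is finished on return.
publish_sync("from-sync-code")
after_sync_call = list(results)


async def main():
    task = publish_sync("from-async-code")
    before = list(results)  # the scheduled task has not run yet
    await task
    return before


seen_before_await = asyncio.run(main())
print(after_sync_call, seen_before_await, results)
```

If the caller needs the handlers to have finished, awaiting publish directly is the safer choice inside async code.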


post(event: Event)

Posts an event to an internal queue for processing by a background worker.

  • Behavior: Non-blocking. Places the event in an asyncio.Queue.
  • Requires: The background worker must be started via await event_bus.start_worker() for queued events to be processed.
  • Thread Safety: Thread-safe and can be called from non-async threads.
  • Queue Capacity: If a maximum queue size was configured, posting to a full queue raises EventBusQueueFullError.

  • Usage: Advanced use for fire-and-forget queuing from any context, provided the worker is running.
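Posting from another thread relies on handing the enqueue operation to the loop that owns the queue. The following is a minimal sketch of that pattern using only asyncio and threading; the post function is a hypothetical stand-in, not the library method.

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def post(event):
        # asyncio.Queue is not thread-safe, so the put is scheduled
        # on the loop's own thread instead of being called directly.
        loop.call_soon_threadsafe(queue.put_nowait, event)

    producer = threading.Thread(target=post, args=("from-thread",))
    producer.start()
    received = await asyncio.wait_for(queue.get(), timeout=5)
    producer.join()
    return received


received = asyncio.run(main())
print(received)  # from-thread
```

One consequence of this pattern is that put_nowait executes later, on the loop thread. With a bounded queue, an asyncio.QueueFull raised there is reported through the loop's exception handler rather than to the thread that posted.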


async start_worker()

Starts an asyncio.Task that continuously processes events from the internal queue (fed by post()). The task runs on the same event loop that start_worker was awaited on. Calling this when the worker is already running is a no-op.


async stop_worker()

Gracefully stops the background worker task by queuing a None signal and waiting for the queue to be processed. Safe to call if the worker is not running (no-op).
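The sentinel-based shutdown can be sketched as follows. This is a stand-alone illustration of the pattern with a hypothetical worker coroutine; it collects items in a list where the real worker would publish them.

```python
import asyncio

processed = []


async def worker(queue):
    while True:
        item = await queue.get()
        try:
            if item is None:
                break  # sentinel: stop after marking it done
            processed.append(item)
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    for item in ("a", "b", "c"):
        queue.put_nowait(item)
    await queue.put(None)  # request shutdown
    await queue.join()     # wait until every item is marked done
    await task             # wait for the worker coroutine to exit


asyncio.run(main())
print(processed)  # ['a', 'b', 'c']
```

Because the sentinel is queued behind any pending events, everything posted before the stop request is still processed.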


async aclose()

Stops the worker (if running), marks the bus as closed, and clears all subscribers. After closing, calls to subscribe, publish, post, or start_worker raise EventBusClosedError. This is called automatically by @cleanup if the PicoEventBusProvider is used.


Decorator: @subscribe(...)

A decorator to mark methods as event handlers. Used with AutoSubscriberMixin.

from pico_ioc import component, subscribe
from pico_ioc.event_bus import AutoSubscriberMixin, Event, ExecPolicy

class MyEvent(Event):
    ...

@component
class MyListener(AutoSubscriberMixin):

    @subscribe(MyEvent, policy=ExecPolicy.TASK, priority=5, once=False)
    async def on_my_event(self, event: MyEvent):
        print("Got event in the background!")
  • Supported options: priority=0, policy=ExecPolicy.INLINE, once=False.
  • Methods can be sync or async; choose policy accordingly.
  • AutoSubscriberMixin scans decorated methods and registers/unregisters them automatically during component lifecycle.
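The decorator-plus-scan mechanism can be reproduced in a few lines. In this sketch, mark, collect, and the _subscriptions attribute are hypothetical names chosen for illustration; the library's own decorator and mixin appear in the source listings below.

```python
import inspect


def mark(event_type, *, priority=0):
    """Attach subscription metadata to a function without wrapping it."""
    def dec(fn):
        existing = getattr(fn, "_subscriptions", ())
        fn._subscriptions = (*existing, (event_type, priority))
        return fn
    return dec


class Started:
    pass


class Stopped:
    pass


class Listener:
    # Decorators apply bottom-up, so Stopped is recorded first.
    @mark(Started, priority=5)
    @mark(Stopped)
    def on_lifecycle(self, event):
        return type(event).__name__

    def helper(self):
        pass


def collect(obj):
    """Scan an instance for callables that carry subscription metadata."""
    found = []
    for name, attr in inspect.getmembers(obj, predicate=callable):
        # Bound methods forward attribute lookups to the underlying function.
        for event_type, priority in getattr(attr, "_subscriptions", ()):
            found.append((name, event_type.__name__, priority))
    return found


found = collect(Listener())
print(found)  # [('on_lifecycle', 'Stopped', 0), ('on_lifecycle', 'Started', 5)]
```

Storing metadata on the function instead of wrapping it keeps the method callable as usual and lets one method subscribe to several event types.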

Exceptions

Exception               Raised When
EventBusClosedError     publish or post is called after aclose().
EventBusQueueFullError  post() is called on a full queue (if max_queue_size was set).
EventBusHandlerError    A subscriber function raises an exception and the bus uses ErrorPolicy.RAISE; under the default ErrorPolicy.LOG the error is logged instead.
EventBusError           post() is called without the worker running.

Notes

  • Threading and concurrency:
      • post() is safe to call from any thread; events are processed on the loop where start_worker() was called.
      • Use THREADPOOL for CPU-bound or blocking sync handlers to avoid blocking the event loop.
  • Integration:
      • The default EventBus instance is provided by scanning pico_ioc.event_bus.
      • Cleanup is automatic when using PicoEventBusProvider together with @cleanup.

Auto-generated API

pico_ioc.event_bus

Asynchronous in-process event bus.

Provides EventBus for publishing and subscribing to typed events, subscribe as a declarative decorator, and AutoSubscriberMixin for automatic wiring of @subscribe-decorated methods during container startup.

ExecPolicy

Bases: Enum

Execution policy for event handlers.

Attributes:

Name        Description
INLINE      Execute the handler synchronously in the current coroutine.
THREADPOOL  Run sync handlers in a thread-pool executor.
TASK        Schedule async handlers as asyncio.Task instances.

Source code in src/pico_ioc/event_bus.py
class ExecPolicy(Enum):
    """Execution policy for event handlers.

    Attributes:
        INLINE: Execute the handler synchronously in the current coroutine.
        THREADPOOL: Run sync handlers in a thread-pool executor.
        TASK: Schedule async handlers as ``asyncio.Task`` instances.
    """

    INLINE = auto()
    THREADPOOL = auto()
    TASK = auto()

ErrorPolicy

Bases: Enum

Error handling policy for event handlers.

Attributes:

Name   Description
LOG    Log handler errors and continue dispatching.
RAISE  Re-raise the first handler error immediately.

Source code in src/pico_ioc/event_bus.py
class ErrorPolicy(Enum):
    """Error handling policy for event handlers.

    Attributes:
        LOG: Log handler errors and continue dispatching.
        RAISE: Re-raise the first handler error immediately.
    """

    LOG = auto()
    RAISE = auto()
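The difference between the two error policies can be shown with a stand-alone sketch. OnError and run_all are hypothetical names used only for this illustration; the library wraps errors in its own exception types, which this example does not reproduce.

```python
import logging
from enum import Enum, auto

log = logging.getLogger("event_bus_sketch")


class OnError(Enum):  # hypothetical stand-in for ErrorPolicy
    LOG = auto()
    RAISE = auto()


def run_all(handlers, event, on_error):
    """Run handlers in order; return the names of those that succeeded."""
    completed = []
    for handler in handlers:
        try:
            handler(event)
            completed.append(handler.__name__)
        except Exception as exc:
            if on_error is OnError.RAISE:
                raise  # stop dispatching at the first failure
            log.warning("handler %s failed: %s", handler.__name__, exc)
    return completed


def broken(event):
    raise ValueError("boom")


def healthy(event):
    pass


# LOG: the failure is recorded and later handlers still run.
logged = run_all([broken, healthy], "evt", OnError.LOG)

# RAISE: the first failure propagates and later handlers are skipped.
try:
    run_all([broken, healthy], "evt", OnError.RAISE)
    raised = False
except ValueError:
    raised = True

print(logged, raised)  # ['healthy'] True
```

LOG favors isolation between handlers, while RAISE makes failures visible to the publisher at the cost of skipping the remaining handlers.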

Event

Base class for application events.

Subclass this to define typed events:

class OrderPlaced(Event):
    def __init__(self, order_id: str):
        self.order_id = order_id
Source code in src/pico_ioc/event_bus.py
class Event:
    """Base class for application events.

    Subclass this to define typed events::

        class OrderPlaced(Event):
            def __init__(self, order_id: str):
                self.order_id = order_id
    """

    ...

EventBus

Asynchronous, typed, in-process event bus.

Supports both sync and async handlers with configurable execution and error policies. Optionally runs a background worker for queued dispatch.

Parameters:

Name                 Type         Default  Description
default_exec_policy  ExecPolicy   INLINE   Default execution policy for handlers that do not specify their own.
error_policy         ErrorPolicy  LOG      How handler errors are treated (log or raise).
max_queue_size       int          0        Maximum size of the background worker queue. 0 means unbounded.
Example:

    bus = EventBus()
    bus.subscribe(OrderPlaced, lambda e: print(e.order_id))
    await bus.publish(OrderPlaced(order_id="123"))

Source code in src/pico_ioc/event_bus.py
class EventBus:
    """Asynchronous, typed, in-process event bus.

    Supports both sync and async handlers with configurable execution and
    error policies. Optionally runs a background worker for queued dispatch.

    Args:
        default_exec_policy: Default execution policy for handlers that do
            not specify their own.
        error_policy: How handler errors are treated (log or raise).
        max_queue_size: Maximum size of the background worker queue. ``0``
            means unbounded.

    Example:
        >>> bus = EventBus()
        >>> bus.subscribe(OrderPlaced, lambda e: print(e.order_id))
        >>> await bus.publish(OrderPlaced(order_id="123"))
    """

    def __init__(
        self,
        *,
        default_exec_policy: ExecPolicy = ExecPolicy.INLINE,
        error_policy: ErrorPolicy = ErrorPolicy.LOG,
        max_queue_size: int = 0,
    ):
        self._subs: Dict[Type[Event], List[_Subscriber]] = {}
        self._default_policy = default_exec_policy
        self._error_policy = error_policy
        self._queue: Optional[asyncio.Queue[Event]] = asyncio.Queue(max_queue_size) if max_queue_size >= 0 else None
        self._worker_task: Optional[asyncio.Task] = None
        self._worker_loop: Optional[asyncio.AbstractEventLoop] = None
        self._closed = False
        self._lock = threading.RLock()

    def subscribe(
        self,
        event_type: Type[Event],
        fn: Callable[[Event], Any] | Callable[[Event], Awaitable[Any]],
        *,
        priority: int = 0,
        policy: Optional[ExecPolicy] = None,
        once: bool = False,
    ) -> None:
        with self._lock:
            if self._closed:
                raise EventBusClosedError()
            sub = _Subscriber(priority=priority, callback=fn, policy=policy or self._default_policy, once=once)
            lst = self._subs.setdefault(event_type, [])
            if any(s.callback is fn for s in lst):
                return
            lst.append(sub)
            lst.sort()

    def unsubscribe(
        self, event_type: Type[Event], fn: Callable[[Event], Any] | Callable[[Event], Awaitable[Any]]
    ) -> None:
        with self._lock:
            lst = self._subs.get(event_type, [])
            self._subs[event_type] = [s for s in lst if s.callback is not fn]

    def publish_sync(self, event: Event) -> None:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self.publish(event))
            return
        if loop.is_running():

            async def _bridge():
                await self.publish(event)

            loop.create_task(_bridge())
        else:
            asyncio.run(self.publish(event))

    async def publish(self, event: Event) -> None:
        if self._closed:
            raise EventBusClosedError()
        with self._lock:
            subs = list(self._subs.get(type(event), []))
        to_remove: List[_Subscriber] = []
        pending: List[asyncio.Task] = []
        for sub in subs:
            try:
                cb = sub.callback
                if inspect.iscoroutinefunction(cb):
                    if sub.policy is ExecPolicy.TASK:
                        pending.append(asyncio.create_task(cb(event)))
                    else:
                        await cb(event)
                else:
                    if sub.policy is ExecPolicy.THREADPOOL:
                        loop = asyncio.get_running_loop()
                        await loop.run_in_executor(None, cb, event)
                    else:
                        cb(event)
                if sub.once:
                    to_remove.append(sub)
            except Exception as ex:
                self._handle_error(
                    EventBusHandlerError(type(event).__name__, getattr(sub.callback, "__name__", "<callback>"), ex)
                )
        if pending:
            try:
                await asyncio.gather(*pending, return_exceptions=False)
            except Exception as ex:
                self._handle_error(EventBusError(f"Unhandled error awaiting event tasks: {ex}"))
        if to_remove:
            with self._lock:
                lst = self._subs.get(type(event), [])
                self._subs[type(event)] = [s for s in lst if s not in to_remove]

    async def start_worker(self) -> None:
        if self._closed:
            raise EventBusClosedError()
        if self._worker_task:
            return
        if self._queue is None:
            self._queue = asyncio.Queue()
        loop = asyncio.get_running_loop()
        self._worker_loop = loop

        async def _worker():
            while True:
                evt = await self._queue.get()
                if evt is None:
                    self._queue.task_done()
                    break
                try:
                    await self.publish(evt)
                finally:
                    self._queue.task_done()

        self._worker_task = asyncio.create_task(_worker())

    async def stop_worker(self) -> None:
        if self._worker_task and self._queue and self._worker_loop:
            await self._queue.put(None)
            await self._queue.join()
            await self._worker_task
            self._worker_task = None
            self._worker_loop = None

    def post(self, event: Event) -> None:
        with self._lock:
            if self._closed:
                raise EventBusClosedError()
            if self._queue is None:
                raise EventBusError("Worker queue not initialized. Call start_worker().")

            queue_ref = self._queue
            loop_ref = self._worker_loop

        if loop_ref and loop_ref.is_running():
            try:
                current_loop = asyncio.get_running_loop()
                if current_loop is loop_ref:
                    try:
                        queue_ref.put_nowait(event)
                        return
                    except asyncio.QueueFull:
                        raise EventBusQueueFullError()
            except RuntimeError:
                pass
            try:
                loop_ref.call_soon_threadsafe(queue_ref.put_nowait, event)
                return
            except asyncio.QueueFull:
                raise EventBusQueueFullError()
        else:
            raise EventBusError("Worker queue not initialized or loop not running. Call start_worker().")

    async def aclose(self) -> None:
        await self.stop_worker()
        with self._lock:
            self._closed = True
            self._subs.clear()

    def _handle_error(self, ex: EventBusError) -> None:
        if self._error_policy is ErrorPolicy.RAISE:
            raise ex
        if self._error_policy is ErrorPolicy.LOG:
            log.exception("%s", ex)

AutoSubscriberMixin

Mixin that auto-subscribes @subscribe-decorated methods to the EventBus.

Add this mixin to a @component class. During @configure, all methods carrying @subscribe metadata are registered with the EventBus.

Source code in src/pico_ioc/event_bus.py
class AutoSubscriberMixin:
    """Mixin that auto-subscribes ``@subscribe``-decorated methods to the EventBus.

    Add this mixin to a ``@component`` class. During ``@configure``, all
    methods carrying ``@subscribe`` metadata are registered with the
    :class:`EventBus`.
    """

    @configure
    def _pico_autosubscribe(self, event_bus: EventBus) -> None:
        for _, attr in inspect.getmembers(self, predicate=callable):
            subs: Iterable[Tuple[Type[Event], int, ExecPolicy, bool]] = getattr(attr, "_pico_subscriptions_", ())
            for evt_t, pr, pol, once in subs:
                event_bus.subscribe(evt_t, attr, priority=pr, policy=pol, once=once)

subscribe(event_type, *, priority=0, policy=ExecPolicy.INLINE, once=False)

Decorator that marks a method for auto-subscription via AutoSubscriberMixin.

Apply to methods on a @component class that mixes in AutoSubscriberMixin:

@component
class OrderHandler(AutoSubscriberMixin):
    @subscribe(OrderPlaced, priority=10)
    def on_order(self, event: OrderPlaced):
        print(f"Order {event.order_id} placed")

Parameters:

Name        Type         Default   Description
event_type  Type[Event]  required  The Event subclass to subscribe to.
priority    int          0         Higher-priority handlers run first.
policy      ExecPolicy   INLINE    Execution policy for this handler.
once        bool         False     If True, the handler is unsubscribed after the first dispatch.

Returns:

A decorator that attaches subscription metadata to the function.

Source code in src/pico_ioc/event_bus.py
def subscribe(
    event_type: Type[Event], *, priority: int = 0, policy: ExecPolicy = ExecPolicy.INLINE, once: bool = False
):
    """Decorator that marks a method for auto-subscription via ``AutoSubscriberMixin``.

    Apply to methods on a ``@component`` class that mixes in
    :class:`AutoSubscriberMixin`::

        @component
        class OrderHandler(AutoSubscriberMixin):
            @subscribe(OrderPlaced, priority=10)
            def on_order(self, event: OrderPlaced):
                print(f"Order {event.order_id} placed")

    Args:
        event_type: The :class:`Event` subclass to subscribe to.
        priority: Higher-priority handlers run first. Default ``0``.
        policy: Execution policy for this handler.
        once: If ``True``, the handler is unsubscribed after the first dispatch.

    Returns:
        A decorator that attaches subscription metadata to the function.
    """

    def dec(fn: Callable[[Event], Any] | Callable[[Event], Awaitable[Any]]):
        subs: Iterable[Tuple[Type[Event], int, ExecPolicy, bool]] = getattr(fn, "_pico_subscriptions_", ())
        subs = list(subs)
        subs.append((event_type, int(priority), policy, bool(once)))
        setattr(fn, "_pico_subscriptions_", tuple(subs))
        return fn

    return dec