"""In-memory event bus for local development and testing.""" import asyncio import logging from collections import defaultdict from collections.abc import Awaitable, Callable from .base import EventBus, EventPayload logger = logging.getLogger(__name__) class MemoryEventBus(EventBus): """In-memory event bus implementation for local development""" def __init__(self) -> None: self.handlers: dict[ str, list[Callable[[str, EventPayload], Awaitable[None]]] ] = defaultdict(list) self.running = False async def publish(self, topic: str, payload: EventPayload) -> bool: """Publish event to topic""" try: if not self.running: logger.warning( "Event bus not running, skipping publish to topic: %s", topic ) return False handlers = self.handlers.get(topic, []) if not handlers: logger.debug("No handlers for topic: %s", topic) return True # Execute all handlers concurrently tasks = [handler(topic, payload) for handler in handlers] await asyncio.gather(*tasks, return_exceptions=True) logger.debug( "Published event to topic %s with %d handlers", topic, len(handlers) ) return True except Exception as e: logger.error("Failed to publish event to topic %s: %s", topic, e) return False async def subscribe( self, topic: str, handler: Callable[[str, EventPayload], Awaitable[None]] ) -> None: """Subscribe to topic with handler""" self.handlers[topic].append(handler) logger.debug("Subscribed handler to topic: %s", topic) async def start(self) -> None: """Start the event bus""" self.running = True logger.info("Memory event bus started") async def stop(self) -> None: """Stop the event bus""" self.running = False self.handlers.clear() logger.info("Memory event bus stopped")