Files
ai-tax-agent/libs/events/memory_bus.py
harkon b324ff09ef
Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
Initial commit
2025-10-11 08:41:36 +01:00

65 lines
2.1 KiB
Python

"""In-memory event bus for local development and testing."""
import asyncio
import logging
from collections import defaultdict
from collections.abc import Awaitable, Callable
from .base import EventBus, EventPayload
logger = logging.getLogger(__name__)
class MemoryEventBus(EventBus):
"""In-memory event bus implementation for local development"""
def __init__(self) -> None:
self.handlers: dict[
str, list[Callable[[str, EventPayload], Awaitable[None]]]
] = defaultdict(list)
self.running = False
async def publish(self, topic: str, payload: EventPayload) -> bool:
"""Publish event to topic"""
try:
if not self.running:
logger.warning(
"Event bus not running, skipping publish to topic: %s", topic
)
return False
handlers = self.handlers.get(topic, [])
if not handlers:
logger.debug("No handlers for topic: %s", topic)
return True
# Execute all handlers concurrently
tasks = [handler(topic, payload) for handler in handlers]
await asyncio.gather(*tasks, return_exceptions=True)
logger.debug(
"Published event to topic %s with %d handlers", topic, len(handlers)
)
return True
except Exception as e:
logger.error("Failed to publish event to topic %s: %s", topic, e)
return False
async def subscribe(
self, topic: str, handler: Callable[[str, EventPayload], Awaitable[None]]
) -> None:
"""Subscribe to topic with handler"""
self.handlers[topic].append(handler)
logger.debug("Subscribed handler to topic: %s", topic)
async def start(self) -> None:
"""Start the event bus"""
self.running = True
logger.info("Memory event bus started")
async def stop(self) -> None:
"""Stop the event bus"""
self.running = False
self.handlers.clear()
logger.info("Memory event bus stopped")