Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
65 lines
2.1 KiB
Python
65 lines
2.1 KiB
Python
"""In-memory event bus for local development and testing."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from collections import defaultdict
|
|
from collections.abc import Awaitable, Callable
|
|
|
|
from .base import EventBus, EventPayload
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MemoryEventBus(EventBus):
|
|
"""In-memory event bus implementation for local development"""
|
|
|
|
def __init__(self) -> None:
|
|
self.handlers: dict[
|
|
str, list[Callable[[str, EventPayload], Awaitable[None]]]
|
|
] = defaultdict(list)
|
|
self.running = False
|
|
|
|
async def publish(self, topic: str, payload: EventPayload) -> bool:
|
|
"""Publish event to topic"""
|
|
try:
|
|
if not self.running:
|
|
logger.warning(
|
|
"Event bus not running, skipping publish to topic: %s", topic
|
|
)
|
|
return False
|
|
|
|
handlers = self.handlers.get(topic, [])
|
|
if not handlers:
|
|
logger.debug("No handlers for topic: %s", topic)
|
|
return True
|
|
|
|
# Execute all handlers concurrently
|
|
tasks = [handler(topic, payload) for handler in handlers]
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
logger.debug(
|
|
"Published event to topic %s with %d handlers", topic, len(handlers)
|
|
)
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Failed to publish event to topic %s: %s", topic, e)
|
|
return False
|
|
|
|
async def subscribe(
|
|
self, topic: str, handler: Callable[[str, EventPayload], Awaitable[None]]
|
|
) -> None:
|
|
"""Subscribe to topic with handler"""
|
|
self.handlers[topic].append(handler)
|
|
logger.debug("Subscribed handler to topic: %s", topic)
|
|
|
|
async def start(self) -> None:
|
|
"""Start the event bus"""
|
|
self.running = True
|
|
logger.info("Memory event bus started")
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the event bus"""
|
|
self.running = False
|
|
self.handlers.clear()
|
|
logger.info("Memory event bus stopped")
|