Some checks failed
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
28 lines
1.1 KiB
Python
28 lines
1.1 KiB
Python
"""Factory function for creating event bus instances."""
|
|
|
|
from typing import Any
|
|
|
|
from .base import EventBus
|
|
from .nats_bus import NATSEventBus
|
|
|
|
|
|
def create_event_bus(bus_type: str, **kwargs: Any) -> EventBus:
    """Create an event bus instance for the requested backend.

    Args:
        bus_type: Backend name. Matched case-insensitively, ignoring
            surrounding whitespace. Supported values are ``"kafka"``,
            ``"sqs"`` and ``"nats"``.
        **kwargs: Backend-specific options. Only the keys listed here are
            read; any other keys are ignored.

            * kafka: ``bootstrap_servers`` (default ``"localhost:9092"``)
            * sqs: ``region_name`` (default ``"us-east-1"``)
            * nats: ``servers`` (default ``"nats://localhost:4222"``),
              ``stream_name`` (default ``"TAX_AGENT_EVENTS"``),
              ``consumer_group`` (default ``"tax-agent"``)

    Returns:
        The newly constructed event bus.

    Raises:
        ValueError: If ``bus_type`` is not a string or does not name a
            supported backend.
    """
    # Normalize once. A non-string value (e.g. None from a missing config
    # entry) falls through to the ValueError below rather than surfacing as
    # an AttributeError from calling .lower() on it.
    backend = bus_type.strip().lower() if isinstance(bus_type, str) else None

    if backend == "kafka":
        # Lazy import to avoid ModuleNotFoundError when aiokafka is not installed
        from .contrib.kafka_bus import KafkaEventBus

        return KafkaEventBus(kwargs.get("bootstrap_servers", "localhost:9092"))

    if backend == "sqs":
        # Lazy import to avoid ModuleNotFoundError when boto3 is not installed
        from .contrib.sqs_bus import SQSEventBus

        return SQSEventBus(kwargs.get("region_name", "us-east-1"))

    if backend == "nats":
        return NATSEventBus(
            servers=kwargs.get("servers", "nats://localhost:4222"),
            stream_name=kwargs.get("stream_name", "TAX_AGENT_EVENTS"),
            consumer_group=kwargs.get("consumer_group", "tax-agent"),
        )

    raise ValueError(f"Unsupported event bus type: {bus_type}")
|