"""Example usage of NATS.io event bus with JetStream.""" import asyncio import logging from libs.events import EventPayload, NATSEventBus # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) async def example_handler(topic: str, payload: EventPayload) -> None: """Example event handler.""" logger.info( f"Received event on topic '{topic}': " f"ID={payload.event_id}, " f"Actor={payload.actor}, " f"Data={payload.data}" ) async def main() -> None: """Main example function.""" # Method 1: Direct instantiation nats_bus = NATSEventBus( servers="nats://localhost:4222", # Can be a list for cluster stream_name="TAX_AGENT_EVENTS", consumer_group="tax-agent", ) # Method 2: Using factory # nats_bus = create_event_bus( # "nats", # servers="nats://localhost:4222", # stream_name="TAX_AGENT_EVENTS", # consumer_group="tax-agent", # ) try: # Start the event bus await nats_bus.start() logger.info("NATS event bus started") # Subscribe to a topic await nats_bus.subscribe("user.created", example_handler) await nats_bus.subscribe("user.updated", example_handler) logger.info("Subscribed to topics") # Publish some events for i in range(5): payload = EventPayload( data={"user_id": f"user-{i}", "name": f"User {i}"}, actor="system", tenant_id="tenant-123", trace_id=f"trace-{i}", ) success = await nats_bus.publish("user.created", payload) if success: logger.info(f"Published event {i}") else: logger.error(f"Failed to publish event {i}") # Wait a bit for messages to be processed await asyncio.sleep(2) # Publish an update event update_payload = EventPayload( data={ "user_id": "user-1", "name": "Updated User 1", "email": "user1@example.com", }, actor="admin", tenant_id="tenant-123", ) await nats_bus.publish("user.updated", update_payload) logger.info("Published update event") # Wait for processing await asyncio.sleep(2) except Exception as e: logger.error(f"Error in example: {e}") finally: # Stop the event bus await nats_bus.stop() logger.info("NATS event bus stopped") async def cluster_example() -> None: """Example with NATS cluster configuration.""" # Connect to a NATS cluster cluster_bus = NATSEventBus( servers=[ "nats://nats1.example.com:4222", "nats://nats2.example.com:4222", "nats://nats3.example.com:4222", ], stream_name="PRODUCTION_EVENTS", consumer_group="tax-agent-prod", ) try: await cluster_bus.start() logger.info("Connected to NATS cluster") # Subscribe to multiple topics topics = ["document.uploaded", "document.processed", "tax.calculated"] for topic in topics: await cluster_bus.subscribe(topic, example_handler) logger.info(f"Subscribed to {len(topics)} topics") # Keep running for a while await asyncio.sleep(10) finally: await cluster_bus.stop() async def error_handling_example() -> None: """Example showing error handling.""" async def failing_handler(topic: str, payload: EventPayload) -> None: """Handler that sometimes fails.""" if payload.data.get("should_fail"): raise ValueError("Simulated handler failure") logger.info(f"Successfully processed event {payload.event_id}") bus = NATSEventBus() try: await bus.start() await bus.subscribe("test.events", failing_handler) # Publish a good event good_payload = EventPayload( data={"message": "This will succeed"}, actor="test", tenant_id="test-tenant", ) await bus.publish("test.events", good_payload) # Publish a bad event bad_payload = EventPayload( data={"message": "This will fail", "should_fail": True}, actor="test", tenant_id="test-tenant", ) await bus.publish("test.events", bad_payload) await asyncio.sleep(2) finally: await bus.stop() if __name__ == "__main__": # Run the basic example asyncio.run(main()) # Uncomment to run other examples: # asyncio.run(cluster_example()) # asyncio.run(error_handling_example())