# NATS.io Event Bus with JetStream

This document describes the NATS.io event bus implementation with JetStream support for the AI Tax Agent project.

## Overview

The `NATSEventBus` class provides a robust, scalable event streaming solution using NATS.io with JetStream for persistent messaging. It implements the same `EventBus` interface as the other event bus implementations (Kafka, SQS, Memory) for consistency.

## Features

- **JetStream Integration**: Uses NATS JetStream for persistent, reliable message delivery
- **Automatic Stream Management**: Creates and manages JetStream streams automatically
- **Pull-based Consumers**: Uses pull-based consumers for better flow control
- **Cluster Support**: Supports NATS cluster configurations for high availability
- **Error Handling**: Comprehensive error handling with automatic retries
- **Message Acknowledgment**: Explicit message acknowledgment with configurable retry policies
- **Durable Consumers**: Creates durable consumers for guaranteed message processing

## Configuration

### Basic Configuration

```python
from libs.events import NATSEventBus

# Single server
bus = NATSEventBus(
    servers="nats://localhost:4222",
    stream_name="TAX_AGENT_EVENTS",
    consumer_group="tax-agent"
)

# Multiple servers (cluster)
bus = NATSEventBus(
    servers=[
        "nats://nats1.example.com:4222",
        "nats://nats2.example.com:4222",
        "nats://nats3.example.com:4222"
    ],
    stream_name="PRODUCTION_EVENTS",
    consumer_group="tax-agent-prod"
)
```

### Factory Configuration

```python
from libs.events import create_event_bus

bus = create_event_bus(
    "nats",
    servers="nats://localhost:4222",
    stream_name="TAX_AGENT_EVENTS",
    consumer_group="tax-agent"
)
```

## Usage

### Publishing Events

```python
from libs.events import EventPayload

# Create event payload
payload = EventPayload(
    data={"user_id": "123", "action": "login"},
    actor="user-service",
    tenant_id="tenant-456",
    trace_id="trace-789"
)

# Publish event
success = await bus.publish("user.login", payload)
if success:
    print("Event published successfully")
```
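Because `publish()` reports failure by returning `False` (see *Error Handling* below), application-level retries are easy to layer on top. The helper below is a minimal sketch of that pattern; `publish_with_retry` and its parameters are illustrative and not part of the library.

```python
import asyncio

from libs.events import EventPayload, NATSEventBus


async def publish_with_retry(
    bus: NATSEventBus,
    topic: str,
    payload: EventPayload,
    attempts: int = 3,
    base_delay: float = 0.5,
) -> bool:
    """Illustrative helper: retry a failed publish with exponential backoff."""
    for attempt in range(attempts):
        if await bus.publish(topic, payload):
            return True
        if attempt < attempts - 1:
            # publish() signals failure by returning False (e.g. a transient
            # network error); back off before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
    return False
```

How many attempts and how much backoff are appropriate depends on how time-sensitive the event is for downstream consumers.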
### Subscribing to Events

```python
async def handle_user_login(topic: str, payload: EventPayload) -> None:
    print(f"User {payload.data['user_id']} logged in")
    # Process the event...

# Subscribe to topic
await bus.subscribe("user.login", handle_user_login)
```

### Complete Example

```python
import asyncio

from libs.events import NATSEventBus, EventPayload


async def handle_user_created(topic: str, payload: EventPayload) -> None:
    print(f"User created: {payload.data['email']}")


async def main():
    bus = NATSEventBus()

    try:
        # Start the bus
        await bus.start()

        # Subscribe to events
        await bus.subscribe("user.created", handle_user_created)

        # Publish an event
        payload = EventPayload(
            data={"user_id": "123", "email": "user@example.com"},
            actor="registration-service",
            tenant_id="tenant-456"
        )
        await bus.publish("user.created", payload)

        # Wait for processing
        await asyncio.sleep(1)
    finally:
        await bus.stop()


asyncio.run(main())
```

## JetStream Configuration

The NATS event bus automatically creates and configures JetStream streams with the following settings:

- **Retention Policy**: Work Queue (messages are removed after acknowledgment)
- **Max Age**: 7 days (older messages are automatically deleted)
- **Storage**: File-based storage for persistence
- **Subject Pattern**: `{stream_name}.*` (e.g., `TAX_AGENT_EVENTS.*`)

### Consumer Configuration

- **Durable Consumers**: Each topic subscription creates a durable consumer
- **Ack Policy**: Explicit acknowledgment required
- **Deliver Policy**: New messages only (does not replay old messages)
- **Max Deliver**: 3 attempts before the message is considered failed
- **Ack Wait**: 30-second timeout for acknowledgment

## Error Handling

The NATS event bus includes comprehensive error handling:

### Publishing Errors

- Network failures are logged and return `False`
- Automatic retry logic can be implemented at the application level

### Consumer Errors

- Handler exceptions are caught and logged
- Failed messages are negatively acknowledged (NAK) for retry
- Messages that fail multiple times are moved to a dead letter queue (if configured)

### Connection Errors

- Automatic reconnection is handled by the NATS client
- Consumer tasks are gracefully shut down on connection loss

## Monitoring and Observability

The implementation includes structured logging with the following information:

- Event publishing success/failure
- Consumer subscription status
- Message processing metrics
- Error details and stack traces

### Log Examples

```
INFO: Event published topic=user.created event_id=01HK... stream_seq=123
INFO: Subscribed to topic topic=user.login consumer=tax-agent-user.login
ERROR: Handler failed topic=user.created event_id=01HK... error=...
```
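The *Consumer Errors* behaviour described above (handler exceptions are caught, the message is NAK'd, and JetStream redelivers it up to the Max Deliver limit of 3) suggests a simple convention for handlers: re-raise failures that are worth retrying, and log-and-return for messages that will never succeed so they are acknowledged instead of redelivered. This is a minimal sketch; `TransientError` and `provision_account` are hypothetical stand-ins for your own error types and downstream calls.

```python
import logging

from libs.events import EventPayload

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


async def provision_account(user_id: str) -> None:
    """Hypothetical downstream call; replace with real work."""


async def handle_user_created(topic: str, payload: EventPayload) -> None:
    try:
        await provision_account(payload.data["user_id"])
    except TransientError:
        # Re-raising lets the bus NAK the message so JetStream redelivers it.
        raise
    except Exception:
        # Permanent failure: log it and return normally so the message is
        # acknowledged rather than redelivered until Max Deliver is exhausted.
        logger.exception("Dropping unprocessable event", extra={"topic": topic})
```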
## Performance Considerations

### Throughput

- Pull-based consumers allow for controlled message processing
- Batch fetching (up to 10 messages per fetch) improves throughput
- Async processing enables high concurrency

### Memory Usage

- File-based storage keeps memory usage low
- Configurable message retention prevents unbounded growth

### Network Efficiency

- Binary protocol with minimal overhead
- Connection pooling and reuse
- Efficient subject-based routing

## Deployment

### Docker Compose Example

```yaml
services:
  nats:
    image: nats:2.10-alpine
    ports:
      - "4222:4222"
      - "8222:8222"
    command:
      - "--jetstream"
      - "--store_dir=/data"
      - "--http_port=8222"
    volumes:
      - nats_data:/data

volumes:
  nats_data:
```

### Kubernetes Example

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: nats
spec:
  serviceName: nats
  replicas: 3
  selector:
    matchLabels:
      app: nats
  template:
    metadata:
      labels:
        app: nats
    spec:
      containers:
        - name: nats
          image: nats:2.10-alpine
          args:
            - "--cluster_name=nats-cluster"
            - "--jetstream"
            - "--store_dir=/data"
          ports:
            - containerPort: 4222
            - containerPort: 6222
            - containerPort: 8222
          volumeMounts:
            - name: nats-storage
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: nats-storage
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
```

## Dependencies

The NATS event bus requires the following Python package:

```
nats-py>=2.6.0
```

This is automatically included in `libs/requirements.txt`.

## Comparison with Other Event Buses

| Feature | NATS | Kafka | SQS |
|---------|------|-------|-----|
| Setup Complexity | Low | Medium | Low |
| Throughput | High | Very High | Medium |
| Latency | Very Low | Low | Medium |
| Persistence | Yes (JetStream) | Yes | Yes |
| Ordering | Per Subject | Per Partition | FIFO Queues |
| Clustering | Built-in | Built-in | Managed |
| Operational Overhead | Low | High | None |

## Best Practices

1. **Use meaningful subject names**: Follow a hierarchical naming convention (e.g., `service.entity.action`)
2. **Handle failures gracefully**: Implement proper error handling in event handlers
3. **Monitor consumer lag**: Track message processing delays
4. **Use appropriate retention**: Configure message retention based on business requirements
5. **Test failure scenarios**: Verify behavior during network partitions and service failures
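Practices 2 and 5 are easiest to start on in unit tests: because every implementation shares the same `EventBus` interface, handlers can be exercised against the in-memory bus before a real NATS cluster is involved. The sketch below assumes the factory registers the in-memory implementation under the name `"memory"`; adjust that name (and the short sleep) to match your implementation.

```python
import asyncio

from libs.events import EventPayload, create_event_bus


async def test_user_created_handler() -> None:
    # Assumes the in-memory implementation is registered as "memory" in the
    # factory; change the name if yours differs.
    bus = create_event_bus("memory")
    received: list[EventPayload] = []

    async def handler(topic: str, payload: EventPayload) -> None:
        received.append(payload)

    await bus.start()
    try:
        await bus.subscribe("user.created", handler)
        await bus.publish(
            "user.created",
            EventPayload(
                data={"user_id": "123"},
                actor="test",
                tenant_id="tenant-456",
            ),
        )
        await asyncio.sleep(0.1)  # give the dispatcher a moment to deliver
        assert received and received[0].data["user_id"] == "123"
    finally:
        await bus.stop()


if __name__ == "__main__":
    asyncio.run(test_user_created_handler())
```

The same test can then be rerun against a real NATS server (for example the Docker Compose setup above) to cover the failure scenarios that only show up with a network in between.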