completed local setup with compose
Some checks failed
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled

harkon
2025-11-26 13:17:17 +00:00
parent 8fe5e62fee
commit fdba81809f
87 changed files with 5610 additions and 3376 deletions

View File

@@ -1,20 +1,52 @@
"""Event-driven architecture with Kafka, SQS, NATS, and Memory support."""
from libs.schemas.events import (
EVENT_SCHEMA_MAP,
BaseEventData,
CalculationReadyEventData,
DocumentExtractedEventData,
DocumentIngestedEventData,
DocumentOCRReadyEventData,
FirmSyncCompletedEventData,
FormFilledEventData,
HMRCSubmittedEventData,
KGUpsertedEventData,
KGUpsertReadyEventData,
RAGIndexedEventData,
ReviewCompletedEventData,
ReviewRequestedEventData,
get_schema_for_topic,
validate_event_data,
)
from .base import EventBus, EventPayload
from .factory import create_event_bus
from .kafka_bus import KafkaEventBus
from .memory_bus import MemoryEventBus
from .nats_bus import NATSEventBus
from .sqs_bus import SQSEventBus
from .topics import EventTopics
__all__ = [
"EventPayload",
"EventBus",
"KafkaEventBus",
"MemoryEventBus",
"NATSEventBus",
"SQSEventBus",
"create_event_bus",
"EventTopics",
# Event schemas
"BaseEventData",
"DocumentIngestedEventData",
"DocumentOCRReadyEventData",
"DocumentExtractedEventData",
"KGUpsertReadyEventData",
"KGUpsertedEventData",
"RAGIndexedEventData",
"CalculationReadyEventData",
"FormFilledEventData",
"HMRCSubmittedEventData",
"ReviewRequestedEventData",
"ReviewCompletedEventData",
"FirmSyncCompletedEventData",
"EVENT_SCHEMA_MAP",
"validate_event_data",
"get_schema_for_topic",
]
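Re-exporting the schema classes and helpers at the package root lets services validate a payload at the point where they publish it. A minimal sketch, assuming get_schema_for_topic returns the model class registered for a topic and validate_event_data checks a dict against that schema (the exact signatures are not shown in this diff):

from libs.events import EventTopics, get_schema_for_topic, validate_event_data

schema_cls = get_schema_for_topic(EventTopics.DOC_INGESTED)   # assumed to return a schema class
payload = {"document_id": "doc-001"}                          # illustrative field, not from this diff
validate_event_data(EventTopics.DOC_INGESTED, payload)        # assumed to raise on a schema mismatch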

View File

@@ -3,7 +3,7 @@
import json
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
- from datetime import datetime
+ from datetime import UTC, datetime
from typing import Any
import ulid
@@ -22,7 +22,7 @@ class EventPayload:
schema_version: str = "1.0",
):
self.event_id = str(ulid.new())
- self.occurred_at = datetime.utcnow().isoformat() + "Z"
+ self.occurred_at = datetime.now(UTC).isoformat()
self.actor = actor
self.tenant_id = tenant_id
self.trace_id = trace_id
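The switch from datetime.utcnow() to datetime.now(UTC) makes occurred_at a timezone-aware ISO 8601 string; the hand-appended "Z" suffix becomes a "+00:00" offset, which standard parsers accept. A quick comparison, as a sketch:

from datetime import UTC, datetime

old = datetime.utcnow().isoformat() + "Z"  # naive, e.g. 2025-11-26T13:17:17.000000Z
new = datetime.now(UTC).isoformat()        # aware, e.g. 2025-11-26T13:17:17.000000+00:00
parsed = datetime.fromisoformat(new)       # round-trips with tzinfo set to UTC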

View File

@@ -7,7 +7,7 @@ from collections.abc import Awaitable, Callable
import structlog
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer # type: ignore
- from .base import EventBus, EventPayload
+ from ..base import EventBus, EventPayload
logger = structlog.get_logger()

View File

@@ -9,7 +9,7 @@ import boto3 # type: ignore
import structlog
from botocore.exceptions import ClientError # type: ignore
- from .base import EventBus, EventPayload
+ from ..base import EventBus, EventPayload
logger = structlog.get_logger()
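The relative import changing from .base to ..base in both the Kafka and SQS buses indicates they now live one package level deeper; the factory change below imports them from a contrib subpackage, so the layout is presumably something like the following (inferred, not shown directly in this commit):

libs/events/
    base.py           # EventBus, EventPayload
    factory.py
    nats_bus.py
    contrib/
        kafka_bus.py  # from ..base import EventBus, EventPayload
        sqs_bus.py    # from ..base import EventBus, EventPayload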

libs/events/dlq.py (new file, 271 lines)
View File

@@ -0,0 +1,271 @@
"""Dead Letter Queue (DLQ) handler for failed event processing."""
import asyncio
import json
from datetime import UTC, datetime
from typing import Any
import structlog
from nats.js import JetStreamContext
from .base import EventPayload
logger = structlog.get_logger()
class DLQHandler:
"""
Dead Letter Queue handler for processing failed events.
Captures events that fail processing after max retries and stores them
in a separate NATS stream for manual review and retry.
"""
def __init__(
self,
js: JetStreamContext,
dlq_stream_name: str = "TAX_AGENT_DLQ",
max_retries: int = 3,
backoff_base_ms: int = 1000,
backoff_multiplier: float = 2.0,
backoff_max_ms: int = 30000,
):
"""
Initialize DLQ handler.
Args:
js: NATS JetStream context
dlq_stream_name: Name of the DLQ stream
max_retries: Maximum number of retry attempts
backoff_base_ms: Base backoff time in milliseconds
backoff_multiplier: Exponential backoff multiplier
backoff_max_ms: Maximum backoff time in milliseconds
"""
self.js = js
self.dlq_stream_name = dlq_stream_name
self.max_retries = max_retries
self.backoff_base_ms = backoff_base_ms
self.backoff_multiplier = backoff_multiplier
self.backoff_max_ms = backoff_max_ms
async def ensure_dlq_stream_exists(self) -> None:
"""Ensure DLQ stream exists in JetStream."""
try:
# Try to get stream info
await self.js.stream_info(self.dlq_stream_name)
logger.debug("DLQ stream already exists", stream=self.dlq_stream_name)
except Exception:
# Stream doesn't exist, create it
try:
await self.js.add_stream(
name=self.dlq_stream_name,
subjects=[f"{self.dlq_stream_name}.>"],
# Keep DLQ messages for 30 days
max_age=30 * 24 * 60 * 60, # 30 days in seconds
)
logger.info("Created DLQ stream", stream=self.dlq_stream_name)
except Exception as e:
logger.error(
"Failed to create DLQ stream",
stream=self.dlq_stream_name,
error=str(e),
)
raise
async def send_to_dlq(
self,
topic: str,
payload: EventPayload,
error: Exception,
retry_count: int,
original_message_data: bytes | None = None,
) -> None:
"""
Send failed event to DLQ.
Args:
topic: Original topic name
payload: Event payload
error: Exception that caused the failure
retry_count: Number of retry attempts made
original_message_data: Original message data (optional, for debugging)
"""
try:
# Create DLQ subject
dlq_subject = f"{self.dlq_stream_name}.{topic}"
# Create DLQ payload with metadata
dlq_payload = {
"original_topic": topic,
"original_payload": payload.to_dict(),
"error": {
"type": type(error).__name__,
"message": str(error),
},
"retry_count": retry_count,
"failed_at": datetime.now(UTC).isoformat(),
"tenant_id": payload.tenant_id,
"event_id": payload.event_id,
"trace_id": payload.trace_id,
}
# Add original message data if available
if original_message_data:
try:
dlq_payload["original_message_data"] = original_message_data.decode(
"utf-8"
)
except UnicodeDecodeError:
dlq_payload["original_message_data"] = "<binary data>"
# Publish to DLQ
headers = {
"original_topic": topic,
"tenant_id": payload.tenant_id,
"event_id": payload.event_id,
"error_type": type(error).__name__,
"retry_count": str(retry_count),
}
await self.js.publish(
subject=dlq_subject,
payload=json.dumps(dlq_payload).encode(),
headers=headers,
)
logger.error(
"Event sent to DLQ",
topic=topic,
event_id=payload.event_id,
error=str(error),
retry_count=retry_count,
dlq_subject=dlq_subject,
)
except Exception as dlq_error:
logger.critical(
"Failed to send event to DLQ - EVENT LOST",
topic=topic,
event_id=payload.event_id,
original_error=str(error),
dlq_error=str(dlq_error),
)
def calculate_backoff(self, retry_count: int) -> float:
"""
Calculate exponential backoff delay.
Args:
retry_count: Current retry attempt (0-indexed)
Returns:
Backoff delay in seconds
"""
# Calculate exponential backoff: base * (multiplier ^ retry_count)
backoff_ms = self.backoff_base_ms * (self.backoff_multiplier**retry_count)
# Cap at maximum backoff
backoff_ms = min(backoff_ms, self.backoff_max_ms)
# Convert to seconds
return backoff_ms / 1000.0
async def retry_with_backoff(
self,
func: Any,
*args: Any,
**kwargs: Any,
) -> tuple[bool, Exception | None]:
"""
Retry a function with exponential backoff.
Args:
func: Async function to retry
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
Returns:
Tuple of (success: bool, last_error: Exception | None)
"""
last_error: Exception | None = None
for attempt in range(self.max_retries + 1):
try:
await func(*args, **kwargs)
return (True, None)
except Exception as e: # pylint: disable=broad-exception-caught
last_error = e
if attempt < self.max_retries:
# Calculate and apply backoff
backoff_seconds = self.calculate_backoff(attempt)
logger.warning(
"Retry attempt failed, backing off",
attempt=attempt + 1,
max_retries=self.max_retries,
backoff_seconds=backoff_seconds,
error=str(e),
)
await asyncio.sleep(backoff_seconds)
else:
logger.error(
"All retry attempts exhausted",
attempts=self.max_retries + 1,
error=str(e),
)
return (False, last_error)
class DLQMetrics:
"""Metrics for DLQ operations."""
def __init__(self) -> None:
"""Initialize DLQ metrics."""
self.total_dlq_events = 0
self.dlq_events_by_topic: dict[str, int] = {}
self.dlq_events_by_error_type: dict[str, int] = {}
def record_dlq_event(self, topic: str, error_type: str) -> None:
"""
Record a DLQ event.
Args:
topic: Original topic name
error_type: Type of error that caused DLQ
"""
self.total_dlq_events += 1
# Track by topic
if topic not in self.dlq_events_by_topic:
self.dlq_events_by_topic[topic] = 0
self.dlq_events_by_topic[topic] += 1
# Track by error type
if error_type not in self.dlq_events_by_error_type:
self.dlq_events_by_error_type[error_type] = 0
self.dlq_events_by_error_type[error_type] += 1
def get_metrics(self) -> dict[str, Any]:
"""
Get DLQ metrics.
Returns:
Dictionary of metrics
"""
return {
"total_dlq_events": self.total_dlq_events,
"by_topic": self.dlq_events_by_topic.copy(),
"by_error_type": self.dlq_events_by_error_type.copy(),
}
def reset(self) -> None:
"""Reset all metrics to zero."""
self.total_dlq_events = 0
self.dlq_events_by_topic.clear()
self.dlq_events_by_error_type.clear()
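With the defaults above, calculate_backoff yields 1.0 s, 2.0 s and 4.0 s for attempts 0, 1 and 2 (1000 ms * 2.0 ** attempt, capped at 30 s). A minimal usage sketch, assuming js is a JetStream context from an already-connected NATS client and flaky_handler is the processing coroutine being retried:

from nats.js import JetStreamContext
from libs.events.base import EventPayload
from libs.events.dlq import DLQHandler

async def flaky_handler(topic: str, payload: EventPayload) -> None:
    ...  # raises on transient failures

async def process(js: JetStreamContext, topic: str, payload: EventPayload) -> None:
    dlq = DLQHandler(js=js, max_retries=3)
    await dlq.ensure_dlq_stream_exists()
    ok, last_error = await dlq.retry_with_backoff(flaky_handler, topic, payload)
    if not ok and last_error is not None:
        # all max_retries + 1 attempts failed; park the event for manual review
        await dlq.send_to_dlq(topic=topic, payload=payload, error=last_error, retry_count=dlq.max_retries + 1)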

View File

@@ -3,16 +3,20 @@
from typing import Any
from .base import EventBus
- from .kafka_bus import KafkaEventBus
from .nats_bus import NATSEventBus
- from .sqs_bus import SQSEventBus
def create_event_bus(bus_type: str, **kwargs: Any) -> EventBus:
"""Factory function to create event bus"""
if bus_type.lower() == "kafka":
+ # Lazy import to avoid ModuleNotFoundError when aiokafka is not installed
+ from .contrib.kafka_bus import KafkaEventBus
return KafkaEventBus(kwargs.get("bootstrap_servers", "localhost:9092"))
if bus_type.lower() == "sqs":
+ # Lazy import to avoid ModuleNotFoundError when boto3 is not installed
+ from .contrib.sqs_bus import SQSEventBus
return SQSEventBus(kwargs.get("region_name", "us-east-1"))
if bus_type.lower() == "nats":
return NATSEventBus(

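Moving the Kafka and SQS imports inside the branches means aiokafka and boto3 only need to be installed when that backend is actually selected; only the NATS bus is imported unconditionally. A usage sketch (the NATS keyword argument name is an assumption, since the NATSEventBus call above is truncated):

import asyncio
from libs.events import create_event_bus

async def main() -> None:
    bus = create_event_bus("nats", servers="nats://localhost:4222")  # kwarg name assumed
    # create_event_bus("kafka", bootstrap_servers="localhost:9092")  # needs aiokafka installed
    # create_event_bus("sqs", region_name="us-east-1")               # needs boto3 installed
    await bus.start()  # start() is implied by the connect logic in nats_bus.py below

asyncio.run(main())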
libs/events/metrics.py (new file, 225 lines)
View File

@@ -0,0 +1,225 @@
"""Prometheus metrics for event bus monitoring."""
from prometheus_client import Counter, Histogram
from prometheus_client.registry import CollectorRegistry
# Global registry for event metrics
_event_registry = CollectorRegistry()
# Event publishing metrics
event_published_total = Counter(
"event_published_total",
"Total number of events published",
["topic"],
registry=_event_registry,
)
event_publish_errors_total = Counter(
"event_publish_errors_total",
"Total number of event publishing errors",
["topic", "error_type"],
registry=_event_registry,
)
event_publishing_duration_seconds = Histogram(
"event_publishing_duration_seconds",
"Time spent publishing events in seconds",
["topic"],
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
registry=_event_registry,
)
# Event consumption metrics
event_consumed_total = Counter(
"event_consumed_total",
"Total number of events consumed",
["topic", "consumer_group"],
registry=_event_registry,
)
event_processing_duration_seconds = Histogram(
"event_processing_duration_seconds",
"Time spent processing events in seconds",
["topic", "consumer_group"],
buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0),
registry=_event_registry,
)
event_processing_errors_total = Counter(
"event_processing_errors_total",
"Total number of event processing errors",
["topic", "consumer_group", "error_type"],
registry=_event_registry,
)
# DLQ metrics
event_dlq_total = Counter(
"event_dlq_total",
"Total number of events sent to dead letter queue",
["topic", "error_type"],
registry=_event_registry,
)
event_retry_total = Counter(
"event_retry_total",
"Total number of event retry attempts",
["topic", "retry_attempt"],
registry=_event_registry,
)
# Schema validation metrics
event_schema_validation_errors_total = Counter(
"event_schema_validation_errors_total",
"Total number of event schema validation errors",
["topic", "validation_error"],
registry=_event_registry,
)
# NATS JetStream specific metrics
nats_stream_messages_total = Counter(
"nats_stream_messages_total",
"Total messages in NATS stream",
["stream_name"],
registry=_event_registry,
)
nats_consumer_lag_messages = Histogram(
"nats_consumer_lag_messages",
"Number of messages consumer is lagging behind",
["stream_name", "consumer_group"],
buckets=(0, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 5000, 10000),
registry=_event_registry,
)
def get_event_metrics_registry() -> CollectorRegistry:
"""
Get the Prometheus registry for event metrics.
Returns:
CollectorRegistry for event metrics
"""
return _event_registry
class EventMetricsCollector:
"""Helper class for collecting event metrics."""
@staticmethod
def record_publish(
topic: str,
duration_seconds: float,
success: bool = True,
error_type: str | None = None,
) -> None:
"""
Record event publishing metrics.
Args:
topic: Event topic name
duration_seconds: Time taken to publish
success: Whether publishing succeeded
error_type: Type of error if failed
"""
if success:
event_published_total.labels(topic=topic).inc()
else:
event_publish_errors_total.labels(
topic=topic, error_type=error_type or "unknown"
).inc()
event_publishing_duration_seconds.labels(topic=topic).observe(duration_seconds)
@staticmethod
def record_consume(
topic: str,
consumer_group: str,
duration_seconds: float,
success: bool = True,
error_type: str | None = None,
) -> None:
"""
Record event consumption metrics.
Args:
topic: Event topic name
consumer_group: Consumer group name
duration_seconds: Time taken to process event
success: Whether processing succeeded
error_type: Type of error if failed
"""
if success:
event_consumed_total.labels(
topic=topic, consumer_group=consumer_group
).inc()
else:
event_processing_errors_total.labels(
topic=topic,
consumer_group=consumer_group,
error_type=error_type or "unknown",
).inc()
event_processing_duration_seconds.labels(
topic=topic, consumer_group=consumer_group
).observe(duration_seconds)
@staticmethod
def record_dlq(topic: str, error_type: str) -> None:
"""
Record event sent to DLQ.
Args:
topic: Event topic name
error_type: Type of error that caused DLQ
"""
event_dlq_total.labels(topic=topic, error_type=error_type).inc()
@staticmethod
def record_retry(topic: str, retry_attempt: int) -> None:
"""
Record event retry attempt.
Args:
topic: Event topic name
retry_attempt: Retry attempt number (1-indexed)
"""
event_retry_total.labels(topic=topic, retry_attempt=str(retry_attempt)).inc()
@staticmethod
def record_schema_validation_error(topic: str, validation_error: str) -> None:
"""
Record schema validation error.
Args:
topic: Event topic name
validation_error: Type of validation error
"""
event_schema_validation_errors_total.labels(
topic=topic, validation_error=validation_error
).inc()
@staticmethod
def record_nats_stream_message(stream_name: str) -> None:
"""
Record message added to NATS stream.
Args:
stream_name: NATS stream name
"""
nats_stream_messages_total.labels(stream_name=stream_name).inc()
@staticmethod
def record_consumer_lag(
stream_name: str, consumer_group: str, lag_messages: int
) -> None:
"""
Record consumer lag.
Args:
stream_name: NATS stream name
consumer_group: Consumer group name
lag_messages: Number of messages consumer is behind
"""
nats_consumer_lag_messages.labels(
stream_name=stream_name, consumer_group=consumer_group
).observe(lag_messages)
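Because these metrics are registered against a dedicated CollectorRegistry rather than prometheus_client's default one, a service has to expose that registry explicitly. A short sketch of recording a publish and rendering the scrape payload:

import time
from prometheus_client import generate_latest
from libs.events.metrics import EventMetricsCollector, get_event_metrics_registry

start = time.perf_counter()
# ... publish the event on the bus ...
EventMetricsCollector.record_publish(topic="doc.ingested", duration_seconds=time.perf_counter() - start)

print(generate_latest(get_event_metrics_registry()).decode())  # Prometheus text exposition format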

View File

@@ -2,6 +2,7 @@
import asyncio
import json
import time
from collections.abc import Awaitable, Callable
from typing import Any
@@ -12,6 +13,8 @@ from nats.js import JetStreamContext
from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy
from .base import EventBus, EventPayload
from .dlq import DLQHandler
from .metrics import EventMetricsCollector
logger = structlog.get_logger()
@@ -24,6 +27,8 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
servers: str | list[str] = "nats://localhost:4222",
stream_name: str = "TAX_AGENT_EVENTS",
consumer_group: str = "tax-agent",
dlq_stream_name: str = "TAX_AGENT_DLQ",
max_retries: int = 3,
):
if isinstance(servers, str):
self.servers = [servers]
@@ -32,8 +37,13 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
self.stream_name = stream_name
self.consumer_group = consumer_group
self.dlq_stream_name = dlq_stream_name
self.max_retries = max_retries
self.nc: NATS | None = None
self.js: JetStreamContext | None = None
self.dlq: DLQHandler | None = None
self.handlers: dict[
str, list[Callable[[str, EventPayload], Awaitable[None]]]
] = {}
@@ -48,19 +58,32 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
try:
# Connect to NATS
- self.nc = await nats.connect(servers=self.servers)
+ self.nc = await nats.connect(
+     servers=self.servers,
+     connect_timeout=10,
+     reconnect_time_wait=1,
+ )
# Get JetStream context
- self.js = self.nc.jetstream()
+ self.js = self.nc.jetstream(timeout=10)
# Ensure stream exists
# Initialize DLQ handler
self.dlq = DLQHandler(
js=self.js,
dlq_stream_name=self.dlq_stream_name,
max_retries=self.max_retries,
)
# Ensure streams exist
await self._ensure_stream_exists()
await self.dlq.ensure_dlq_stream_exists()
self.running = True
logger.info(
"NATS event bus started",
servers=self.servers,
stream=self.stream_name,
dlq_stream=self.dlq_stream_name,
)
except Exception as e:
@@ -98,6 +121,7 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
if not self.js:
raise RuntimeError("Event bus not started")
start_time = time.perf_counter()
try:
# Create subject name from topic
subject = f"{self.stream_name}.{topic}"
@@ -117,6 +141,13 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
headers=headers,
)
duration = time.perf_counter() - start_time
EventMetricsCollector.record_publish(
topic=topic,
duration_seconds=duration,
success=True,
)
logger.info(
"Event published",
topic=topic,
@@ -127,6 +158,14 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
return True
except Exception as e: # pylint: disable=broad-exception-caught
duration = time.perf_counter() - start_time
EventMetricsCollector.record_publish(
topic=topic,
duration_seconds=duration,
success=False,
error_type=type(e).__name__,
)
logger.error(
"Failed to publish event",
topic=topic,
@@ -152,9 +191,13 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
subject = f"{self.stream_name}.{topic}"
# Create durable consumer
- consumer_name = f"{self.consumer_group}-{topic}"
+ # Durable names cannot contain dots, so we replace them
+ safe_topic = topic.replace(".", "-")
+ consumer_name = f"{self.consumer_group}-{safe_topic}"
# Subscribe with pull-based consumer
# Set max_deliver to max_retries + 1 (initial + retries)
# We handle DLQ manually before NATS gives up
subscription = await self.js.pull_subscribe(
subject=subject,
durable=consumer_name,
@@ -162,7 +205,7 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
durable_name=consumer_name,
ack_policy=AckPolicy.EXPLICIT,
deliver_policy=DeliverPolicy.NEW,
- max_deliver=3,
+ max_deliver=self.max_retries + 2,  # Give us room to handle DLQ
ack_wait=30, # 30 seconds
),
)
@@ -193,13 +236,14 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
# Try to get stream info
await self.js.stream_info(self.stream_name)
logger.debug("Stream already exists", stream=self.stream_name)
EventMetricsCollector.record_nats_stream_message(self.stream_name)
except Exception:
# Stream doesn't exist, create it
try:
await self.js.add_stream(
name=self.stream_name,
- subjects=[f"{self.stream_name}.*"],
+ subjects=[f"{self.stream_name}.>"],
)
logger.info("Created JetStream stream", stream=self.stream_name)
@@ -214,12 +258,17 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
while self.running:
try:
# Fetch messages in batches
- messages = await subscription.fetch(batch=10, timeout=20)
+ messages = await subscription.fetch(batch=10, timeout=5)
for message in messages:
start_time = time.perf_counter()
payload = None
try:
print(f"DEBUG: Received message: {message.data}")
# Parse message payload
payload_dict = json.loads(message.data.decode())
print(f"DEBUG: Parsed payload: {payload_dict}")
payload = EventPayload(
data=payload_dict["data"],
@@ -230,38 +279,87 @@ class NATSEventBus(EventBus): # pylint: disable=too-many-instance-attributes
)
payload.event_id = payload_dict["event_id"]
payload.occurred_at = payload_dict["occurred_at"]
print(f"DEBUG: Reconstructed payload: {payload.event_id}")
# Call all handlers for this topic
for handler in self.handlers.get(topic, []):
try:
await handler(topic, payload)
except (
Exception
) as e: # pylint: disable=broad-exception-caught
logger.error(
"Handler failed",
topic=topic,
event_id=payload.event_id,
error=str(e),
)
print(f"DEBUG: Calling handler for topic {topic}")
await handler(topic, payload)
# Acknowledge message
await message.ack()
print("DEBUG: Message acked")
except json.JSONDecodeError as e:
logger.error(
"Failed to decode message", topic=topic, error=str(e)
# Record metrics
duration = time.perf_counter() - start_time
EventMetricsCollector.record_consume(
topic=topic,
consumer_group=self.consumer_group,
duration_seconds=duration,
success=True,
)
await message.nak()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(
"Failed to process message", topic=topic, error=str(e)
duration = time.perf_counter() - start_time
error_type = type(e).__name__
# Record failure metric
EventMetricsCollector.record_consume(
topic=topic,
consumer_group=self.consumer_group,
duration_seconds=duration,
success=False,
error_type=error_type,
)
await message.nak()
# Check delivery count for DLQ
try:
metadata = message.metadata
num_delivered = (
metadata.sequence.consumer
) # This might be wrong, check docs
# Actually nats-py MsgMetadata has num_delivered
num_delivered = metadata.num_delivered
except Exception:
num_delivered = 1
if num_delivered >= self.max_retries:
logger.error(
"Max retries exceeded, sending to DLQ",
topic=topic,
event_id=payload.event_id if payload else "unknown",
error=str(e),
num_delivered=num_delivered,
)
if self.dlq and payload:
await self.dlq.send_to_dlq(
topic=topic,
payload=payload,
error=e,
retry_count=num_delivered,
original_message_data=message.data,
)
EventMetricsCollector.record_dlq(topic, error_type)
# Ack to remove from main stream
await message.ack()
else:
# Retry (Nak)
logger.warning(
"Processing failed, retrying",
topic=topic,
event_id=payload.event_id if payload else "unknown",
error=str(e),
attempt=num_delivered,
)
EventMetricsCollector.record_retry(topic, num_delivered)
await message.nak()
except TimeoutError:
# No messages available, continue polling
continue
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Consumer error", topic=topic, error=str(e))
- await asyncio.sleep(5)  # Wait before retrying
+ await asyncio.sleep(1)  # Wait before retrying
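The stream subject change from "{stream}.*" to "{stream}.>" matters because the new kg.upsert.ready topic contains dots: its subject has more than one token after the stream prefix, so the single-token "*" wildcard would no longer match it, and the same dots are why the durable consumer name is sanitised. A small illustration of the naming rules used above:

stream_name = "TAX_AGENT_EVENTS"
consumer_group = "tax-agent"
topic = "kg.upsert.ready"

subject = f"{stream_name}.{topic}"                             # TAX_AGENT_EVENTS.kg.upsert.ready, needs the '>' wildcard
consumer_name = f"{consumer_group}-{topic.replace('.', '-')}"  # durable names cannot contain dots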

View File

@@ -7,6 +7,7 @@ class EventTopics: # pylint: disable=too-few-public-methods
DOC_INGESTED = "doc.ingested"
DOC_OCR_READY = "doc.ocr_ready"
DOC_EXTRACTED = "doc.extracted"
KG_UPSERT_READY = "kg.upsert.ready"
KG_UPSERTED = "kg.upserted"
RAG_INDEXED = "rag.indexed"
CALC_SCHEDULE_READY = "calc.schedule_ready"