"""Dead Letter Queue (DLQ) handler for failed event processing."""
import asyncio
import json
from datetime import UTC, datetime
from typing import Any
import structlog
from nats.js import JetStreamContext
from .base import EventPayload
logger = structlog.get_logger()
class DLQHandler:
"""
Dead Letter Queue handler for processing failed events.
Captures events that fail processing after max retries and stores them
in a separate NATS stream for manual review and retry.
"""
def __init__(
self,
js: JetStreamContext,
dlq_stream_name: str = "TAX_AGENT_DLQ",
max_retries: int = 3,
backoff_base_ms: int = 1000,
backoff_multiplier: float = 2.0,
backoff_max_ms: int = 30000,
):
"""
Initialize DLQ handler.
Args:
js: NATS JetStream context
dlq_stream_name: Name of the DLQ stream
max_retries: Maximum number of retry attempts
backoff_base_ms: Base backoff time in milliseconds
backoff_multiplier: Exponential backoff multiplier
backoff_max_ms: Maximum backoff time in milliseconds
"""
self.js = js
self.dlq_stream_name = dlq_stream_name
self.max_retries = max_retries
self.backoff_base_ms = backoff_base_ms
self.backoff_multiplier = backoff_multiplier
self.backoff_max_ms = backoff_max_ms
async def ensure_dlq_stream_exists(self) -> None:
"""Ensure DLQ stream exists in JetStream."""
try:
# Try to get stream info
await self.js.stream_info(self.dlq_stream_name)
logger.debug("DLQ stream already exists", stream=self.dlq_stream_name)
except Exception:
# Stream doesn't exist, create it
try:
await self.js.add_stream(
name=self.dlq_stream_name,
subjects=[f"{self.dlq_stream_name}.>"],
# Keep DLQ messages for 30 days
max_age=30 * 24 * 60 * 60, # 30 days in seconds
)
logger.info("Created DLQ stream", stream=self.dlq_stream_name)
except Exception as e:
logger.error(
"Failed to create DLQ stream",
stream=self.dlq_stream_name,
error=str(e),
)
raise
async def send_to_dlq(
self,
topic: str,
payload: EventPayload,
error: Exception,
retry_count: int,
original_message_data: bytes | None = None,
) -> None:
"""
Send failed event to DLQ.
Args:
topic: Original topic name
payload: Event payload
error: Exception that caused the failure
retry_count: Number of retry attempts made
original_message_data: Original message data (optional, for debugging)
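
        The event is published to the subject "<dlq_stream_name>.<topic>",
        with the error type, message, retry count, and failure timestamp
        recorded in both the JSON body and the message headers.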
"""
try:
# Create DLQ subject
dlq_subject = f"{self.dlq_stream_name}.{topic}"
# Create DLQ payload with metadata
dlq_payload = {
"original_topic": topic,
"original_payload": payload.to_dict(),
"error": {
"type": type(error).__name__,
"message": str(error),
},
"retry_count": retry_count,
"failed_at": datetime.now(UTC).isoformat(),
"tenant_id": payload.tenant_id,
"event_id": payload.event_id,
"trace_id": payload.trace_id,
}
# Add original message data if available
if original_message_data:
try:
dlq_payload["original_message_data"] = original_message_data.decode(
"utf-8"
)
except UnicodeDecodeError:
dlq_payload["original_message_data"] = "<binary data>"
# Publish to DLQ
headers = {
"original_topic": topic,
"tenant_id": payload.tenant_id,
"event_id": payload.event_id,
"error_type": type(error).__name__,
"retry_count": str(retry_count),
}
await self.js.publish(
subject=dlq_subject,
payload=json.dumps(dlq_payload).encode(),
headers=headers,
)
logger.error(
"Event sent to DLQ",
topic=topic,
event_id=payload.event_id,
error=str(error),
retry_count=retry_count,
dlq_subject=dlq_subject,
)
except Exception as dlq_error:
logger.critical(
"Failed to send event to DLQ - EVENT LOST",
topic=topic,
event_id=payload.event_id,
original_error=str(error),
dlq_error=str(dlq_error),
)
def calculate_backoff(self, retry_count: int) -> float:
"""
Calculate exponential backoff delay.
Args:
retry_count: Current retry attempt (0-indexed)
Returns:
Backoff delay in seconds
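
        With the defaults (base=1000 ms, multiplier=2.0, cap=30000 ms),
        retries 0 through 5 back off 1.0s, 2.0s, 4.0s, 8.0s, 16.0s, and
        30.0s (32.0s capped at the maximum).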
"""
# Calculate exponential backoff: base * (multiplier ^ retry_count)
backoff_ms = self.backoff_base_ms * (self.backoff_multiplier**retry_count)
# Cap at maximum backoff
backoff_ms = min(backoff_ms, self.backoff_max_ms)
# Convert to seconds
return backoff_ms / 1000.0
async def retry_with_backoff(
self,
func: Any,
*args: Any,
**kwargs: Any,
) -> tuple[bool, Exception | None]:
"""
Retry a function with exponential backoff.
Args:
func: Async function to retry
*args: Position arguments for the function
**kwargs: Keyword arguments for the function
Returns:
Tuple of (success: bool, last_error: Exception | None)
"""
last_error: Exception | None = None
for attempt in range(self.max_retries + 1):
try:
await func(*args, **kwargs)
return (True, None)
except Exception as e: # pylint: disable=broad-exception-caught
last_error = e
if attempt < self.max_retries:
# Calculate and apply backoff
backoff_seconds = self.calculate_backoff(attempt)
logger.warning(
"Retry attempt failed, backing off",
attempt=attempt + 1,
max_retries=self.max_retries,
backoff_seconds=backoff_seconds,
error=str(e),
)
await asyncio.sleep(backoff_seconds)
else:
logger.error(
"All retry attempts exhausted",
attempts=self.max_retries + 1,
error=str(e),
)
return (False, last_error)
class DLQMetrics:
"""Metrics for DLQ operations."""
def __init__(self) -> None:
"""Initialize DLQ metrics."""
self.total_dlq_events = 0
self.dlq_events_by_topic: dict[str, int] = {}
self.dlq_events_by_error_type: dict[str, int] = {}
def record_dlq_event(self, topic: str, error_type: str) -> None:
"""
Record a DLQ event.
Args:
topic: Original topic name
error_type: Type of error that caused DLQ
"""
self.total_dlq_events += 1
# Track by topic
if topic not in self.dlq_events_by_topic:
self.dlq_events_by_topic[topic] = 0
self.dlq_events_by_topic[topic] += 1
# Track by error type
if error_type not in self.dlq_events_by_error_type:
self.dlq_events_by_error_type[error_type] = 0
self.dlq_events_by_error_type[error_type] += 1
def get_metrics(self) -> dict[str, Any]:
"""
Get DLQ metrics.
Returns:
Dictionary of metrics
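
        Example shape (topic and error names here are illustrative):
            {
                "total_dlq_events": 2,
                "by_topic": {"documents.uploaded": 2},
                "by_error_type": {"TimeoutError": 2},
            }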
"""
return {
"total_dlq_events": self.total_dlq_events,
"by_topic": self.dlq_events_by_topic.copy(),
"by_error_type": self.dlq_events_by_error_type.copy(),
}
def reset(self) -> None:
"""Reset all metrics to zero."""
self.total_dlq_events = 0
self.dlq_events_by_topic.clear()
self.dlq_events_by_error_type.clear()
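

# Usage sketch, not part of the original module: wires a DLQHandler to a
# local JetStream and drives retry_with_backoff with a coroutine that
# always fails. Assumes nats-py is installed and a JetStream-enabled NATS
# server is reachable at nats://localhost:4222; `always_fails` is a
# hypothetical stand-in for real event processing.
if __name__ == "__main__":

    async def _demo() -> None:
        import nats

        nc = await nats.connect("nats://localhost:4222")
        js = nc.jetstream()
        handler = DLQHandler(js, max_retries=2)
        await handler.ensure_dlq_stream_exists()

        async def always_fails() -> None:
            raise RuntimeError("simulated processing failure")

        # Backs off 1.0s, then 2.0s, before giving up after the third attempt.
        ok, err = await handler.retry_with_backoff(always_fails)
        if not ok and err is not None:
            # A real consumer would call send_to_dlq() here with the failed
            # event's EventPayload; skipped in this sketch because building
            # an EventPayload depends on the .base module.
            print(f"retries exhausted: {err!r}")
        await nc.close()

    asyncio.run(_demo())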