Files
ai-tax-agent/tests/unit/test_dlq.py
harkon fdba81809f
Some checks failed
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
completed local setup with compose
2025-11-26 13:17:17 +00:00

318 lines
10 KiB
Python

"""Tests for Dead Letter Queue (DLQ) handler."""
import json
from unittest.mock import AsyncMock, patch
import pytest
from libs.events.base import EventPayload
from libs.events.dlq import DLQHandler, DLQMetrics
@pytest.fixture
def event_payload():
    """Build the sample ``EventPayload`` used by the DLQ publishing tests."""
    payload_fields = {
        "data": {"test": "data", "value": 123},
        "actor": "test-user",
        "tenant_id": "test-tenant",
        "trace_id": "test-trace-123",
        "schema_version": "1.0",
    }
    return EventPayload(**payload_fields)
@pytest.fixture
def mock_js():
    """Build an ``AsyncMock`` standing in for a JetStream context.

    The three methods the tests inspect (``stream_info``, ``add_stream`` and
    ``publish``) are each replaced with their own fresh ``AsyncMock``.
    """
    jetstream = AsyncMock()
    for method_name in ("stream_info", "add_stream", "publish"):
        setattr(jetstream, method_name, AsyncMock())
    return jetstream
class TestDLQHandler:
    """Tests for ``DLQHandler``: construction, stream setup, publishing, retries."""

    @pytest.mark.asyncio
    async def test_initialization(self, mock_js):
        """Constructor arguments are exposed unchanged as attributes."""
        dlq = DLQHandler(
            js=mock_js,
            dlq_stream_name="TEST_DLQ",
            max_retries=5,
            backoff_base_ms=500,
        )

        assert dlq.js == mock_js
        assert dlq.dlq_stream_name == "TEST_DLQ"
        assert dlq.max_retries == 5
        assert dlq.backoff_base_ms == 500

    @pytest.mark.asyncio
    async def test_ensure_dlq_stream_exists_already_exists(self, mock_js):
        """No stream is added when ``stream_info`` reports an existing one."""
        mock_js.stream_info.return_value = {"name": "TEST_DLQ"}
        dlq = DLQHandler(js=mock_js, dlq_stream_name="TEST_DLQ")

        await dlq.ensure_dlq_stream_exists()

        mock_js.stream_info.assert_called_once_with("TEST_DLQ")
        mock_js.add_stream.assert_not_called()

    @pytest.mark.asyncio
    async def test_ensure_dlq_stream_creates_stream(self, mock_js):
        """A stream is added when ``stream_info`` raises ``NotFoundError``."""
        from nats.js.errors import NotFoundError

        mock_js.stream_info.side_effect = NotFoundError
        mock_js.add_stream = AsyncMock()
        dlq = DLQHandler(js=mock_js, dlq_stream_name="TEST_DLQ")

        await dlq.ensure_dlq_stream_exists()

        mock_js.add_stream.assert_called_once()
        # Inspect the keyword arguments of the single add_stream call.
        stream_kwargs = mock_js.add_stream.call_args.kwargs
        assert stream_kwargs["name"] == "TEST_DLQ"
        assert stream_kwargs["subjects"] == ["TEST_DLQ.*"]

    @pytest.mark.asyncio
    async def test_send_to_dlq(self, mock_js, event_payload):
        """A failed event is published with the expected subject, body and headers."""
        dlq = DLQHandler(js=mock_js)
        failure = ValueError("Test error message")

        await dlq.send_to_dlq(
            topic="test-topic",
            payload=event_payload,
            error=failure,
            retry_count=3,
        )

        mock_js.publish.assert_called_once()
        publish_kwargs = mock_js.publish.call_args.kwargs

        # Subject
        assert publish_kwargs["subject"] == "TAX_AGENT_DLQ.test-topic"

        # Body: the published bytes decode to a JSON document
        body = json.loads(publish_kwargs["payload"].decode())
        assert body["original_topic"] == "test-topic"
        assert body["retry_count"] == 3
        assert body["error"]["type"] == "ValueError"
        assert body["error"]["message"] == "Test error message"

        # Headers
        sent_headers = publish_kwargs["headers"]
        assert sent_headers["original_topic"] == "test-topic"
        assert sent_headers["event_id"] == event_payload.event_id
        assert sent_headers["error_type"] == "ValueError"

    @pytest.mark.asyncio
    async def test_send_to_dlq_with_original_message(self, mock_js, event_payload):
        """Raw original message bytes appear in the DLQ body as text."""
        dlq = DLQHandler(js=mock_js)
        raw_message = b'{"test": "original"}'
        failure = RuntimeError("Processing failed")

        await dlq.send_to_dlq(
            topic="test-topic",
            payload=event_payload,
            error=failure,
            retry_count=2,
            original_message_data=raw_message,
        )

        publish_kwargs = mock_js.publish.call_args.kwargs
        body = json.loads(publish_kwargs["payload"].decode())
        assert "original_message_data" in body
        assert body["original_message_data"] == '{"test": "original"}'

    @pytest.mark.asyncio
    async def test_send_to_dlq_handles_publish_failure(self, mock_js, event_payload):
        """A failing DLQ publish must not propagate out of ``send_to_dlq``."""
        mock_js.publish.side_effect = Exception("DLQ publish failed")
        dlq = DLQHandler(js=mock_js)

        # Reaching the assertion below means the call did not raise.
        await dlq.send_to_dlq(
            topic="test-topic",
            payload=event_payload,
            error=ValueError("Original error"),
            retry_count=1,
        )

        # The publish was still attempted exactly once.
        mock_js.publish.assert_called_once()

    def test_calculate_backoff(self, mock_js):
        """Backoff doubles per attempt from the base and is capped at the maximum."""
        dlq = DLQHandler(
            js=mock_js,
            backoff_base_ms=1000,
            backoff_multiplier=2.0,
            backoff_max_ms=10000,
        )

        # attempt index -> expected delay in seconds (1000ms * 2**attempt);
        # the last entry would be 16000ms but is capped at 10000ms.
        expected_seconds = {
            0: 1.0,
            1: 2.0,
            2: 4.0,
            3: 8.0,
            4: 10.0,
        }
        for attempt, seconds in expected_seconds.items():
            assert dlq.calculate_backoff(attempt) == seconds

    @pytest.mark.asyncio
    async def test_retry_with_backoff_success_first_attempt(self, mock_js):
        """An operation that succeeds immediately reports success and no error."""
        dlq = DLQHandler(js=mock_js, max_retries=3)

        async def succeeds_immediately():
            return "success"

        succeeded, last_error = await dlq.retry_with_backoff(succeeds_immediately)

        assert succeeded is True
        assert last_error is None

    @pytest.mark.asyncio
    async def test_retry_with_backoff_success_after_retries(self, mock_js):
        """An operation failing twice and then succeeding reports success."""
        dlq = DLQHandler(
            js=mock_js,
            max_retries=3,
            backoff_base_ms=100,  # Short backoff for testing
        )
        attempt_count = 0

        async def fails_twice_then_succeeds():
            nonlocal attempt_count
            attempt_count += 1
            if attempt_count < 3:
                raise ValueError(f"Fail attempt {attempt_count}")
            return "success"

        # asyncio.sleep is stubbed out so the test does not actually wait.
        with patch("asyncio.sleep", new=AsyncMock()):
            succeeded, last_error = await dlq.retry_with_backoff(
                fails_twice_then_succeeds
            )

        assert succeeded is True
        assert last_error is None
        assert attempt_count == 3

    @pytest.mark.asyncio
    async def test_retry_with_backoff_all_attempts_fail(self, mock_js):
        """When every attempt raises, the result is failure plus the exception."""
        dlq = DLQHandler(
            js=mock_js,
            max_retries=2,
            backoff_base_ms=100,
        )

        async def never_succeeds():
            raise ValueError("Always fails")

        # asyncio.sleep is stubbed out so the test does not actually wait.
        with patch("asyncio.sleep", new=AsyncMock()):
            succeeded, last_error = await dlq.retry_with_backoff(never_succeeds)

        assert succeeded is False
        assert isinstance(last_error, ValueError)
        assert str(last_error) == "Always fails"

    @pytest.mark.asyncio
    async def test_retry_with_backoff_applies_delay(self, mock_js):
        """Retries sleep for the backoff delay between failed attempts."""
        dlq = DLQHandler(
            js=mock_js,
            max_retries=2,
            backoff_base_ms=1000,
            backoff_multiplier=2.0,
        )
        attempt_count = 0

        async def counts_and_fails():
            nonlocal attempt_count
            attempt_count += 1
            raise ValueError("Fail")

        with patch("asyncio.sleep", new=AsyncMock()) as mock_sleep:
            await dlq.retry_with_backoff(counts_and_fails)

        # Two sleeps are expected: one after the 1st and one after the 2nd failure.
        assert mock_sleep.call_count == 2

        # First positional argument of each sleep call is the delay in seconds.
        first_sleep, second_sleep = mock_sleep.call_args_list
        assert first_sleep.args[0] == 1.0  # First retry: 1s
        assert second_sleep.args[0] == 2.0  # Second retry: 2s
class TestDLQMetrics:
    """Tests for the ``DLQMetrics`` counters: recording, snapshot and reset."""

    def test_initialization(self):
        """A new metrics object starts with a zero total and empty breakdowns."""
        tracker = DLQMetrics()

        assert tracker.total_dlq_events == 0
        assert len(tracker.dlq_events_by_topic) == 0
        assert len(tracker.dlq_events_by_error_type) == 0

    def test_record_dlq_event(self):
        """Each recorded event bumps the total, its topic and its error type."""
        tracker = DLQMetrics()
        recorded = (
            ("topic1", "ValueError"),
            ("topic1", "ValueError"),
            ("topic2", "RuntimeError"),
        )
        for topic, error_type in recorded:
            tracker.record_dlq_event(topic, error_type)

        assert tracker.total_dlq_events == 3

        per_topic = tracker.dlq_events_by_topic
        assert per_topic["topic1"] == 2
        assert per_topic["topic2"] == 1

        per_error = tracker.dlq_events_by_error_type
        assert per_error["ValueError"] == 2
        assert per_error["RuntimeError"] == 1

    def test_get_metrics(self):
        """``get_metrics`` returns the current counts in a separate mapping."""
        tracker = DLQMetrics()
        tracker.record_dlq_event("topic1", "ValueError")
        tracker.record_dlq_event("topic1", "RuntimeError")

        report = tracker.get_metrics()

        assert report["total_dlq_events"] == 2
        assert report["by_topic"]["topic1"] == 2
        assert report["by_error_type"]["ValueError"] == 1
        assert report["by_error_type"]["RuntimeError"] == 1

        # Overwriting a key in the snapshot must leave the live total untouched.
        report["total_dlq_events"] = 999
        assert tracker.total_dlq_events == 2

    def test_reset(self):
        """``reset`` zeroes the total and empties both breakdowns."""
        tracker = DLQMetrics()
        tracker.record_dlq_event("topic1", "ValueError")
        tracker.record_dlq_event("topic2", "RuntimeError")
        assert tracker.total_dlq_events == 2

        tracker.reset()

        assert tracker.total_dlq_events == 0
        assert len(tracker.dlq_events_by_topic) == 0
        assert len(tracker.dlq_events_by_error_type) == 0