"""Tests for Dead Letter Queue (DLQ) handler."""
import json
from unittest.mock import AsyncMock, patch

import pytest

from libs.events.base import EventPayload
from libs.events.dlq import DLQHandler, DLQMetrics


@pytest.fixture
def event_payload():
    """Build a minimal EventPayload for use across the DLQ tests."""
    fields = dict(
        data={"test": "data", "value": 123},
        actor="test-user",
        tenant_id="test-tenant",
        trace_id="test-trace-123",
        schema_version="1.0",
    )
    return EventPayload(**fields)
@pytest.fixture
def mock_js():
    """Build a mock JetStream context with async stream/publish methods."""
    js = AsyncMock()
    # Give each method the tests interact with its own fresh AsyncMock.
    for method_name in ("stream_info", "add_stream", "publish"):
        setattr(js, method_name, AsyncMock())
    return js
class TestDLQHandler:
    """Unit tests for the DLQ handler."""

    @pytest.mark.asyncio
    async def test_initialization(self, mock_js):
        """The constructor stores the configuration it was given."""
        dlq = DLQHandler(
            js=mock_js,
            dlq_stream_name="TEST_DLQ",
            max_retries=5,
            backoff_base_ms=500,
        )

        assert dlq.js == mock_js
        assert dlq.dlq_stream_name == "TEST_DLQ"
        assert dlq.max_retries == 5
        assert dlq.backoff_base_ms == 500

    @pytest.mark.asyncio
    async def test_ensure_dlq_stream_exists_already_exists(self, mock_js):
        """No stream is created when the DLQ stream is already present."""
        mock_js.stream_info.return_value = {"name": "TEST_DLQ"}
        dlq = DLQHandler(js=mock_js, dlq_stream_name="TEST_DLQ")

        await dlq.ensure_dlq_stream_exists()

        mock_js.stream_info.assert_called_once_with("TEST_DLQ")
        mock_js.add_stream.assert_not_called()

    @pytest.mark.asyncio
    async def test_ensure_dlq_stream_creates_stream(self, mock_js):
        """The DLQ stream is created when the lookup raises NotFoundError."""
        from nats.js.errors import NotFoundError

        mock_js.stream_info.side_effect = NotFoundError
        mock_js.add_stream = AsyncMock()
        dlq = DLQHandler(js=mock_js, dlq_stream_name="TEST_DLQ")

        await dlq.ensure_dlq_stream_exists()

        mock_js.add_stream.assert_called_once()
        create_kwargs = mock_js.add_stream.call_args.kwargs
        assert create_kwargs["name"] == "TEST_DLQ"
        assert create_kwargs["subjects"] == ["TEST_DLQ.*"]

    @pytest.mark.asyncio
    async def test_send_to_dlq(self, mock_js, event_payload):
        """A failed event is published to the DLQ with full metadata."""
        dlq = DLQHandler(js=mock_js)

        await dlq.send_to_dlq(
            topic="test-topic",
            payload=event_payload,
            error=ValueError("Test error message"),
            retry_count=3,
        )

        mock_js.publish.assert_called_once()
        publish_kwargs = mock_js.publish.call_args.kwargs

        # Subject routes into the default DLQ stream.
        assert publish_kwargs["subject"] == "TAX_AGENT_DLQ.test-topic"

        # Body carries the original topic, retry count and error details.
        body = json.loads(publish_kwargs["payload"].decode())
        assert body["original_topic"] == "test-topic"
        assert body["retry_count"] == 3
        assert body["error"]["type"] == "ValueError"
        assert body["error"]["message"] == "Test error message"

        # Headers mirror the key routing/debugging fields.
        headers = publish_kwargs["headers"]
        assert headers["original_topic"] == "test-topic"
        assert headers["event_id"] == event_payload.event_id
        assert headers["error_type"] == "ValueError"

    @pytest.mark.asyncio
    async def test_send_to_dlq_with_original_message(self, mock_js, event_payload):
        """The raw message bytes are preserved in the DLQ payload."""
        dlq = DLQHandler(js=mock_js)
        raw_bytes = b'{"test": "original"}'

        await dlq.send_to_dlq(
            topic="test-topic",
            payload=event_payload,
            error=RuntimeError("Processing failed"),
            retry_count=2,
            original_message_data=raw_bytes,
        )

        body = json.loads(mock_js.publish.call_args.kwargs["payload"].decode())
        assert "original_message_data" in body
        assert body["original_message_data"] == '{"test": "original"}'

    @pytest.mark.asyncio
    async def test_send_to_dlq_handles_publish_failure(self, mock_js, event_payload):
        """A failing DLQ publish is swallowed rather than propagated."""
        mock_js.publish.side_effect = Exception("DLQ publish failed")
        dlq = DLQHandler(js=mock_js)

        # Must not raise; the handler logs a critical error instead.
        await dlq.send_to_dlq(
            topic="test-topic",
            payload=event_payload,
            error=ValueError("Original error"),
            retry_count=1,
        )

        # The publish attempt itself still happened.
        mock_js.publish.assert_called_once()

    def test_calculate_backoff(self, mock_js):
        """Backoff grows exponentially and is capped at the maximum."""
        dlq = DLQHandler(
            js=mock_js,
            backoff_base_ms=1000,
            backoff_multiplier=2.0,
            backoff_max_ms=10000,
        )

        # retry -> expected delay in seconds: 1000ms * 2^retry, capped at 10s.
        expected_delays = {0: 1.0, 1: 2.0, 2: 4.0, 3: 8.0, 4: 10.0}
        for retry, seconds in expected_delays.items():
            assert dlq.calculate_backoff(retry) == seconds

    @pytest.mark.asyncio
    async def test_retry_with_backoff_success_first_attempt(self, mock_js):
        """An operation that succeeds immediately reports no error."""
        dlq = DLQHandler(js=mock_js, max_retries=3)

        async def succeed():
            return "success"

        ok, err = await dlq.retry_with_backoff(succeed)

        assert ok is True
        assert err is None

    @pytest.mark.asyncio
    async def test_retry_with_backoff_success_after_retries(self, mock_js):
        """An operation that eventually succeeds is retried until it does."""
        dlq = DLQHandler(
            js=mock_js,
            max_retries=3,
            backoff_base_ms=100,  # keep retries quick
        )

        attempts = []

        async def flaky():
            attempts.append(None)
            if len(attempts) < 3:
                raise ValueError(f"Fail attempt {len(attempts)}")
            return "success"

        with patch("asyncio.sleep", new=AsyncMock()):  # skip real waiting
            ok, err = await dlq.retry_with_backoff(flaky)

        assert ok is True
        assert err is None
        assert len(attempts) == 3

    @pytest.mark.asyncio
    async def test_retry_with_backoff_all_attempts_fail(self, mock_js):
        """When every attempt fails, the last exception is returned."""
        dlq = DLQHandler(
            js=mock_js,
            max_retries=2,
            backoff_base_ms=100,
        )

        async def always_fails():
            raise ValueError("Always fails")

        with patch("asyncio.sleep", new=AsyncMock()):  # skip real waiting
            ok, err = await dlq.retry_with_backoff(always_fails)

        assert ok is False
        assert isinstance(err, ValueError)
        assert str(err) == "Always fails"

    @pytest.mark.asyncio
    async def test_retry_with_backoff_applies_delay(self, mock_js):
        """Each retry waits for the exponentially growing backoff delay."""
        dlq = DLQHandler(
            js=mock_js,
            max_retries=2,
            backoff_base_ms=1000,
            backoff_multiplier=2.0,
        )

        attempts = []

        async def never_succeeds():
            attempts.append(None)
            raise ValueError("Fail")

        with patch("asyncio.sleep", new=AsyncMock()) as mock_sleep:
            await dlq.retry_with_backoff(never_succeeds)

        # One sleep after each of the two failed attempts.
        assert mock_sleep.call_count == 2

        # Delays follow 1s then 2s (1000ms base, 2.0 multiplier).
        sleep_calls = mock_sleep.call_args_list
        assert sleep_calls[0].args[0] == 1.0  # First retry: 1s
        assert sleep_calls[1].args[0] == 2.0  # Second retry: 2s
class TestDLQMetrics:
    """Unit tests for DLQ metrics collection."""

    def test_initialization(self):
        """A fresh metrics object starts with every counter empty."""
        m = DLQMetrics()

        assert m.total_dlq_events == 0
        assert not m.dlq_events_by_topic
        assert not m.dlq_events_by_error_type

    def test_record_dlq_event(self):
        """Events are counted globally, per topic and per error type."""
        m = DLQMetrics()

        for topic, error_type in [
            ("topic1", "ValueError"),
            ("topic1", "ValueError"),
            ("topic2", "RuntimeError"),
        ]:
            m.record_dlq_event(topic, error_type)

        assert m.total_dlq_events == 3
        assert m.dlq_events_by_topic["topic1"] == 2
        assert m.dlq_events_by_topic["topic2"] == 1
        assert m.dlq_events_by_error_type["ValueError"] == 2
        assert m.dlq_events_by_error_type["RuntimeError"] == 1

    def test_get_metrics(self):
        """get_metrics returns an independent snapshot of the counters."""
        m = DLQMetrics()
        m.record_dlq_event("topic1", "ValueError")
        m.record_dlq_event("topic1", "RuntimeError")

        snapshot = m.get_metrics()

        assert snapshot["total_dlq_events"] == 2
        assert snapshot["by_topic"]["topic1"] == 2
        assert snapshot["by_error_type"]["ValueError"] == 1
        assert snapshot["by_error_type"]["RuntimeError"] == 1

        # Mutating the snapshot must not touch the live counters.
        snapshot["total_dlq_events"] = 999
        assert m.total_dlq_events == 2

    def test_reset(self):
        """reset() clears every counter back to its initial state."""
        m = DLQMetrics()
        m.record_dlq_event("topic1", "ValueError")
        m.record_dlq_event("topic2", "RuntimeError")
        assert m.total_dlq_events == 2

        m.reset()

        assert m.total_dlq_events == 0
        assert not m.dlq_events_by_topic
        assert not m.dlq_events_by_error_type