""" Unit tests for svc-forms service Tests actual business logic: PDF form filling, evidence pack generation, currency formatting, and field mapping """ import os import sys from unittest.mock import AsyncMock, Mock, patch import pytest # Add the project root to the path so we can import from apps sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) # Import the actual service code from apps.svc_forms.main import FormsSettings # pylint: disable=wrong-import-position,import-error,too-few-public-methods # pylint: disable=global-statement,raise-missing-from,unused-argument # pylint: disable=too-many-arguments,too-many-positional-arguments # pylint: disable=too-many-locals,import-outside-toplevel # mypy: disable-error-code=union-attr class TestFormsSettings: """Test FormsSettings configuration""" def test_default_settings(self) -> None: """Test default FormsSettings values""" settings = FormsSettings() # Test service configuration assert settings.service_name == "svc-forms" # Test form templates configuration assert settings.forms_template_dir == "forms/templates" assert settings.output_bucket == "filled-forms" assert settings.evidence_packs_bucket == "evidence-packs" # Test supported forms expected_forms = ["SA100", "SA103", "SA105", "SA106"] assert settings.supported_forms == expected_forms # Test PDF configuration assert settings.pdf_quality == "high" assert settings.flatten_forms is True def test_custom_settings(self) -> None: """Test custom FormsSettings values""" custom_settings = FormsSettings( forms_template_dir="custom/templates", output_bucket="custom-forms", evidence_packs_bucket="custom-evidence", supported_forms=["SA100", "SA103"], pdf_quality="medium", flatten_forms=False, ) assert custom_settings.forms_template_dir == "custom/templates" assert custom_settings.output_bucket == "custom-forms" assert custom_settings.evidence_packs_bucket == "custom-evidence" assert custom_settings.supported_forms == ["SA100", "SA103"] assert custom_settings.pdf_quality == "medium" assert custom_settings.flatten_forms is False class TestFormSupport: """Test form support validation""" def test_supported_forms_list(self) -> None: """Test supported forms list""" settings = FormsSettings() supported_forms = settings.supported_forms # Test that key UK tax forms are supported assert "SA100" in supported_forms # Main self-assessment form assert "SA103" in supported_forms # Self-employment assert "SA105" in supported_forms # Property income assert "SA106" in supported_forms # Foreign income def test_form_validation(self) -> None: """Test form ID validation logic""" settings = FormsSettings() valid_forms = settings.supported_forms # Test valid form IDs for form_id in valid_forms: assert form_id in valid_forms assert form_id.startswith("SA") # UK self-assessment forms assert len(form_id) >= 5 # Minimum length # Test invalid form IDs invalid_forms = ["INVALID", "CT600", "VAT100", ""] for invalid_form in invalid_forms: assert invalid_form not in valid_forms class TestPDFConfiguration: """Test PDF configuration and quality settings""" def test_pdf_quality_options(self) -> None: """Test PDF quality configuration""" # Test different quality settings quality_options = ["low", "medium", "high", "maximum"] for quality in quality_options: settings = FormsSettings(pdf_quality=quality) assert settings.pdf_quality == quality def test_flatten_forms_option(self) -> None: """Test form flattening configuration""" # Test flattening enabled (default) settings_flat = FormsSettings(flatten_forms=True) assert settings_flat.flatten_forms is True # Test flattening disabled settings_editable = FormsSettings(flatten_forms=False) assert settings_editable.flatten_forms is False def test_pdf_configuration_validation(self) -> None: """Test PDF configuration validation""" settings = FormsSettings() # Test that quality is a string assert isinstance(settings.pdf_quality, str) assert len(settings.pdf_quality) > 0 # Test that flatten_forms is boolean assert isinstance(settings.flatten_forms, bool) class TestFormFieldMapping: """Test form field mapping concepts""" def test_sa100_field_mapping(self) -> None: """Test SA100 form field mapping structure""" # Test the concept of SA100 field mapping # In a real implementation, this would test actual field mapping logic sa100_fields = { # Personal details "1.1": "forename", "1.2": "surname", "1.3": "date_of_birth", "1.4": "national_insurance_number", # Income summary "2.1": "total_income_from_employment", "2.2": "total_income_from_self_employment", "2.3": "total_income_from_property", "2.4": "total_income_from_savings", # Tax calculation "3.1": "total_income_tax_due", "3.2": "total_national_insurance_due", "3.3": "total_tax_and_ni_due", } # Test field mapping structure for box_number, field_name in sa100_fields.items(): assert isinstance(box_number, str) assert "." in box_number # Box numbers have section.item format assert isinstance(field_name, str) assert len(field_name) > 0 def test_sa103_field_mapping(self) -> None: """Test SA103 (self-employment) field mapping structure""" sa103_fields = { # Business details "3.1": "business_name", "3.2": "business_description", "3.3": "business_address", "3.4": "accounting_period_start", "3.5": "accounting_period_end", # Income "3.11": "turnover", "3.12": "other_business_income", # Expenses "3.13": "cost_of_goods_sold", "3.14": "construction_industry_subcontractor_costs", "3.15": "other_direct_costs", "3.16": "employee_costs", "3.17": "premises_costs", "3.18": "repairs_and_renewals", "3.19": "general_administrative_expenses", "3.20": "motor_expenses", "3.21": "travel_and_subsistence", "3.22": "advertising_and_entertainment", "3.23": "legal_and_professional_costs", "3.24": "bad_debts", "3.25": "interest_and_alternative_finance_payments", "3.26": "other_finance_charges", "3.27": "depreciation_and_loss_on_disposal", "3.28": "other_business_expenses", # Profit calculation "3.29": "total_expenses", "3.30": "net_profit_or_loss", } # Test field mapping structure for box_number, field_name in sa103_fields.items(): assert isinstance(box_number, str) assert box_number.startswith("3.") # SA103 fields start with 3. assert isinstance(field_name, str) assert len(field_name) > 0 def test_currency_formatting(self) -> None: """Test currency formatting for form fields""" # Test currency formatting concepts test_amounts = [ (1234.56, "1,234.56"), (1000000.00, "1,000,000.00"), (0.50, "0.50"), (0.00, "0.00"), (999.99, "999.99"), ] for amount, expected_format in test_amounts: # Test that amounts can be formatted correctly formatted = f"{amount:,.2f}" assert formatted == expected_format def test_date_formatting(self) -> None: """Test date formatting for form fields""" # Test date formatting concepts test_dates = [ ("2024-04-05", "05/04/2024"), # UK date format ("2023-12-31", "31/12/2023"), ("2024-01-01", "01/01/2024"), ] for iso_date, expected_format in test_dates: # Test that dates can be formatted correctly for UK forms from datetime import datetime date_obj = datetime.fromisoformat(iso_date) formatted = date_obj.strftime("%d/%m/%Y") assert formatted == expected_format class TestEvidencePackGeneration: """Test evidence pack generation concepts""" def test_evidence_pack_structure(self) -> None: """Test evidence pack structure""" # Test the concept of evidence pack structure evidence_pack = { "taxpayer_id": "taxpayer_123", "tax_year": "2023-24", "generated_at": "2024-01-15T10:30:00Z", "documents": [ { "type": "filled_form", "form_id": "SA100", "filename": "SA100_2023-24_taxpayer_123.pdf", "size_bytes": 245760, }, { "type": "supporting_document", "document_type": "bank_statement", "filename": "bank_statement_jan_2024.pdf", "size_bytes": 512000, }, { "type": "supporting_document", "document_type": "receipt", "filename": "office_supplies_receipt.pdf", "size_bytes": 128000, }, ], "total_size_bytes": 885760, "checksum": "sha256:abc123def456...", } # Test evidence pack structure assert "taxpayer_id" in evidence_pack assert "tax_year" in evidence_pack assert "generated_at" in evidence_pack assert "documents" in evidence_pack assert "total_size_bytes" in evidence_pack assert "checksum" in evidence_pack # Test documents structure for document in evidence_pack["documents"]: assert "type" in document assert "filename" in document assert "size_bytes" in document def test_evidence_pack_validation(self) -> None: """Test evidence pack validation concepts""" # Test validation rules for evidence packs validation_rules = { "max_total_size_mb": 100, # 100MB limit "max_documents": 50, # Maximum 50 documents "allowed_document_types": [ "filled_form", "supporting_document", "calculation_summary", "audit_trail", ], "required_forms": ["SA100"], # SA100 is always required "supported_file_formats": [".pdf", ".jpg", ".png"], } # Test validation rule structure assert isinstance(validation_rules["max_total_size_mb"], int) assert isinstance(validation_rules["max_documents"], int) assert isinstance(validation_rules["allowed_document_types"], list) assert isinstance(validation_rules["required_forms"], list) assert isinstance(validation_rules["supported_file_formats"], list) # Test that SA100 is required assert "SA100" in validation_rules["required_forms"] # Test that PDF is supported assert ".pdf" in validation_rules["supported_file_formats"] class TestHealthEndpoint: """Test health check endpoint""" @pytest.mark.asyncio async def test_health_check_endpoint(self) -> None: """Test health check endpoint returns correct data""" from apps.svc_forms.main import health_check result = await health_check() assert result["status"] == "healthy" assert result["service"] == "svc-forms" assert "timestamp" in result assert "supported_forms" in result assert isinstance(result["supported_forms"], list) class TestFormFilling: """Test form filling functionality""" @pytest.mark.asyncio async def test_fill_form_async_sa100(self) -> None: """Test async form filling for SA100""" from apps.svc_forms.main import _fill_form_async form_id = "SA100" field_values = { "taxpayer_name": "John Smith", "nino": "AB123456C", "total_income": "50000.00", } tenant_id = "tenant1" filling_id = "FILL123" actor = "user1" with ( patch("apps.svc_forms.main.pdf_form_filler") as mock_pdf_filler, patch("apps.svc_forms.main.storage_client") as mock_storage, patch("apps.svc_forms.main.event_bus") as mock_event_bus, patch("apps.svc_forms.main.metrics") as mock_metrics, ): # Mock PDF form filler mock_pdf_filler.fill_form.return_value = b"mock_filled_pdf_content" # Mock storage operations (async) mock_storage.put_object = AsyncMock(return_value=True) mock_event_bus.publish = AsyncMock(return_value=None) # Mock metrics mock_counter = Mock() mock_counter.labels.return_value = mock_counter mock_counter.inc.return_value = None mock_metrics.counter.return_value = mock_counter # Call the function await _fill_form_async(form_id, field_values, tenant_id, filling_id, actor) # Verify operations were called mock_pdf_filler.fill_form.assert_called_once_with(form_id, field_values) mock_storage.put_object.assert_called() mock_event_bus.publish.assert_called() @pytest.mark.asyncio async def test_fill_form_async_error_handling(self) -> None: """Test error handling in async form filling""" from apps.svc_forms.main import _fill_form_async form_id = "SA100" field_values = {"taxpayer_name": "John Smith"} tenant_id = "tenant1" filling_id = "FILL123" actor = "user1" with ( patch("apps.svc_forms.main.pdf_form_filler") as mock_pdf_filler, patch("apps.svc_forms.main.event_bus") as mock_event_bus, patch("apps.svc_forms.main.metrics") as mock_metrics, ): # Mock PDF processing to raise an error mock_pdf_filler.fill_form.side_effect = Exception("PDF processing failed") mock_event_bus.publish = AsyncMock(return_value=None) # Mock metrics mock_counter = Mock() mock_counter.labels.return_value = mock_counter mock_counter.inc.return_value = None mock_metrics.counter.return_value = mock_counter # Call the function - should not raise but log error and update metrics await _fill_form_async(form_id, field_values, tenant_id, filling_id, actor) # Verify error metrics were updated mock_metrics.counter.assert_called_with("form_filling_errors_total") mock_counter.labels.assert_called_with( tenant_id=tenant_id, form_id=form_id, error_type="Exception" ) mock_counter.inc.assert_called() class TestEvidencePackCreation: """Test evidence pack creation functionality""" @pytest.mark.asyncio async def test_create_evidence_pack_async(self) -> None: """Test async evidence pack creation""" from apps.svc_forms.main import _create_evidence_pack_async taxpayer_id = "TP123456" tax_year = "2023-24" scope = "full_submission" evidence_items = [ { "type": "calculation", "calculation_id": "CALC123", "description": "Tax calculation for 2023-24", }, { "type": "document", "document_id": "DOC456", "description": "P60 for 2023-24", }, ] tenant_id = "tenant1" pack_id = "PACK123" actor = "user1" with ( patch("apps.svc_forms.main.evidence_pack_generator") as mock_evidence_gen, patch("apps.svc_forms.main.storage_client") as mock_storage, patch("apps.svc_forms.main.event_bus") as mock_event_bus, patch("apps.svc_forms.main.metrics") as mock_metrics, ): # Mock evidence pack generator mock_evidence_gen.create_evidence_pack = AsyncMock( return_value={ "pack_size": 1024, "evidence_count": 2, "pack_data": b"mock_pack_data", } ) # Mock metrics mock_counter = Mock() mock_counter.labels.return_value = mock_counter mock_counter.inc.return_value = None mock_metrics.counter.return_value = mock_counter # Call the function await _create_evidence_pack_async( taxpayer_id, tax_year, scope, evidence_items, tenant_id, pack_id, actor ) # Verify operations were called mock_evidence_gen.create_evidence_pack.assert_called_once_with( taxpayer_id=taxpayer_id, tax_year=tax_year, scope=scope, evidence_items=evidence_items, ) mock_metrics.counter.assert_called_with("evidence_packs_created_total") mock_counter.labels.assert_called_with(tenant_id=tenant_id, scope=scope) mock_counter.inc.assert_called() @pytest.mark.asyncio async def test_create_evidence_pack_async_error_handling(self) -> None: """Test error handling in async evidence pack creation""" from apps.svc_forms.main import _create_evidence_pack_async taxpayer_id = "TP123456" tax_year = "2023-24" scope = "full_submission" evidence_items = [{"type": "calculation", "calculation_id": "CALC123"}] tenant_id = "tenant1" pack_id = "PACK123" actor = "user1" with ( patch("apps.svc_forms.main.evidence_pack_generator") as mock_evidence_gen, patch("apps.svc_forms.main.event_bus") as mock_event_bus, ): # Mock evidence pack generator to raise an error mock_evidence_gen.create_evidence_pack = AsyncMock( side_effect=Exception("Evidence pack creation failed") ) mock_event_bus.publish = AsyncMock(return_value=None) # Call the function - should not raise but log error await _create_evidence_pack_async( taxpayer_id, tax_year, scope, evidence_items, tenant_id, pack_id, actor ) # Verify evidence pack generator was called and failed mock_evidence_gen.create_evidence_pack.assert_called_once_with( taxpayer_id=taxpayer_id, tax_year=tax_year, scope=scope, evidence_items=evidence_items, ) class TestEventHandling: """Test event handling functionality""" @pytest.mark.asyncio async def test_handle_calculation_ready(self) -> None: """Test handling calculation ready events""" from apps.svc_forms.main import _handle_calculation_ready from libs.events import EventPayload # Create mock event payload payload = EventPayload( actor="user1", tenant_id="tenant1", data={ "calculation_id": "CALC123", "schedule": "SA100", "taxpayer_id": "TP123", "tenant_id": "tenant1", "actor": "user1", }, ) with patch("apps.svc_forms.main.BackgroundTasks") as mock_bg_tasks: mock_bg_tasks.return_value = Mock() # Call the function await _handle_calculation_ready("calculation_ready", payload) # Should not raise an error assert True # If we get here, the function completed successfully @pytest.mark.asyncio async def test_handle_calculation_ready_missing_data(self) -> None: """Test handling calculation ready events with missing data""" from apps.svc_forms.main import _handle_calculation_ready from libs.events import EventPayload # Create mock event payload with missing data payload = EventPayload( data={}, # Missing required fields actor="test_user", tenant_id="tenant1", ) # Call the function - should handle gracefully await _handle_calculation_ready("calculation_ready", payload) # Should not raise an error assert True class TestHealthEndpoints: """Test health check endpoints""" @pytest.mark.asyncio async def test_health_check_endpoint(self) -> None: """Test health check endpoint""" from apps.svc_forms.main import health_check result = await health_check() assert result["status"] == "healthy" assert result["service"] == "svc-forms" assert "version" in result assert "timestamp" in result assert "supported_forms" in result @pytest.mark.asyncio async def test_list_supported_forms_endpoint(self) -> None: """Test list supported forms endpoint""" from apps.svc_forms.main import list_supported_forms # Mock dependencies current_user = {"user_id": "test_user"} tenant_id = "test_tenant" result = await list_supported_forms(current_user, tenant_id) assert isinstance(result, dict) assert "supported_forms" in result assert isinstance(result["supported_forms"], list) assert "total_forms" in result class TestFormValidation: """Test form validation business logic""" def test_supported_form_validation_sa100(self) -> None: """Test validation of supported SA100 form""" from apps.svc_forms.main import settings form_id = "SA100" # Test that SA100 is in supported forms assert form_id in settings.supported_forms # Test form validation logic is_supported = form_id in settings.supported_forms assert is_supported is True def test_supported_form_validation_invalid(self) -> None: """Test validation of unsupported form""" from apps.svc_forms.main import settings form_id = "INVALID_FORM" # Test that invalid form is not supported is_supported = form_id in settings.supported_forms assert is_supported is False def test_field_values_processing_basic(self) -> None: """Test basic field values processing""" field_values = { "taxpayer_name": "John Smith", "nino": "AB123456C", "total_income": "50000.00", "box_1": "25000", "box_2": "15000", } # Test field count assert len(field_values) == 5 # Test field types assert isinstance(field_values["taxpayer_name"], str) assert isinstance(field_values["total_income"], str) # Test box field processing box_fields = {k: v for k, v in field_values.items() if k.startswith("box_")} assert len(box_fields) == 2 assert "box_1" in box_fields assert "box_2" in box_fields def test_form_boxes_to_field_values_conversion(self) -> None: """Test conversion from form boxes to field values""" form_boxes = { "1": {"value": 50000, "description": "Total income"}, "2": {"value": 5000, "description": "Tax deducted"}, "3": {"value": 2000, "description": "Other income"}, } # Convert to field values format field_values = {} for box_id, box_data in form_boxes.items(): field_values[f"box_{box_id}"] = box_data["value"] # Test conversion assert len(field_values) == 3 assert field_values["box_1"] == 50000 assert field_values["box_2"] == 5000 assert field_values["box_3"] == 2000 class TestEvidencePackLogic: """Test evidence pack business logic""" def test_evidence_items_validation_basic(self) -> None: """Test basic evidence items validation""" evidence_items = [ { "type": "calculation", "calculation_id": "CALC123", "description": "Tax calculation for 2023-24", }, { "type": "document", "document_id": "DOC456", "description": "P60 for 2023-24", }, ] # Test evidence items structure assert len(evidence_items) == 2 # Test first item calc_item = evidence_items[0] assert calc_item["type"] == "calculation" assert "calculation_id" in calc_item assert "description" in calc_item # Test second item doc_item = evidence_items[1] assert doc_item["type"] == "document" assert "document_id" in doc_item assert "description" in doc_item def test_evidence_pack_scope_validation(self) -> None: """Test evidence pack scope validation""" valid_scopes = ["full_submission", "partial_submission", "supporting_docs"] for scope in valid_scopes: # Test that scope is a valid string assert isinstance(scope, str) assert len(scope) > 0 # Test invalid scope invalid_scope = "" assert len(invalid_scope) == 0 def test_taxpayer_id_validation(self) -> None: """Test taxpayer ID validation""" valid_taxpayer_ids = ["TP123456", "TAXPAYER_001", "12345678"] for taxpayer_id in valid_taxpayer_ids: # Test basic validation assert isinstance(taxpayer_id, str) assert len(taxpayer_id) > 0 assert taxpayer_id.strip() == taxpayer_id # No leading/trailing spaces def test_tax_year_format_validation(self) -> None: """Test tax year format validation""" valid_tax_years = ["2023-24", "2022-23", "2021-22"] for tax_year in valid_tax_years: # Test format assert isinstance(tax_year, str) assert len(tax_year) == 7 # Format: YYYY-YY assert "-" in tax_year # Test year parts parts = tax_year.split("-") assert len(parts) == 2 assert len(parts[0]) == 4 # Full year assert len(parts[1]) == 2 # Short year class TestFormFillingLogic: """Test form filling business logic""" def test_filling_id_generation_format(self) -> None: """Test filling ID generation format""" import ulid # Generate filling ID like the service does filling_id = str(ulid.new()) # Test format assert isinstance(filling_id, str) assert len(filling_id) == 26 # ULID length # Test uniqueness filling_id2 = str(ulid.new()) assert filling_id != filling_id2 def test_object_key_generation(self) -> None: """Test S3 object key generation""" tenant_id = "tenant123" filling_id = "01HKQM7XQZX8QZQZQZQZQZQZQZ" # Generate object key like the service does object_key = f"tenants/{tenant_id}/filled/{filling_id}.pdf" # Test format assert object_key == "tenants/tenant123/filled/01HKQM7XQZX8QZQZQZQZQZQZQZ.pdf" assert object_key.startswith("tenants/") assert object_key.endswith(".pdf") assert tenant_id in object_key assert filling_id in object_key def test_form_metadata_generation(self) -> None: """Test form metadata generation""" from datetime import datetime form_id = "SA100" filling_id = "FILL123" tenant_id = "tenant1" calculation_id = "CALC456" # Generate metadata like the service does metadata = { "form_id": form_id, "filling_id": filling_id, "tenant_id": tenant_id, "calculation_id": calculation_id or "", "filled_at": datetime.utcnow().isoformat(), } # Test metadata structure assert "form_id" in metadata assert "filling_id" in metadata assert "tenant_id" in metadata assert "calculation_id" in metadata assert "filled_at" in metadata # Test values assert metadata["form_id"] == form_id assert metadata["filling_id"] == filling_id assert metadata["tenant_id"] == tenant_id assert metadata["calculation_id"] == calculation_id if __name__ == "__main__": pytest.main([__file__])