"""Document upload, storage, checksum validation, metadata extraction service."""

import hashlib
import mimetypes
import os

# Import shared libraries
import sys
from datetime import UTC, datetime
from typing import Any, cast

import structlog
import ulid
from fastapi import Depends, File, HTTPException, Request, UploadFile

sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))

from libs.app_factory import create_app, get_tenant_dependency, get_user_dependency
from libs.config import BaseAppSettings, create_event_bus, create_minio_client
from libs.events import EventBus, EventPayload, EventTopics
from libs.observability import get_metrics, get_tracer
from libs.schemas import DocumentKind, DocumentUploadResponse
from libs.storage import DocumentStorage, StorageClient

logger = structlog.get_logger()


class IngestionSettings(BaseAppSettings):
    """Settings for the ingestion service."""

    service_name: str = "svc-ingestion"

    # File upload limits
    max_file_size: int = 50 * 1024 * 1024  # 50MB
    allowed_mime_types: list[str] = [
        "application/pdf",
        "image/jpeg",
        "image/png",
        "image/tiff",
        "text/csv",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ]

    # Storage configuration
    raw_documents_bucket: str = "raw-documents"
    evidence_bucket: str = "evidence"


# Global clients (will be initialized in startup)
storage_client: StorageClient | None = None
document_storage: DocumentStorage | None = None
event_bus: EventBus | None = None

# Settings will be initialized after app creation
settings: IngestionSettings


def init_dependencies(app_settings: IngestionSettings) -> None:
    """Initialize service dependencies (storage, event bus) from settings."""
    global storage_client, document_storage, event_bus, settings

    settings = app_settings

    # SECURITY: do not log credentials; the original logged minio_access_key.
    # Only the endpoint is useful for diagnostics.
    logger.info(
        "Starting ingestion service",
        minio_endpoint=settings.minio_endpoint,
    )

    # Initialize clients
    minio_client = create_minio_client(settings)
    storage_client = StorageClient(minio_client)
    document_storage = DocumentStorage(storage_client)
    event_bus = create_event_bus(settings)

    logger.info("Ingestion service started successfully")


# Create app and settings
async def startup_event() -> None:
    """Start the event bus once the app boots (registered as a startup hook)."""
    if event_bus is None:
        raise ValueError("Event bus not initialized")
    await event_bus.start()


app, _settings = create_app(
    service_name="svc-ingestion",
    title="Tax Agent Ingestion Service",
    description="Document upload and storage service",
    settings_class=IngestionSettings,
    startup_hooks=[startup_event],
)

# Initialize dependencies immediately
init_dependencies(cast(IngestionSettings, _settings))

# Get observability components
tracer = get_tracer("svc-ingestion")
metrics = get_metrics("svc-ingestion")

# Health endpoints are provided by app_factory


def _track_upload_error(tenant_id: str, error_type: str) -> None:
    """Best-effort upload-error metric; never let a metrics failure mask the real error."""
    try:
        metrics.counter(
            "upload_errors_total", labelnames=["tenant_id", "error_type"]
        ).labels(tenant_id=tenant_id, error_type=error_type).inc()
    except Exception:
        pass  # Don't fail on metrics errors


@app.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    kind: DocumentKind = DocumentKind.INVOICE,
    source: str = "manual_upload",
    current_user: dict[str, Any] = Depends(get_user_dependency()),
    tenant_id: str = Depends(get_tenant_dependency()),
) -> DocumentUploadResponse:
    """Upload a document for processing.

    Validates the file, stores it via DocumentStorage, publishes a
    DOC_INGESTED event and records metrics.

    Returns:
        DocumentUploadResponse with the new doc_id, storage URL and
        SHA-256 checksum.

    Raises:
        HTTPException: 503 if dependencies are not initialized, 400 on
            validation failure, 500 on any other error.
    """
    # Check if services are initialized
    if document_storage is None or event_bus is None:
        raise HTTPException(
            status_code=503, detail="Service not ready - dependencies not initialized"
        )

    with tracer.start_as_current_span("upload_document") as span:
        span.set_attribute("tenant_id", tenant_id)
        span.set_attribute("document_kind", kind.value)
        span.set_attribute("source", source)

        try:
            # Validate file (declared size, MIME type, filename)
            await _validate_upload(file)

            # Generate document ID
            doc_id = f"doc_{ulid.new()}"
            span.set_attribute("doc_id", doc_id)

            # Read file content
            content = await file.read()

            # Re-check the limit on the bytes actually read: UploadFile.size
            # may be None (e.g. chunked transfer), in which case
            # _validate_upload could not enforce max_file_size.
            if len(content) > settings.max_file_size:
                raise ValueError(
                    f"File too large: {len(content)} bytes (max: {settings.max_file_size})"
                )

            # Calculate checksum
            checksum = hashlib.sha256(content).hexdigest()

            # Detect MIME type from the filename; fall back to the
            # client-declared content type, then a generic default
            detected_mime = None
            if file.filename:
                detected_mime = mimetypes.guess_type(file.filename)[0]
            content_type = (
                detected_mime or file.content_type or "application/octet-stream"
            )

            # Store document
            storage_result = await document_storage.store_document(
                tenant_id=tenant_id,
                doc_id=doc_id,
                content=content,
                content_type=content_type,
                metadata={
                    "original_filename": file.filename or "unknown",
                    "kind": kind.value,
                    "source": source,
                    "uploaded_by": current_user.get("sub", "unknown"),
                    "uploaded_at": datetime.now(UTC).isoformat(),
                },
            )

            # Publish event
            event_payload = EventPayload(
                data={
                    "doc_id": doc_id,
                    "tenant_id": tenant_id,
                    "filename": file.filename or "unknown",
                    "kind": kind.value,
                    "source": source,
                    "checksum_sha256": checksum,
                    "size_bytes": len(content),
                    "mime_type": content_type,
                    "storage_path": storage_result["s3_url"],
                },
                actor=current_user.get("sub", "system"),
                tenant_id=tenant_id,
                trace_id=str(span.get_span_context().trace_id),
            )
            await event_bus.publish(EventTopics.DOC_INGESTED, event_payload)

            # Update metrics
            metrics.counter(
                "documents_uploaded_total", labelnames=["tenant_id", "kind", "source"]
            ).labels(tenant_id=tenant_id, kind=kind.value, source=source).inc()

            metrics.histogram(
                "document_size_bytes", labelnames=["tenant_id", "kind"]
            ).labels(tenant_id=tenant_id, kind=kind.value).observe(len(content))

            logger.info(
                "Document uploaded successfully",
                doc_id=doc_id,
                tenant_id=tenant_id,
                kind=kind.value,
                size=len(content),
                checksum=checksum,
            )

            return DocumentUploadResponse(
                doc_id=doc_id, s3_url=storage_result["s3_url"], checksum=checksum
            )

        except ValueError as e:
            logger.warning("Upload validation failed", error=str(e))
            _track_upload_error(tenant_id, "ValueError")
            # Chain the cause so the original traceback is preserved
            raise HTTPException(status_code=400, detail=str(e)) from e

        except Exception as e:
            logger.error("Upload failed", error=str(e))
            _track_upload_error(tenant_id, type(e).__name__)
            raise HTTPException(status_code=500, detail="Upload failed") from e


@app.get("/documents/{doc_id}")
async def get_document_info(
    doc_id: str,
    current_user: dict[str, Any] = Depends(get_user_dependency()),
    tenant_id: str = Depends(get_tenant_dependency()),
) -> dict[str, str]:
    """Get document information: existence check plus a presigned download URL.

    Raises:
        HTTPException: 503 if dependencies are not initialized, 404 if the
            object does not exist, 500 on URL-generation or other failures.
    """
    # Check if services are initialized
    if storage_client is None:
        raise HTTPException(
            status_code=503, detail="Service not ready - dependencies not initialized"
        )

    with tracer.start_as_current_span("get_document_info") as span:
        span.set_attribute("doc_id", doc_id)
        span.set_attribute("tenant_id", tenant_id)

        try:
            bucket_name = settings.raw_documents_bucket
            # TODO(review): the ".pdf" suffix is hard-coded even though other
            # MIME types (CSV, images, XLSX) are accepted on upload — confirm
            # against DocumentStorage's object-key scheme.
            object_key = f"tenants/{tenant_id}/raw/{doc_id}.pdf"

            # Check if document exists
            exists = await storage_client.object_exists(bucket_name, object_key)
            if not exists:
                raise HTTPException(status_code=404, detail="Document not found")

            # Get presigned URL for download
            download_url = await storage_client.get_presigned_url(
                bucket_name=bucket_name, object_name=object_key, method="GET"
            )
            if not download_url:
                raise HTTPException(
                    status_code=500, detail="Failed to generate download URL"
                )

            return {
                "doc_id": doc_id,
                "download_url": download_url,
                "s3_url": f"s3://{bucket_name}/{object_key}",
            }

        except HTTPException:
            raise
        except Exception as e:
            logger.error("Failed to get document info", doc_id=doc_id, error=str(e))
            raise HTTPException(
                status_code=500, detail="Failed to get document info"
            ) from e


@app.delete("/documents/{doc_id}")
async def delete_document(
    doc_id: str,
    current_user: dict[str, Any] = Depends(get_user_dependency()),
    tenant_id: str = Depends(get_tenant_dependency()),
) -> dict[str, str]:
    """Delete a document from raw storage.

    Raises:
        HTTPException: 503 if dependencies are not initialized, 404 if the
            object was not found, 500 on any other failure.
    """
    # Check if services are initialized
    if storage_client is None:
        raise HTTPException(
            status_code=503, detail="Service not ready - dependencies not initialized"
        )

    with tracer.start_as_current_span("delete_document") as span:
        span.set_attribute("doc_id", doc_id)
        span.set_attribute("tenant_id", tenant_id)

        try:
            bucket_name = settings.raw_documents_bucket
            # TODO(review): same hard-coded ".pdf" key scheme as
            # get_document_info — verify for non-PDF uploads.
            object_key = f"tenants/{tenant_id}/raw/{doc_id}.pdf"

            # Delete from storage
            success = await storage_client.delete_object(bucket_name, object_key)
            if not success:
                raise HTTPException(status_code=404, detail="Document not found")

            logger.info("Document deleted", doc_id=doc_id, tenant_id=tenant_id)
            return {"message": "Document deleted successfully"}

        except HTTPException:
            raise
        except Exception as e:
            logger.error("Failed to delete document", doc_id=doc_id, error=str(e))
            raise HTTPException(
                status_code=500, detail="Failed to delete document"
            ) from e


async def _validate_upload(file: UploadFile) -> None:
    """Validate an uploaded file's size, MIME type and filename.

    Raises:
        ValueError: if any validation check fails.
    """
    # Check declared file size. NOTE: file.size may be None, in which case
    # this check is skipped — the caller re-checks len(content) after reading.
    if file.size and file.size > settings.max_file_size:
        raise ValueError(
            f"File too large: {file.size} bytes (max: {settings.max_file_size})"
        )

    # Check MIME type; if the declared type is not allowed, try to detect
    # one from the filename before rejecting
    if file.content_type not in settings.allowed_mime_types:
        detected_mime = None
        if file.filename:
            detected_mime = mimetypes.guess_type(file.filename)[0]

        if detected_mime not in settings.allowed_mime_types:
            raise ValueError(f"Unsupported file type: {file.content_type}")

    # Check filename
    if not file.filename:
        raise ValueError("Filename is required")

    # Reject path-traversal / separator characters in the filename
    if ".." in file.filename or "/" in file.filename or "\\" in file.filename:
        raise ValueError("Invalid filename")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_config=None,  # Use structlog configuration
    )