Files
ai-tax-agent/libs/security/dependencies.py
harkon db61b05c80
Some checks failed
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
full ingestion -> OCR -> extraction flow is now working correctly.
2025-11-26 15:46:59 +00:00

85 lines
2.7 KiB
Python

"""FastAPI dependency functions for authentication and authorization."""
from collections.abc import Callable
from typing import Any
from fastapi import HTTPException, Request, status
def require_admin_role(request: Request) -> None:
    """Allow the request through only for callers holding the ``admin`` role.

    Looks up the auth context stored on ``request.state.auth`` and delegates
    the role check to its ``require_role`` method.

    Args:
        request: Incoming request; ``request.state.auth`` is read from it.

    Raises:
        HTTPException: 401 when ``request.state`` has no ``auth`` attribute
            or the stored value is falsy.
    """
    auth_context = getattr(request.state, "auth", None)
    if auth_context:
        # Enforcement is delegated to the auth context.
        # NOTE(review): presumably raises when the role is missing - confirm
        # against the auth context implementation.
        auth_context.require_role("admin")
        return

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
    )
def require_reviewer_role(request: Request) -> None:
    """Allow the request through for callers holding ``admin`` or ``reviewer``.

    Looks up the auth context stored on ``request.state.auth`` and delegates
    the check to its ``require_any_role`` method.

    Args:
        request: Incoming request; ``request.state.auth`` is read from it.

    Raises:
        HTTPException: 401 when ``request.state`` has no ``auth`` attribute
            or the stored value is falsy.
    """
    auth_context = getattr(request.state, "auth", None)
    if auth_context:
        # Either role is sufficient; the auth context performs the check.
        # NOTE(review): presumably raises when neither role is held - confirm.
        accepted_roles = ["admin", "reviewer"]
        auth_context.require_any_role(accepted_roles)
        return

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
    )
def get_current_tenant(request: Request) -> str | None:
    """Resolve the tenant ID for the current request.

    Resolution order:

    1. No (truthy) ``request.state.user`` -> ``None``.
    2. The first entry of ``request.state.roles`` that starts with
       ``"tenant:"`` -> everything after that prefix.
    3. A non-empty ``X-Tenant-ID`` request header.
    4. The literal ``"default"`` (development fallback).

    Args:
        request: Incoming request; ``request.state`` and ``request.headers``
            are read from it.

    Returns:
        The tenant ID, or ``None`` when no user is attached to the request.
    """
    if not getattr(request.state, "user", None):
        return None

    # Simple tenant extraction from role names. ``roles`` may be missing or
    # explicitly set to None; treat both as "no roles" instead of crashing
    # while iterating.
    roles = getattr(request.state, "roles", None) or []
    for role in roles:
        # Skip non-string entries rather than failing on ``.startswith``.
        if isinstance(role, str) and role.startswith("tenant:"):
            return role.split(":", 1)[1]

    # Explicit tenant header (useful for testing/API keys).
    # NOTE(review): this value comes straight from the request; confirm that
    # an upstream gateway/middleware prevents clients from choosing a tenant.
    tenant_header = request.headers.get("X-Tenant-ID")
    if tenant_header:
        return tenant_header

    # Default tenant for development.
    return "default"
# Dependency functions for FastAPI
def get_current_user() -> Callable[[Request], dict[str, Any]]:
    """Build the FastAPI dependency that returns the current user.

    This is a factory: call it to obtain the dependency callable.

    Returns:
        A callable that takes the request and returns a dict with keys
        ``"sub"`` (``request.state.user``), ``"email"``
        (``request.state.email``, ``""`` when absent) and ``"roles"``
        (``request.state.roles``, ``[]`` when absent). The callable raises
        ``HTTPException`` (401) when ``request.state.user`` is missing or
        falsy.
    """

    def _get_current_user(request: Request) -> dict[str, Any]:
        state = request.state
        subject = getattr(state, "user", None)
        if subject:
            return {
                "sub": subject,
                "email": getattr(state, "email", ""),
                "roles": getattr(state, "roles", []),
            }

        # No user on the request state: the caller is not authenticated.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
        )

    return _get_current_user
def get_tenant_id() -> Callable[[Request], str]:
    """Build the FastAPI dependency that returns a mandatory tenant ID.

    This is a factory: call it to obtain the dependency callable.

    Returns:
        A callable that takes the request and returns the value of
        ``get_current_tenant(request)``. The callable raises
        ``HTTPException`` (400) when that value is ``None`` or empty.
    """

    def _get_tenant_id(request: Request) -> str:
        resolved = get_current_tenant(request)
        if resolved:
            return resolved

        # No usable tenant could be resolved for this request.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Tenant ID required",
        )

    return _get_tenant_id