harkon b324ff09ef
Initial commit
2025-10-11 08:41:36 +01:00


"""Authentication headers parsing and validation."""
import structlog
from fastapi import HTTPException, Request, status
logger = structlog.get_logger()
class AuthenticationHeaders:
"""Parse and validate authentication headers from Traefik + Authentik"""
def __init__(self, request: Request):
self.request = request
self.headers = request.headers
@property
def authenticated_user(self) -> str | None:
"""Get authenticated user from headers"""
return self.headers.get("X-Authenticated-User")
@property
def authenticated_email(self) -> str | None:
"""Get authenticated email from headers"""
return self.headers.get("X-Authenticated-Email")
@property
def authenticated_groups(self) -> list[str]:
"""Get authenticated groups from headers"""
groups_header = self.headers.get("X-Authenticated-Groups", "")
return [g.strip() for g in groups_header.split(",") if g.strip()]
@property
def authorization_token(self) -> str | None:
"""Get JWT token from Authorization header"""
auth_header = self.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
return auth_header[7:]
return None
def has_role(self, role: str) -> bool:
"""Check if user has specific role"""
return role in self.authenticated_groups
def has_any_role(self, roles: list[str]) -> bool:
"""Check if user has any of the specified roles"""
return any(role in self.authenticated_groups for role in roles)
def require_role(self, role: str) -> None:
"""Require specific role or raise HTTPException"""
if not self.has_role(role):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=f"Role '{role}' required"
)
def require_any_role(self, roles: list[str]) -> None:
"""Require any of the specified roles or raise HTTPException"""
if not self.has_any_role(roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"One of roles {roles} required",
)
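
The parsing rules used by `authenticated_groups`, `authorization_token`, and `has_any_role` can be tried without FastAPI. The following is a minimal, stdlib-only sketch of that logic. The function names `parse_groups`, `extract_bearer_token`, and `has_any_role` are hypothetical helpers written for illustration and are not part of the module above.

```python
from __future__ import annotations


def parse_groups(header_value: str) -> list[str]:
    """Split a comma-separated groups header, dropping padding and empty items."""
    return [g.strip() for g in header_value.split(",") if g.strip()]


def extract_bearer_token(header_value: str) -> str | None:
    """Return the text after a case-sensitive "Bearer " prefix, else None."""
    prefix = "Bearer "
    if header_value.startswith(prefix):
        return header_value[len(prefix):]
    return None


def has_any_role(groups: list[str], roles: list[str]) -> bool:
    """True if at least one requested role appears in the user's groups."""
    return any(role in groups for role in roles)


if __name__ == "__main__":
    groups = parse_groups("admin, reviewer ,,")
    print(groups)                                   # ['admin', 'reviewer']
    print(extract_bearer_token("Bearer abc.def"))   # abc.def
    print(extract_bearer_token("Basic xyz"))        # None
    print(has_any_role(groups, ["auditor", "admin"]))  # True
```

Note that the prefix check mirrors the original class and is case-sensitive, so a header such as `bearer abc` yields `None`. An empty or missing groups header produces an empty list, which means every role check fails closed.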