Some checks failed
CI/CD Pipeline / Code Quality & Linting (push) Has been cancelled
CI/CD Pipeline / Policy Validation (push) Has been cancelled
CI/CD Pipeline / Test Suite (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-firm-connectors) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-forms) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-hmrc) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ingestion) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-normalize-map) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-ocr) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-indexer) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-reason) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (svc-rpa) (push) Has been cancelled
CI/CD Pipeline / Build Docker Images (ui-review) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-coverage) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-extract) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-kg) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (svc-rag-retriever) (push) Has been cancelled
CI/CD Pipeline / Security Scanning (ui-review) (push) Has been cancelled
CI/CD Pipeline / Generate SBOM (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / Notifications (push) Has been cancelled
62 lines
2.1 KiB
Python
62 lines
2.1 KiB
Python
"""Authentication headers parsing and validation."""
|
|
|
|
import structlog
|
|
from fastapi import HTTPException, Request, status
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
class AuthenticationHeaders:
|
|
"""Parse and validate authentication headers from Traefik + Authentik"""
|
|
|
|
def __init__(self, request: Request):
|
|
self.request = request
|
|
self.headers = request.headers
|
|
|
|
@property
|
|
def authenticated_user(self) -> str | None:
|
|
"""Get authenticated user from headers"""
|
|
return self.headers.get("X-Authenticated-User")
|
|
|
|
@property
|
|
def authenticated_email(self) -> str | None:
|
|
"""Get authenticated email from headers"""
|
|
return self.headers.get("X-Authenticated-Email")
|
|
|
|
@property
|
|
def authenticated_groups(self) -> list[str]:
|
|
"""Get authenticated groups from headers"""
|
|
groups_header = self.headers.get("X-Authenticated-Groups", "")
|
|
return [g.strip() for g in groups_header.split(",") if g.strip()]
|
|
|
|
@property
|
|
def authorization_token(self) -> str | None:
|
|
"""Get JWT token from Authorization header"""
|
|
auth_header = self.headers.get("Authorization", "")
|
|
if auth_header.startswith("Bearer "):
|
|
return auth_header[7:]
|
|
return None
|
|
|
|
def has_role(self, role: str) -> bool:
|
|
"""Check if user has specific role"""
|
|
return role in self.authenticated_groups
|
|
|
|
def has_any_role(self, roles: list[str]) -> bool:
|
|
"""Check if user has any of the specified roles"""
|
|
return any(role in self.authenticated_groups for role in roles)
|
|
|
|
def require_role(self, role: str) -> None:
|
|
"""Require specific role or raise HTTPException"""
|
|
if not self.has_role(role):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN, detail=f"Role '{role}' required"
|
|
)
|
|
|
|
def require_any_role(self, roles: list[str]) -> None:
|
|
"""Require any of the specified roles or raise HTTPException"""
|
|
if not self.has_any_role(roles):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"One of roles {roles} required",
|
|
)
|