"""Vault Transit encryption/decryption helpers.""" import base64 import hvac import structlog logger = structlog.get_logger() class VaultTransitHelper: """Helper for Vault Transit encryption/decryption""" def __init__(self, vault_client: hvac.Client, mount_point: str = "transit"): self.vault_client = vault_client self.mount_point = mount_point def encrypt_field(self, key_name: str, plaintext: str) -> str: """Encrypt a field using Vault Transit""" try: # Ensure key exists self._ensure_key_exists(key_name) # Encrypt the data response = self.vault_client.secrets.transit.encrypt_data( mount_point=self.mount_point, name=key_name, plaintext=base64.b64encode(plaintext.encode()).decode(), ) return str(response["data"]["ciphertext"]) except Exception as e: logger.error("Failed to encrypt field", key_name=key_name, error=str(e)) raise def decrypt_field(self, key_name: str, ciphertext: str) -> str: """Decrypt a field using Vault Transit""" try: response = self.vault_client.secrets.transit.decrypt_data( mount_point=self.mount_point, name=key_name, ciphertext=ciphertext ) return base64.b64decode(response["data"]["plaintext"]).decode() except Exception as e: logger.error("Failed to decrypt field", key_name=key_name, error=str(e)) raise def _ensure_key_exists(self, key_name: str) -> None: """Ensure encryption key exists in Vault""" try: self.vault_client.secrets.transit.read_key( mount_point=self.mount_point, name=key_name ) # pylint: disable-next=broad-exception-caught except Exception: # hvac.exceptions.InvalidPath # Key doesn't exist, create it self.vault_client.secrets.transit.create_key( mount_point=self.mount_point, name=key_name, key_type="aes256-gcm96" ) logger.info("Created new encryption key", key_name=key_name)