suna/backend/credentials/support/encryption.py

56 lines
2.1 KiB
Python
Raw Normal View History

2025-07-14 18:36:27 +08:00
import os
import json
import hashlib
from typing import Dict, Any, Tuple
from cryptography.fernet import Fernet
from ..protocols import Logger
class EncryptionService:
def __init__(self, logger: Logger):
self._logger = logger
self._encryption_key = self._get_or_create_encryption_key()
self._cipher = Fernet(self._encryption_key)
def _get_or_create_encryption_key(self) -> bytes:
key_env = os.getenv("MCP_CREDENTIAL_ENCRYPTION_KEY")
try:
if isinstance(key_env, str):
return key_env.encode('utf-8')
else:
return key_env
except Exception as e:
self._logger.error(f"Invalid encryption key: {e}")
self._logger.warning("Generating new encryption key for this session")
key = Fernet.generate_key()
self._logger.info(f"Generated new encryption key. Set this in your environment:")
self._logger.info(f"MCP_CREDENTIAL_ENCRYPTION_KEY={key.decode()}")
return key
def encrypt_config(self, config: Dict[str, Any]) -> Tuple[bytes, str]:
config_json = json.dumps(config, sort_keys=True)
config_bytes = config_json.encode('utf-8')
config_hash = hashlib.sha256(config_bytes).hexdigest()
encrypted_config = self._cipher.encrypt(config_bytes)
return encrypted_config, config_hash
def decrypt_config(self, encrypted_config: bytes, expected_hash: str) -> Dict[str, Any]:
try:
decrypted_bytes = self._cipher.decrypt(encrypted_config)
actual_hash = hashlib.sha256(decrypted_bytes).hexdigest()
if actual_hash != expected_hash:
raise ValueError("Credential integrity check failed")
config_json = decrypted_bytes.decode('utf-8')
return json.loads(config_json)
except Exception as e:
self._logger.error(f"Failed to decrypt credential: {e}")
raise ValueError("Failed to decrypt credential")