mirror of https://github.com/kortix-ai/suna.git
56 lines
2.1 KiB
Python
56 lines
2.1 KiB
Python
import os
|
|
import json
|
|
import hashlib
|
|
from typing import Dict, Any, Tuple
|
|
|
|
from cryptography.fernet import Fernet
|
|
|
|
from ..protocols import Logger
|
|
|
|
|
|
class EncryptionService:
|
|
def __init__(self, logger: Logger):
|
|
self._logger = logger
|
|
self._encryption_key = self._get_or_create_encryption_key()
|
|
self._cipher = Fernet(self._encryption_key)
|
|
|
|
def _get_or_create_encryption_key(self) -> bytes:
|
|
key_env = os.getenv("MCP_CREDENTIAL_ENCRYPTION_KEY")
|
|
|
|
try:
|
|
if isinstance(key_env, str):
|
|
return key_env.encode('utf-8')
|
|
else:
|
|
return key_env
|
|
|
|
except Exception as e:
|
|
self._logger.error(f"Invalid encryption key: {e}")
|
|
self._logger.warning("Generating new encryption key for this session")
|
|
key = Fernet.generate_key()
|
|
self._logger.info(f"Generated new encryption key. Set this in your environment:")
|
|
self._logger.info(f"MCP_CREDENTIAL_ENCRYPTION_KEY={key.decode()}")
|
|
return key
|
|
|
|
def encrypt_config(self, config: Dict[str, Any]) -> Tuple[bytes, str]:
|
|
config_json = json.dumps(config, sort_keys=True)
|
|
config_bytes = config_json.encode('utf-8')
|
|
|
|
config_hash = hashlib.sha256(config_bytes).hexdigest()
|
|
encrypted_config = self._cipher.encrypt(config_bytes)
|
|
|
|
return encrypted_config, config_hash
|
|
|
|
def decrypt_config(self, encrypted_config: bytes, expected_hash: str) -> Dict[str, Any]:
|
|
try:
|
|
decrypted_bytes = self._cipher.decrypt(encrypted_config)
|
|
|
|
actual_hash = hashlib.sha256(decrypted_bytes).hexdigest()
|
|
if actual_hash != expected_hash:
|
|
raise ValueError("Credential integrity check failed")
|
|
|
|
config_json = decrypted_bytes.decode('utf-8')
|
|
return json.loads(config_json)
|
|
|
|
except Exception as e:
|
|
self._logger.error(f"Failed to decrypt credential: {e}")
|
|
raise ValueError("Failed to decrypt credential") |