mirror of https://github.com/kortix-ai/suna.git
48 lines
1.1 KiB
Python
48 lines
1.1 KiB
Python
|
import json
|
||
|
import os
|
||
|
from typing import Any, Optional
|
||
|
from dotenv import load_dotenv
|
||
|
|
||
|
# Load environment variables from the local ./.env file when this module is imported.
load_dotenv("./.env")
|
||
|
|
||
|
|
||
|
# Local key-value store for storing agent and thread IDs
|
||
|
class LocalKVStore:
    """Small key-value store persisted as a single JSON file.

    The whole mapping is kept in memory in ``self._data`` and rewritten to
    ``self.filename`` after every mutation (``set``, ``delete``, ``clear``).
    Values must therefore be JSON-serializable.
    """

    def __init__(self, filename: str = ".kvstore.json"):
        """Create the store and load any existing contents of *filename*.

        Args:
            filename: Path of the JSON file used for persistence.
        """
        self.filename = filename
        self._data = {}
        self._load()

    def _load(self) -> None:
        """Populate ``self._data`` from disk, falling back to an empty dict.

        Loading is best effort: a missing, unreadable, or malformed file
        yields an empty store instead of raising. A file whose top-level JSON
        value is not an object (e.g. ``[]`` or ``null``) is also treated as
        empty, because every other method assumes ``self._data`` is a dict.
        """
        self._data = {}
        if not os.path.exists(self.filename):
            return
        try:
            with open(self.filename, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception:
            # Deliberate best effort: a corrupt or unreadable store file must
            # not prevent the module from being imported.
            return
        if isinstance(loaded, dict):
            self._data = loaded

    def _save(self) -> None:
        """Write the current mapping to ``self.filename`` as indented JSON.

        Raises:
            TypeError: If a stored value is not JSON-serializable.
            OSError: If the file cannot be written.
        """
        # Serialize before opening the file: opening in "w" mode truncates it,
        # so a serialization error must surface while the old contents are
        # still intact on disk.
        payload = json.dumps(self._data, indent=2)
        with open(self.filename, "w", encoding="utf-8") as f:
            f.write(payload)

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        """Return the value stored under *key*, or *default* if absent."""
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Store *value* under *key* and persist the store to disk."""
        self._data[key] = value
        self._save()

    def delete(self, key: str) -> None:
        """Remove *key* if present and persist; do nothing if it is absent."""
        if key in self._data:
            del self._data[key]
            self._save()

    def clear(self) -> None:
        """Remove every entry and persist the now-empty store."""
        self._data = {}
        self._save()
|
||
|
|
||
|
|
||
|
# Shared module-level store instance, backed by the default ".kvstore.json" file.
kv = LocalKVStore()
|