mirror of https://github.com/kortix-ai/suna.git
107 lines
3.9 KiB
Python
107 lines
3.9 KiB
Python
import os
import time
from typing import Dict, Any, Optional

import httpx

from ..domain.exceptions import HttpClientException, AuthenticationException, RateLimitException
|
|
|
|
|
|
class HttpClient:
    """Async HTTP client for the Pipedream API.

    The client lazily creates one shared ``httpx.AsyncClient``, obtains an
    OAuth client-credentials access token on first use, and sends that token
    as a bearer ``Authorization`` header on every ``get``/``post`` call.

    Credentials are read from the ``PIPEDREAM_PROJECT_ID``,
    ``PIPEDREAM_CLIENT_ID`` and ``PIPEDREAM_CLIENT_SECRET`` environment
    variables. When ``rate_limit_token`` is set by the caller it is sent as
    the ``x-pd-rate-limit`` header.
    """

    # Seconds shaved off the advertised token lifetime so the token is
    # refreshed shortly before the server would start rejecting it.
    _TOKEN_EXPIRY_MARGIN = 60.0

    def __init__(self):
        self.base_url = "https://api.pipedream.com/v1"
        self.session: Optional[httpx.AsyncClient] = None
        self.access_token: Optional[str] = None
        self.rate_limit_token: Optional[str] = None
        # time.monotonic() deadline after which access_token is considered
        # stale; None means "no known expiry" (token is reused indefinitely).
        self._token_expires_at: Optional[float] = None

    async def _get_session(self) -> "httpx.AsyncClient":
        """Return the shared HTTP session, creating it if missing or closed."""
        if self.session is None or self.session.is_closed:
            self.session = httpx.AsyncClient(
                timeout=httpx.Timeout(30.0),
                headers={"User-Agent": "Suna-Pipedream-Client/1.0"},
            )
        return self.session

    def _has_valid_token(self) -> bool:
        """Return True when a cached token exists and has not passed its deadline."""
        if not self.access_token:
            return False
        deadline = self._token_expires_at
        return deadline is None or time.monotonic() < deadline

    def _expiry_deadline(self, expires_in: Any) -> Optional[float]:
        """Convert an ``expires_in`` value (seconds) into a monotonic deadline.

        Returns ``None`` when the value is absent or not numeric, in which
        case the token is treated as having no known expiry.
        """
        try:
            lifetime = float(expires_in)
        except (TypeError, ValueError):
            return None
        return time.monotonic() + max(lifetime - self._TOKEN_EXPIRY_MARGIN, 0.0)

    def _invalidate_token(self) -> None:
        """Forget the cached token so the next request fetches a new one."""
        self.access_token = None
        self._token_expires_at = None

    async def _ensure_access_token(self) -> str:
        """Return a usable access token, requesting a new one when needed.

        Raises:
            AuthenticationException: if a required environment variable is
                missing, the token endpoint returns an error status, or the
                response carries no ``access_token``.
            RateLimitException: if the token endpoint answers with HTTP 429.
        """
        if self._has_valid_token():
            return self.access_token

        # project_id is not part of the token request itself, but it has
        # always been required here, so a missing value still fails fast.
        project_id = os.getenv("PIPEDREAM_PROJECT_ID")
        client_id = os.getenv("PIPEDREAM_CLIENT_ID")
        client_secret = os.getenv("PIPEDREAM_CLIENT_SECRET")

        if not all([project_id, client_id, client_secret]):
            raise AuthenticationException("Missing required environment variables")

        session = await self._get_session()

        try:
            response = await session.post(
                f"{self.base_url}/oauth/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": client_id,
                    "client_secret": client_secret,
                },
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                raise RateLimitException() from e
            raise AuthenticationException(f"Failed to obtain access token: {e}") from e

        data = response.json()
        token = data.get("access_token") if isinstance(data, dict) else None
        if not token:
            # Surface a malformed token response as an auth failure rather
            # than leaking a bare KeyError/TypeError to callers.
            raise AuthenticationException(
                "Failed to obtain access token: response did not include an access_token"
            )

        self.access_token = token
        self._token_expires_at = self._expiry_deadline(data.get("expires_in"))
        return token

    def _build_headers(
        self, access_token: str, headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """Assemble the headers for one authenticated request.

        Caller-supplied ``headers`` override the defaults; the rate-limit
        token, when set, is applied last and therefore wins over both.
        """
        request_headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        if headers:
            request_headers.update(headers)
        if self.rate_limit_token:
            request_headers["x-pd-rate-limit"] = self.rate_limit_token
        return request_headers

    async def _request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Send one authenticated request and return the decoded JSON body.

        Shared implementation behind ``get`` and ``post``; ``kwargs`` are
        forwarded to ``httpx.AsyncClient.request`` (``params``, ``json``).
        """
        session = await self._get_session()
        access_token = await self._ensure_access_token()
        request_headers = self._build_headers(access_token, headers)

        try:
            response = await session.request(
                method, url, headers=request_headers, **kwargs
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            if status_code == 429:
                raise RateLimitException() from e
            if status_code == 401:
                # The cached token was rejected; drop it so the next call
                # re-authenticates instead of failing repeatedly.
                self._invalidate_token()
            raise HttpClientException(url, status_code, str(e)) from e

        return response.json()

    async def get(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Issue an authenticated GET to ``url`` and return the JSON response.

        ``url`` is used as given; ``base_url`` is not prepended.

        Raises:
            RateLimitException: on HTTP 429.
            HttpClientException: on any other error status.
            AuthenticationException: if no access token can be obtained.
        """
        return await self._request("GET", url, headers, params=params)

    async def post(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Issue an authenticated POST with a JSON body and return the JSON response.

        ``url`` is used as given; ``base_url`` is not prepended.

        Raises:
            RateLimitException: on HTTP 429.
            HttpClientException: on any other error status.
            AuthenticationException: if no access token can be obtained.
        """
        return await self._request("POST", url, headers, json=json)

    async def close(self) -> None:
        """Close the underlying HTTP session if one is open."""
        if self.session and not self.session.is_closed:
            await self.session.aclose()