suna/backend/webhooks/providers.py

322 lines
13 KiB
Python

import hmac
import hashlib
import time
import json
import httpx
from typing import Dict, Any, Optional
from fastapi import HTTPException
from .models import SlackEventRequest, SlackWebhookPayload, TelegramUpdateRequest, TelegramWebhookPayload
from utils.logger import logger
class SlackWebhookProvider:
"""Handles Slack webhook events and verification."""
@staticmethod
def verify_signature(body: bytes, timestamp: str, signature: str, signing_secret: str) -> bool:
"""Verify Slack request signature."""
try:
basestring = f"v0:{timestamp}:{body.decode('utf-8')}"
my_signature = f"v0={hmac.new(signing_secret.encode(), basestring.encode(), hashlib.sha256).hexdigest()}"
return hmac.compare_digest(my_signature, signature)
except Exception as e:
logger.error(f"Error verifying Slack signature: {e}")
return False
@staticmethod
def validate_request_timing(timestamp: str, tolerance: int = 300) -> bool:
"""Validate that the request is not too old (replay attack protection)."""
try:
request_time = int(timestamp)
current_time = int(time.time())
return abs(current_time - request_time) < tolerance
except (ValueError, TypeError):
return False
@staticmethod
def process_event(event_data: SlackEventRequest) -> Optional[SlackWebhookPayload]:
"""Process Slack event and extract relevant data."""
try:
# Handle cases where type might be None (empty verification requests)
if not event_data.type:
logger.info("Slack event has no type, likely verification ping")
return None
if event_data.type == "url_verification":
return None
if event_data.type == "event_callback" and event_data.event:
event = event_data.event
event_type = event.get("type")
if event_type == "app_mention":
text = event.get("text", "")
user_id = event.get("user", "")
channel_id = event.get("channel", "")
timestamp = event.get("ts", "")
import re
clean_text = re.sub(r'<@[^>]+>', '', text).strip()
return SlackWebhookPayload(
text=clean_text,
user_id=user_id,
channel_id=channel_id,
team_id=event_data.team_id or "",
timestamp=timestamp,
event_type=event_type,
trigger_word="mention"
)
elif event_type == "message":
if event.get("channel_type") == "im":
text = event.get("text", "")
user_id = event.get("user", "")
channel_id = event.get("channel", "")
timestamp = event.get("ts", "")
return SlackWebhookPayload(
text=text,
user_id=user_id,
channel_id=channel_id,
team_id=event_data.team_id or "",
timestamp=timestamp,
event_type=event_type,
trigger_word="direct_message"
)
logger.warning(f"Unhandled Slack event type: {event_data.type}")
return None
except Exception as e:
logger.error(f"Error processing Slack event: {e}")
raise HTTPException(status_code=400, detail=f"Error processing Slack event: {str(e)}")
class TelegramWebhookProvider:
"""Handles Telegram webhook events and verification."""
@staticmethod
def verify_webhook_secret(body: bytes, secret_token: str, telegram_secret_token: str) -> bool:
"""Verify Telegram webhook secret token."""
try:
return secret_token == telegram_secret_token
except Exception as e:
logger.error(f"Error verifying Telegram secret token: {e}")
return False
@staticmethod
async def setup_webhook(bot_token: str, webhook_url: str, secret_token: Optional[str] = None) -> Dict[str, Any]:
"""
Automatically set up the Telegram webhook by calling the Telegram Bot API.
Args:
bot_token: The Telegram bot token
webhook_url: The webhook URL to set
secret_token: Optional secret token for additional security
Returns:
Dict containing the API response
"""
try:
telegram_api_url = f"https://api.telegram.org/bot{bot_token}/setWebhook"
payload = {
"url": webhook_url,
"drop_pending_updates": True # Clear any pending updates
}
if secret_token:
payload["secret_token"] = secret_token
logger.info(f"Setting up Telegram webhook: {webhook_url}")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(telegram_api_url, json=payload)
response_data = response.json()
if response.status_code == 200 and response_data.get("ok"):
logger.info(f"Successfully set up Telegram webhook: {response_data.get('description', 'Webhook set')}")
return {
"success": True,
"message": response_data.get("description", "Webhook set successfully"),
"response": response_data
}
else:
error_msg = response_data.get("description", f"HTTP {response.status_code}")
logger.error(f"Failed to set up Telegram webhook: {error_msg}")
return {
"success": False,
"error": error_msg,
"response": response_data
}
except httpx.TimeoutException:
error_msg = "Timeout while connecting to Telegram API"
logger.error(f"Telegram webhook setup failed: {error_msg}")
return {
"success": False,
"error": error_msg
}
except Exception as e:
error_msg = f"Error setting up Telegram webhook: {str(e)}"
logger.error(error_msg)
return {
"success": False,
"error": error_msg
}
@staticmethod
async def remove_webhook(bot_token: str) -> Dict[str, Any]:
"""
Remove the Telegram webhook by calling the Telegram Bot API.
Args:
bot_token: The Telegram bot token
Returns:
Dict containing the API response
"""
try:
telegram_api_url = f"https://api.telegram.org/bot{bot_token}/deleteWebhook"
payload = {
"drop_pending_updates": True # Clear any pending updates
}
logger.info("Removing Telegram webhook")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(telegram_api_url, json=payload)
response_data = response.json()
if response.status_code == 200 and response_data.get("ok"):
logger.info(f"Successfully removed Telegram webhook: {response_data.get('description', 'Webhook removed')}")
return {
"success": True,
"message": response_data.get("description", "Webhook removed successfully"),
"response": response_data
}
else:
error_msg = response_data.get("description", f"HTTP {response.status_code}")
logger.error(f"Failed to remove Telegram webhook: {error_msg}")
return {
"success": False,
"error": error_msg,
"response": response_data
}
except httpx.TimeoutException:
error_msg = "Timeout while connecting to Telegram API"
logger.error(f"Telegram webhook removal failed: {error_msg}")
return {
"success": False,
"error": error_msg
}
except Exception as e:
error_msg = f"Error removing Telegram webhook: {str(e)}"
logger.error(error_msg)
return {
"success": False,
"error": error_msg
}
@staticmethod
def process_update(update_data: TelegramUpdateRequest) -> Optional[TelegramWebhookPayload]:
"""Process Telegram update and extract relevant data."""
try:
# Handle regular messages
if update_data.message:
message = update_data.message
text = message.get("text", "")
# Skip if no text content
if not text:
logger.info("Telegram message has no text content")
return None
user = message.get("from", {})
chat = message.get("chat", {})
return TelegramWebhookPayload(
text=text,
user_id=str(user.get("id", "")),
chat_id=str(chat.get("id", "")),
message_id=message.get("message_id", 0),
timestamp=message.get("date", 0),
update_type="message",
user_first_name=user.get("first_name"),
user_last_name=user.get("last_name"),
user_username=user.get("username"),
chat_type=chat.get("type"),
chat_title=chat.get("title")
)
# Handle edited messages
elif update_data.edited_message:
message = update_data.edited_message
text = message.get("text", "")
if not text:
logger.info("Telegram edited message has no text content")
return None
user = message.get("from", {})
chat = message.get("chat", {})
return TelegramWebhookPayload(
text=text,
user_id=str(user.get("id", "")),
chat_id=str(chat.get("id", "")),
message_id=message.get("message_id", 0),
timestamp=message.get("edit_date", message.get("date", 0)),
update_type="edited_message",
user_first_name=user.get("first_name"),
user_last_name=user.get("last_name"),
user_username=user.get("username"),
chat_type=chat.get("type"),
chat_title=chat.get("title")
)
# Handle callback queries (inline keyboard button presses)
elif update_data.callback_query:
callback = update_data.callback_query
data = callback.get("data", "")
if not data:
logger.info("Telegram callback query has no data")
return None
user = callback.get("from", {})
message = callback.get("message", {})
chat = message.get("chat", {}) if message else {}
return TelegramWebhookPayload(
text=f"Callback: {data}",
user_id=str(user.get("id", "")),
chat_id=str(chat.get("id", "")),
message_id=message.get("message_id", 0) if message else 0,
timestamp=int(time.time()),
update_type="callback_query",
user_first_name=user.get("first_name"),
user_last_name=user.get("last_name"),
user_username=user.get("username"),
chat_type=chat.get("type"),
chat_title=chat.get("title")
)
logger.warning(f"Unhandled Telegram update type: {update_data.dict()}")
return None
except Exception as e:
logger.error(f"Error processing Telegram update: {e}")
raise HTTPException(status_code=400, detail=f"Error processing Telegram update: {str(e)}")
class GenericWebhookProvider:
"""Handles generic webhook events."""
@staticmethod
def process_payload(data: Dict[str, Any]) -> Dict[str, Any]:
"""Process generic webhook payload."""
return {
"payload": data,
"trigger_type": "webhook",
"processed_at": time.time()
}