suna/backend/webhooks/providers.py

322 lines
13 KiB
Python
Raw Normal View History

2025-06-17 15:51:31 +08:00
import hmac
import hashlib
import time
import json
import httpx
2025-06-17 15:51:31 +08:00
from typing import Dict, Any, Optional
from fastapi import HTTPException
from .models import SlackEventRequest, SlackWebhookPayload, TelegramUpdateRequest, TelegramWebhookPayload
2025-06-17 15:51:31 +08:00
from utils.logger import logger
class SlackWebhookProvider:
"""Handles Slack webhook events and verification."""
@staticmethod
def verify_signature(body: bytes, timestamp: str, signature: str, signing_secret: str) -> bool:
"""Verify Slack request signature."""
try:
basestring = f"v0:{timestamp}:{body.decode('utf-8')}"
my_signature = f"v0={hmac.new(signing_secret.encode(), basestring.encode(), hashlib.sha256).hexdigest()}"
return hmac.compare_digest(my_signature, signature)
except Exception as e:
logger.error(f"Error verifying Slack signature: {e}")
return False
@staticmethod
def validate_request_timing(timestamp: str, tolerance: int = 300) -> bool:
"""Validate that the request is not too old (replay attack protection)."""
try:
request_time = int(timestamp)
current_time = int(time.time())
return abs(current_time - request_time) < tolerance
except (ValueError, TypeError):
return False
@staticmethod
def process_event(event_data: SlackEventRequest) -> Optional[SlackWebhookPayload]:
"""Process Slack event and extract relevant data."""
try:
# Handle cases where type might be None (empty verification requests)
if not event_data.type:
logger.info("Slack event has no type, likely verification ping")
return None
2025-06-17 15:51:31 +08:00
if event_data.type == "url_verification":
return None
if event_data.type == "event_callback" and event_data.event:
event = event_data.event
event_type = event.get("type")
if event_type == "app_mention":
text = event.get("text", "")
user_id = event.get("user", "")
channel_id = event.get("channel", "")
timestamp = event.get("ts", "")
import re
clean_text = re.sub(r'<@[^>]+>', '', text).strip()
return SlackWebhookPayload(
text=clean_text,
user_id=user_id,
channel_id=channel_id,
team_id=event_data.team_id or "",
timestamp=timestamp,
event_type=event_type,
trigger_word="mention"
)
elif event_type == "message":
if event.get("channel_type") == "im":
text = event.get("text", "")
user_id = event.get("user", "")
channel_id = event.get("channel", "")
timestamp = event.get("ts", "")
return SlackWebhookPayload(
text=text,
user_id=user_id,
channel_id=channel_id,
team_id=event_data.team_id or "",
timestamp=timestamp,
event_type=event_type,
trigger_word="direct_message"
)
logger.warning(f"Unhandled Slack event type: {event_data.type}")
return None
except Exception as e:
logger.error(f"Error processing Slack event: {e}")
raise HTTPException(status_code=400, detail=f"Error processing Slack event: {str(e)}")
class TelegramWebhookProvider:
"""Handles Telegram webhook events and verification."""
@staticmethod
def verify_webhook_secret(body: bytes, secret_token: str, telegram_secret_token: str) -> bool:
"""Verify Telegram webhook secret token."""
try:
return secret_token == telegram_secret_token
except Exception as e:
logger.error(f"Error verifying Telegram secret token: {e}")
return False
@staticmethod
async def setup_webhook(bot_token: str, webhook_url: str, secret_token: Optional[str] = None) -> Dict[str, Any]:
"""
Automatically set up the Telegram webhook by calling the Telegram Bot API.
Args:
bot_token: The Telegram bot token
webhook_url: The webhook URL to set
secret_token: Optional secret token for additional security
Returns:
Dict containing the API response
"""
try:
telegram_api_url = f"https://api.telegram.org/bot{bot_token}/setWebhook"
payload = {
"url": webhook_url,
"drop_pending_updates": True # Clear any pending updates
}
if secret_token:
payload["secret_token"] = secret_token
logger.info(f"Setting up Telegram webhook: {webhook_url}")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(telegram_api_url, json=payload)
response_data = response.json()
if response.status_code == 200 and response_data.get("ok"):
logger.info(f"Successfully set up Telegram webhook: {response_data.get('description', 'Webhook set')}")
return {
"success": True,
"message": response_data.get("description", "Webhook set successfully"),
"response": response_data
}
else:
error_msg = response_data.get("description", f"HTTP {response.status_code}")
logger.error(f"Failed to set up Telegram webhook: {error_msg}")
return {
"success": False,
"error": error_msg,
"response": response_data
}
except httpx.TimeoutException:
error_msg = "Timeout while connecting to Telegram API"
logger.error(f"Telegram webhook setup failed: {error_msg}")
return {
"success": False,
"error": error_msg
}
except Exception as e:
error_msg = f"Error setting up Telegram webhook: {str(e)}"
logger.error(error_msg)
return {
"success": False,
"error": error_msg
}
@staticmethod
async def remove_webhook(bot_token: str) -> Dict[str, Any]:
"""
Remove the Telegram webhook by calling the Telegram Bot API.
Args:
bot_token: The Telegram bot token
Returns:
Dict containing the API response
"""
try:
telegram_api_url = f"https://api.telegram.org/bot{bot_token}/deleteWebhook"
payload = {
"drop_pending_updates": True # Clear any pending updates
}
logger.info("Removing Telegram webhook")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(telegram_api_url, json=payload)
response_data = response.json()
if response.status_code == 200 and response_data.get("ok"):
logger.info(f"Successfully removed Telegram webhook: {response_data.get('description', 'Webhook removed')}")
return {
"success": True,
"message": response_data.get("description", "Webhook removed successfully"),
"response": response_data
}
else:
error_msg = response_data.get("description", f"HTTP {response.status_code}")
logger.error(f"Failed to remove Telegram webhook: {error_msg}")
return {
"success": False,
"error": error_msg,
"response": response_data
}
except httpx.TimeoutException:
error_msg = "Timeout while connecting to Telegram API"
logger.error(f"Telegram webhook removal failed: {error_msg}")
return {
"success": False,
"error": error_msg
}
except Exception as e:
error_msg = f"Error removing Telegram webhook: {str(e)}"
logger.error(error_msg)
return {
"success": False,
"error": error_msg
}
@staticmethod
def process_update(update_data: TelegramUpdateRequest) -> Optional[TelegramWebhookPayload]:
"""Process Telegram update and extract relevant data."""
try:
# Handle regular messages
if update_data.message:
message = update_data.message
text = message.get("text", "")
# Skip if no text content
if not text:
logger.info("Telegram message has no text content")
return None
user = message.get("from", {})
chat = message.get("chat", {})
return TelegramWebhookPayload(
text=text,
user_id=str(user.get("id", "")),
chat_id=str(chat.get("id", "")),
message_id=message.get("message_id", 0),
timestamp=message.get("date", 0),
update_type="message",
user_first_name=user.get("first_name"),
user_last_name=user.get("last_name"),
user_username=user.get("username"),
chat_type=chat.get("type"),
chat_title=chat.get("title")
)
# Handle edited messages
elif update_data.edited_message:
message = update_data.edited_message
text = message.get("text", "")
if not text:
logger.info("Telegram edited message has no text content")
return None
user = message.get("from", {})
chat = message.get("chat", {})
return TelegramWebhookPayload(
text=text,
user_id=str(user.get("id", "")),
chat_id=str(chat.get("id", "")),
message_id=message.get("message_id", 0),
timestamp=message.get("edit_date", message.get("date", 0)),
update_type="edited_message",
user_first_name=user.get("first_name"),
user_last_name=user.get("last_name"),
user_username=user.get("username"),
chat_type=chat.get("type"),
chat_title=chat.get("title")
)
# Handle callback queries (inline keyboard button presses)
elif update_data.callback_query:
callback = update_data.callback_query
data = callback.get("data", "")
if not data:
logger.info("Telegram callback query has no data")
return None
user = callback.get("from", {})
message = callback.get("message", {})
chat = message.get("chat", {}) if message else {}
return TelegramWebhookPayload(
text=f"Callback: {data}",
user_id=str(user.get("id", "")),
chat_id=str(chat.get("id", "")),
message_id=message.get("message_id", 0) if message else 0,
timestamp=int(time.time()),
update_type="callback_query",
user_first_name=user.get("first_name"),
user_last_name=user.get("last_name"),
user_username=user.get("username"),
chat_type=chat.get("type"),
chat_title=chat.get("title")
)
logger.warning(f"Unhandled Telegram update type: {update_data.dict()}")
return None
except Exception as e:
logger.error(f"Error processing Telegram update: {e}")
raise HTTPException(status_code=400, detail=f"Error processing Telegram update: {str(e)}")
2025-06-17 15:51:31 +08:00
class GenericWebhookProvider:
"""Handles generic webhook events."""
@staticmethod
def process_payload(data: Dict[str, Any]) -> Dict[str, Any]:
"""Process generic webhook payload."""
return {
"payload": data,
"trigger_type": "webhook",
"processed_at": time.time()
}