import asyncio
from agentpress.tool import ToolResult, openapi_schema, usage_example
from sandbox.tool_base import SandboxToolsBase
from agentpress.thread_manager import ThreadManager
from typing import List, Dict, Optional, Any
import json
import base64
import io
from datetime import datetime
from pptx import Presentation
from pptx.util import Inches, Pt
from pptx.dml.color import RGBColor
from pptx.enum.text import PP_ALIGN, MSO_ANCHOR
from pptx.enum.shapes import MSO_SHAPE
import tempfile
import os
import hashlib
from PIL import Image
import re
import asyncio
import random
import httpx
class SandboxPresentationToolV2(SandboxToolsBase):
def __init__(self, project_id: str, thread_manager: ThreadManager):
super().__init__(project_id, thread_manager)
self.presentations_dir = "presentations"
self.images_cache = {}
async def _ensure_presentations_dir(self):
full_path = f"{self.workspace_path}/{self.presentations_dir}"
try:
await self.sandbox.fs.create_folder(full_path, "755")
except:
pass
def _get_display_url(self, image_url: str) -> str:
if not image_url:
return ""
if image_url.startswith("unsplash:"):
keyword = image_url.replace("unsplash:", "").strip()
return f"https://source.unsplash.com/1920x1080/?{keyword}"
return image_url
async def _download_and_cache_image(self, image_url: str, presentation_dir: str) -> Optional[str]:
"""Download an image and cache it locally. Returns the relative path from workspace root."""
if not image_url:
return None
# Check if already cached
if image_url in self.images_cache:
return self.images_cache[image_url]
try:
download_url = self._get_display_url(image_url)
# Generate unique filename based on URL
url_hash = hashlib.md5(image_url.encode()).hexdigest()[:8]
images_dir = f"{presentation_dir}/images"
full_images_dir = f"{self.workspace_path}/{images_dir}"
# Ensure images directory exists
try:
await self.sandbox.fs.create_folder(full_images_dir, "755")
except:
pass
image_data: bytes | None = None
# Try to download the image
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=20.0) as client:
resp = await client.get(download_url, headers=headers)
resp.raise_for_status()
image_data = resp.content
except Exception as e:
# Fallback to curl if httpx fails
try:
tmp_path = f"/tmp/img_{url_hash}"
cmd = f"/bin/sh -c 'curl -fsSL -A \"Mozilla/5.0\" \"{download_url}\" -o {tmp_path}'"
res = await self.sandbox.process.exec(cmd, timeout=30)
if getattr(res, "exit_code", 1) == 0:
image_data = await self.sandbox.fs.download_file(tmp_path)
try:
await self.sandbox.process.exec(f"/bin/sh -c 'rm -f {tmp_path}'", timeout=10)
except:
pass
except Exception:
image_data = None
if not image_data:
print(f"Failed to download image from {download_url}")
return None
# Process and save the image
try:
img = Image.open(io.BytesIO(image_data))
output = io.BytesIO()
# Convert to RGB if necessary
if img.mode in ("RGBA", "LA", "P"):
background = Image.new("RGB", img.size, (255, 255, 255))
if img.mode == "P":
img = img.convert("RGBA")
background.paste(img, mask=img.split()[-1] if img.mode in ("RGBA", "LA") else None)
img = background
else:
img = img.convert("RGB")
img.save(output, format="JPEG", quality=90)
image_data = output.getvalue()
filename = f"img_{url_hash}.jpg"
except Exception:
# If image processing fails, save as-is
filename = f"img_{url_hash}.jpg"
# Save the image
image_path = f"{images_dir}/{filename}"
full_image_path = f"{self.workspace_path}/{image_path}"
await self.sandbox.fs.upload_file(image_data, full_image_path)
# Cache and return the relative path
self.images_cache[image_url] = image_path
return image_path
except Exception as e:
print(f"Failed to download image {image_url}: {e}")
return None
def _safe_name_variants(self, name: str) -> List[str]:
"""Generate safe name variants for file/folder naming."""
base = "".join(c if c.isalnum() or c in "-_" else '-' for c in name).lower()
while "--" in base:
base = base.replace("--", "-")
while "__" in base:
base = base.replace("__", "_")
hyphen = base.replace("_", "-")
underscore = base.replace("-", "_")
variants = []
for v in [base, hyphen, underscore]:
if v not in variants:
variants.append(v)
return variants
async def _process_slide_images(self, slide: Dict, presentation_dir: str) -> Dict:
"""Process all images in a slide and download them if needed."""
content = slide.get("content", {})
# Process single image
if "image" in content:
image_info = content["image"]
if isinstance(image_info, str):
# Convert string to dict format
local_path = await self._download_and_cache_image(image_info, presentation_dir)
if local_path:
content["image"] = {
"url": image_info,
"local_path": local_path
}
elif isinstance(image_info, dict) and "url" in image_info:
# Download if not already downloaded
if "local_path" not in image_info:
local_path = await self._download_and_cache_image(
image_info["url"],
presentation_dir
)
if local_path:
image_info["local_path"] = local_path
# Process image grid
if "images" in content:
processed_images = []
for img in content["images"]:
if isinstance(img, str):
local_path = await self._download_and_cache_image(img, presentation_dir)
if local_path:
processed_images.append({
"url": img,
"local_path": local_path
})
else:
processed_images.append({"url": img})
elif isinstance(img, dict):
if "url" in img and "local_path" not in img:
local_path = await self._download_and_cache_image(
img["url"],
presentation_dir
)
if local_path:
img["local_path"] = local_path
processed_images.append(img)
content["images"] = processed_images
return slide
@openapi_schema({
"type": "function",
"function": {
"name": "create_presentation",
"description": """
Create a professional presentation using structured JSON format.
The presentation will be saved as JSON and can be previewed in the browser or exported to PPTX.
IMPORTANT: Generate a structured JSON with specific layout types. Each slide must have:
- layout: One of 'title', 'title-bullets', 'title-content', 'two-column', 'image-text',
'quote', 'section', 'blank', 'hero-image', 'image-grid', 'comparison', 'timeline', 'stats'
- content: Object with fields specific to that layout type
Images can be specified as:
- URLs (will be downloaded and embedded)
- Unsplash search terms using "unsplash:keyword" format
- Local file paths in the workspace
The tool will handle all formatting and ensure consistency across preview and export.
THEME SELECTION: Instead of a fixed theme, choose a theme that best fits the content:
- 'corporate-blue': Professional blue theme for business presentations
- 'modern-purple': Modern purple/pink gradient for tech/startup
- 'minimal-mono': Black and white minimalist design
- 'ocean-teal': Calming teal and aqua colors
- 'sunset-warm': Warm orange and red tones
- 'forest-green': Natural green palette
- 'midnight-dark': Dark mode with bright accents
- 'pastel-soft': Soft pastel colors
- 'bold-contrast': High contrast bold colors
- 'elegant-gold': Sophisticated gold and navy
Select the theme that best matches the presentation's content and purpose.
""",
"parameters": {
"type": "object",
"properties": {
"presentation_name": {
"type": "string",
"description": "Name for the presentation (used for file naming)"
},
"title": {
"type": "string",
"description": "Main title of the presentation"
},
"subtitle": {
"type": "string",
"description": "Optional subtitle or tagline"
},
"theme": {
"type": "string",
"enum": ["corporate-blue", "modern-purple", "minimal-mono", "ocean-teal",
"sunset-warm", "forest-green", "midnight-dark", "pastel-soft",
"bold-contrast", "elegant-gold"],
"description": "Visual theme - choose based on content and purpose"
},
"slides": {
"type": "array",
"description": "Array of slide objects with layout and content",
"items": {
"type": "object",
"properties": {
"layout": {
"type": "string",
"enum": ["title", "title-bullets", "title-content", "two-column", "image-text",
"quote", "section", "blank", "hero-image", "image-grid",
"comparison", "timeline", "stats"],
"description": "Layout type for the slide"
},
"content": {
"type": "object",
"description": "Content object specific to the layout type",
"properties": {
"title": {"type": "string"},
"subtitle": {"type": "string"},
"bullets": {"type": "array", "items": {"type": "string"}},
"text": {"type": "string"},
"left_content": {"type": "object"},
"right_content": {"type": "object"},
"image": {
"type": "object",
"properties": {
"url": {"type": "string"},
"alt": {"type": "string"},
"position": {"type": "string", "enum": ["left", "right", "center", "background"]}
}
},
"quote": {"type": "string"},
"author": {"type": "string"},
"notes": {"type": "string"}
}
}
},
"required": ["layout", "content"]
}
}
},
"required": ["presentation_name", "title", "slides"]
}
}
})
@usage_example('''
company_overviewCompany Overview 2024Innovation Through Technologymodern-purple[
{
"layout": "hero-image",
"content": {
"title": "Welcome to the Future",
"subtitle": "Where Innovation Meets Excellence",
"image": {
"url": "unsplash:technology office",
"position": "background"
}
}
},
{
"layout": "stats",
"content": {
"title": "Our Impact in Numbers",
"stats": [
{"value": "50K+", "label": "Active Users"},
{"value": "$2.5M", "label": "Revenue"},
{"value": "98%", "label": "Satisfaction"},
{"value": "15", "label": "Countries"}
]
}
},
{
"layout": "image-text",
"content": {
"title": "Our Mission",
"text": "We empower businesses with cutting-edge AI technology to transform their operations, enhance productivity, and unlock new possibilities. Our platform combines powerful automation with human creativity.",
"image": {
"url": "unsplash:artificial intelligence",
"position": "right"
}
}
},
{
"layout": "image-grid",
"content": {
"title": "Our Products in Action",
"images": [
{"url": "unsplash:dashboard analytics", "caption": "Real-time Analytics"},
{"url": "unsplash:team collaboration", "caption": "Team Collaboration"},
{"url": "unsplash:mobile app", "caption": "Mobile Experience"},
{"url": "unsplash:data visualization", "caption": "Data Insights"}
]
}
},
{
"layout": "title-bullets",
"content": {
"title": "Key Features",
"bullets": [
"AI-powered automation for repetitive tasks",
"Real-time collaboration and communication",
"Advanced analytics and reporting",
"Enterprise-grade security and compliance",
"24/7 customer support and training"
]
}
},
{
"layout": "quote",
"content": {
"quote": "This platform has transformed how we work. We've saved 40% of our time and increased productivity by 60%.",
"author": "Sarah Johnson, CTO at TechCorp"
}
},
{
"layout": "section",
"content": {
"title": "Let's Build Together",
"subtitle": "Start Your Journey Today"
}
},
{
"layout": "title",
"content": {
"title": "Thank You",
"subtitle": "Questions? Let's Connect!",
"notes": "Contact us at hello@company.com"
}
}
]
''')
async def create_presentation(
self,
presentation_name: str,
title: str,
slides: List[Dict[str, Any]],
subtitle: Optional[str] = None,
theme: str = "corporate-blue"
) -> ToolResult:
try:
await self._ensure_sandbox()
await self._ensure_presentations_dir()
# Validate inputs
if not presentation_name:
return self.fail_response("Presentation name is required.")
if not title:
return self.fail_response("Presentation title is required.")
if not slides or not isinstance(slides, list) or len(slides) == 0:
return self.fail_response("At least one slide is required.")
# Validate slide structure
for i, slide in enumerate(slides):
if 'layout' not in slide:
return self.fail_response(f"Slide {i+1} missing 'layout' field")
if 'content' not in slide:
return self.fail_response(f"Slide {i+1} missing 'content' field")
# Create presentation directory
safe_name = self._safe_name_variants(presentation_name)[0]
presentation_dir = f"{self.presentations_dir}/{safe_name}"
full_presentation_path = f"{self.workspace_path}/{presentation_dir}"
try:
await self.sandbox.fs.create_folder(full_presentation_path, "755")
except:
pass
# Process all images in slides (download them during creation)
download_errors = []
processed_slides = []
for slide in slides:
try:
processed_slide = await self._process_slide_images(slide.copy(), presentation_dir)
processed_slides.append(processed_slide)
except Exception as e:
download_errors.append(f"Error processing slide images: {str(e)}")
processed_slides.append(slide)
# Create presentation data
presentation_data = {
"version": "2.0",
"metadata": {
"title": title,
"subtitle": subtitle,
"theme": theme,
"created_at": datetime.now().isoformat(),
"presentation_name": presentation_name,
"total_slides": len(processed_slides)
},
"slides": processed_slides,
"theme_config": self._get_theme_config(theme)
}
# Save JSON
json_filename = "presentation.json"
json_path = f"{presentation_dir}/{json_filename}"
full_json_path = f"{self.workspace_path}/{json_path}"
json_content = json.dumps(presentation_data, indent=2)
await self.sandbox.fs.upload_file(
json_content.encode('utf-8'),
full_json_path
)
# Generate HTML preview with downloaded images
preview_html = self._generate_html_preview(presentation_data)
preview_path = f"{presentation_dir}/preview.html"
full_preview_path = f"{self.workspace_path}/{preview_path}"
await self.sandbox.fs.upload_file(
preview_html.encode('utf-8'),
full_preview_path
)
result = {
"message": f"Successfully created presentation '{title}' with {theme} theme",
"presentation_name": safe_name,
"json_file": json_path,
"preview_url": f"/workspace/{preview_path}",
"total_slides": len(processed_slides),
"theme": theme,
"slides": [
{
"slide_number": i + 1,
"layout": slide["layout"],
"title": slide["content"].get("title", f"Slide {i+1}")
}
for i, slide in enumerate(processed_slides)
]
}
if download_errors:
result["warnings"] = download_errors
return self.success_response(result)
except Exception as e:
return self.fail_response(f"Failed to create presentation: {str(e)}")
def _get_theme_config(self, theme: str) -> Dict[str, Any]:
themes = {
"corporate-blue": {
"colors": {
"primary": "#1e3a8a",
"secondary": "#3b82f6",
"accent": "#f59e0b",
"background": "#ffffff",
"text": "#1e293b",
"text_light": "#64748b"
},
"fonts": {
"heading": "Arial, sans-serif",
"body": "Arial, sans-serif"
}
},
"modern-purple": {
"colors": {
"primary": "#7c3aed",
"secondary": "#ec4899",
"accent": "#06b6d4",
"background": "#ffffff",
"text": "#1e293b",
"text_light": "#64748b"
},
"fonts": {
"heading": "Inter, sans-serif",
"body": "Inter, sans-serif"
}
},
"minimal-mono": {
"colors": {
"primary": "#000000",
"secondary": "#525252",
"accent": "#000000",
"background": "#ffffff",
"text": "#000000",
"text_light": "#737373"
},
"fonts": {
"heading": "Helvetica Neue, sans-serif",
"body": "Helvetica Neue, sans-serif"
}
},
"ocean-teal": {
"colors": {
"primary": "#0d9488",
"secondary": "#06b6d4",
"accent": "#0284c7",
"background": "#ffffff",
"text": "#134e4a",
"text_light": "#5eead4"
},
"fonts": {
"heading": "Roboto, sans-serif",
"body": "Roboto, sans-serif"
}
},
"sunset-warm": {
"colors": {
"primary": "#dc2626",
"secondary": "#f97316",
"accent": "#fbbf24",
"background": "#ffffff",
"text": "#7c2d12",
"text_light": "#ea580c"
},
"fonts": {
"heading": "Poppins, sans-serif",
"body": "Open Sans, sans-serif"
}
},
"forest-green": {
"colors": {
"primary": "#14532d",
"secondary": "#16a34a",
"accent": "#84cc16",
"background": "#ffffff",
"text": "#14532d",
"text_light": "#22c55e"
},
"fonts": {
"heading": "Montserrat, sans-serif",
"body": "Source Sans Pro, sans-serif"
}
},
"midnight-dark": {
"colors": {
"primary": "#f3f4f6",
"secondary": "#60a5fa",
"accent": "#f472b6",
"background": "#111827",
"text": "#f9fafb",
"text_light": "#d1d5db"
},
"fonts": {
"heading": "Space Grotesk, sans-serif",
"body": "Inter, sans-serif"
}
},
"pastel-soft": {
"colors": {
"primary": "#c084fc",
"secondary": "#fda4af",
"accent": "#86efac",
"background": "#fef3c7",
"text": "#451a03",
"text_light": "#92400e"
},
"fonts": {
"heading": "Quicksand, sans-serif",
"body": "Nunito, sans-serif"
}
},
"bold-contrast": {
"colors": {
"primary": "#dc2626",
"secondary": "#000000",
"accent": "#fbbf24",
"background": "#ffffff",
"text": "#000000",
"text_light": "#525252"
},
"fonts": {
"heading": "Bebas Neue, sans-serif",
"body": "Roboto, sans-serif"
}
},
"elegant-gold": {
"colors": {
"primary": "#1e293b",
"secondary": "#f59e0b",
"accent": "#92400e",
"background": "#fffbeb",
"text": "#1e293b",
"text_light": "#475569"
},
"fonts": {
"heading": "Playfair Display, serif",
"body": "Lato, sans-serif"
}
}
}
return themes.get(theme, themes["corporate-blue"])
def _generate_html_preview(self, presentation_data: Dict) -> str:
theme = presentation_data["theme_config"]
colors = theme["colors"]
fonts = theme["fonts"]
is_dark = colors["background"].startswith("#1") or colors["background"].startswith("#0")
slides_html = []
for i, slide in enumerate(presentation_data["slides"]):
slide_html = self._render_slide_html(slide, i + 1, theme)
slides_html.append(slide_html)
html = f"""
{presentation_data['metadata']['title']}
{presentation_data['metadata']['title']}
{f'
{presentation_data["metadata"]["subtitle"]}
' if presentation_data['metadata'].get('subtitle') else ''}
{''.join(slides_html)}
"""
return html
def _render_slide_html(self, slide: Dict, slide_number: int, theme: Dict) -> str:
"""Render a slide to HTML using local image paths."""
layout = slide["layout"]
content = slide["content"]
# Helper function to get image URL for HTML
def get_html_image_url(image_info):
if isinstance(image_info, dict):
if 'local_path' in image_info and image_info['local_path']:
# Use the local downloaded image
return f"/workspace/{image_info['local_path']}"
elif 'url' in image_info:
# Fallback to original URL if local path not available
return self._get_display_url(image_info['url'])
elif isinstance(image_info, str):
return self._get_display_url(image_info)
return ""
if layout == "title":
return f"""
{content.get('title', '')}
{f'
{content["subtitle"]}
' if content.get('subtitle') else ''}
{slide_number}