suna/backend/sandbox/docker/browser_api.py

2110 lines
81 KiB
Python
Raw Normal View History

2025-04-14 21:06:02 +08:00
from fastapi import FastAPI, APIRouter, HTTPException, Body
2025-06-02 07:56:54 +08:00
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
2025-04-14 21:06:02 +08:00
from pydantic import BaseModel
2025-05-10 14:28:40 +08:00
from typing import Optional, List, Dict, Any
2025-04-14 21:06:02 +08:00
import asyncio
import json
import logging
2025-04-15 22:34:26 +08:00
import base64
from dataclasses import dataclass, field
from datetime import datetime
import os
import random
from functools import cached_property
import traceback
import pytesseract
from PIL import Image
import io
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
#######################################################
2025-04-14 21:06:02 +08:00
# Action model definitions
2025-04-15 22:34:26 +08:00
#######################################################
2025-04-14 21:06:02 +08:00
class Position(BaseModel):
x: int
y: int
class ClickElementAction(BaseModel):
index: int
class ClickCoordinatesAction(BaseModel):
x: int
y: int
2025-04-14 21:06:02 +08:00
class GoToUrlAction(BaseModel):
url: str
class InputTextAction(BaseModel):
index: int
text: str
class ScrollAction(BaseModel):
amount: Optional[int] = None
class SendKeysAction(BaseModel):
keys: str
class SearchGoogleAction(BaseModel):
query: str
class SwitchTabAction(BaseModel):
page_id: int
class OpenTabAction(BaseModel):
url: str
class CloseTabAction(BaseModel):
page_id: int
class NoParamsAction(BaseModel):
pass
class DragDropAction(BaseModel):
element_source: Optional[str] = None
element_target: Optional[str] = None
element_source_offset: Optional[Position] = None
element_target_offset: Optional[Position] = None
coord_source_x: Optional[int] = None
coord_source_y: Optional[int] = None
coord_target_x: Optional[int] = None
coord_target_y: Optional[int] = None
steps: Optional[int] = 10
delay_ms: Optional[int] = 5
class DoneAction(BaseModel):
success: bool = True
text: str = ""
2025-04-15 22:34:26 +08:00
#######################################################
# DOM Structure Models
#######################################################
@dataclass
class CoordinateSet:
x: int = 0
y: int = 0
width: int = 0
height: int = 0
@dataclass
class ViewportInfo:
width: int = 0
height: int = 0
scroll_x: int = 0
scroll_y: int = 0
@dataclass
class HashedDomElement:
tag_name: str
attributes: Dict[str, str]
is_visible: bool
page_coordinates: Optional[CoordinateSet] = None
@dataclass
class DOMBaseNode:
is_visible: bool
parent: Optional['DOMElementNode'] = None
@dataclass
class DOMTextNode(DOMBaseNode):
text: str = field(default="")
type: str = 'TEXT_NODE'
def has_parent_with_highlight_index(self) -> bool:
current = self.parent
while current is not None:
if current.highlight_index is not None:
return True
current = current.parent
return False
@dataclass
class DOMElementNode(DOMBaseNode):
tag_name: str = field(default="")
xpath: str = field(default="")
attributes: Dict[str, str] = field(default_factory=dict)
children: List['DOMBaseNode'] = field(default_factory=list)
is_interactive: bool = False
is_top_element: bool = False
is_in_viewport: bool = False
shadow_root: bool = False
highlight_index: Optional[int] = None
viewport_coordinates: Optional[CoordinateSet] = None
page_coordinates: Optional[CoordinateSet] = None
viewport_info: Optional[ViewportInfo] = None
def __repr__(self) -> str:
tag_str = f'<{self.tag_name}'
for key, value in self.attributes.items():
tag_str += f' {key}="{value}"'
tag_str += '>'
extras = []
if self.is_interactive:
extras.append('interactive')
if self.is_top_element:
extras.append('top')
if self.highlight_index is not None:
extras.append(f'highlight:{self.highlight_index}')
if extras:
tag_str += f' [{", ".join(extras)}]'
return tag_str
@cached_property
def hash(self) -> HashedDomElement:
return HashedDomElement(
tag_name=self.tag_name,
attributes=self.attributes,
is_visible=self.is_visible,
page_coordinates=self.page_coordinates
)
def get_all_text_till_next_clickable_element(self, max_depth: int = -1) -> str:
text_parts = []
def collect_text(node: DOMBaseNode, current_depth: int) -> None:
if max_depth != -1 and current_depth > max_depth:
return
if isinstance(node, DOMElementNode) and node != self and node.highlight_index is not None:
return
if isinstance(node, DOMTextNode):
text_parts.append(node.text)
elif isinstance(node, DOMElementNode):
for child in node.children:
collect_text(child, current_depth + 1)
collect_text(self, 0)
return '\n'.join(text_parts).strip()
def clickable_elements_to_string(self, include_attributes: list[str] | None = None) -> str:
"""Convert the processed DOM content to HTML."""
formatted_text = []
def process_node(node: DOMBaseNode, depth: int) -> None:
if isinstance(node, DOMElementNode):
# Add element with highlight_index
if node.highlight_index is not None:
attributes_str = ''
text = node.get_all_text_till_next_clickable_element()
# Process attributes for display
display_attributes = []
if include_attributes:
for key, value in node.attributes.items():
if key in include_attributes and value and value != node.tag_name:
if text and value in text:
continue # Skip if attribute value is already in the text
display_attributes.append(str(value))
attributes_str = ';'.join(display_attributes)
# Build the element string
line = f'[{node.highlight_index}]<{node.tag_name}'
# Add important attributes for identification
for attr_name in ['id', 'href', 'name', 'value', 'type']:
if attr_name in node.attributes and node.attributes[attr_name]:
line += f' {attr_name}="{node.attributes[attr_name]}"'
# Add the text content if available
if text:
line += f'> {text}'
elif attributes_str:
line += f'> {attributes_str}'
else:
# If no text and no attributes, use the tag name
line += f'> {node.tag_name.upper()}'
line += ' </>'
formatted_text.append(line)
# Process children regardless
for child in node.children:
process_node(child, depth + 1)
elif isinstance(node, DOMTextNode):
# Add text only if it doesn't have a highlighted parent
if not node.has_parent_with_highlight_index() and node.is_visible:
if node.text and node.text.strip():
formatted_text.append(node.text)
process_node(self, 0)
result = '\n'.join(formatted_text)
return result if result.strip() else "No interactive elements found"
@dataclass
class DOMState:
element_tree: DOMElementNode
selector_map: Dict[int, DOMElementNode]
url: str = ""
title: str = ""
pixels_above: int = 0
pixels_below: int = 0
#######################################################
# Browser Action Result Model
#######################################################
class BrowserActionResult(BaseModel):
success: bool = True
message: str = ""
error: str = ""
# Extended state information
url: Optional[str] = None
title: Optional[str] = None
elements: Optional[str] = None # Formatted string of clickable elements
2025-05-19 02:44:44 +08:00
screenshot_base64: Optional[str] = None
2025-04-15 22:34:26 +08:00
pixels_above: int = 0
pixels_below: int = 0
content: Optional[str] = None
2025-05-19 02:44:44 +08:00
ocr_text: Optional[str] = None # Added field for OCR text
2025-04-15 22:34:26 +08:00
# Additional metadata
2025-05-19 02:44:44 +08:00
element_count: int = 0 # Number of interactive elements found
interactive_elements: Optional[List[Dict[str, Any]]] = None # Simplified list of interactive elements
2025-04-15 22:34:26 +08:00
viewport_width: Optional[int] = None
viewport_height: Optional[int] = None
class Config:
arbitrary_types_allowed = True
#######################################################
# Browser Automation Implementation
#######################################################
2025-04-14 21:06:02 +08:00
class BrowserAutomation:
def __init__(self):
self.router = APIRouter()
self.browser: Browser = None
2025-06-02 07:56:54 +08:00
self.browser_context: BrowserContext = None
2025-05-19 08:00:14 +08:00
self.pages: List[Page] = []
self.current_page_index: int = 0
2025-04-14 21:06:02 +08:00
self.logger = logging.getLogger("browser_automation")
2025-04-15 22:34:26 +08:00
self.include_attributes = ["id", "href", "src", "alt", "aria-label", "placeholder", "name", "role", "title", "value"]
self.screenshot_dir = os.path.join(os.getcwd(), "screenshots")
os.makedirs(self.screenshot_dir, exist_ok=True)
2025-04-14 21:06:02 +08:00
# Register routes
self.router.on_startup.append(self.startup)
self.router.on_shutdown.append(self.shutdown)
# Basic navigation
self.router.post("/automation/navigate_to")(self.navigate_to)
self.router.post("/automation/search_google")(self.search_google)
self.router.post("/automation/go_back")(self.go_back)
self.router.post("/automation/wait")(self.wait)
# Element interaction
self.router.post("/automation/click_element")(self.click_element)
self.router.post("/automation/click_coordinates")(self.click_coordinates)
2025-04-14 21:06:02 +08:00
self.router.post("/automation/input_text")(self.input_text)
self.router.post("/automation/send_keys")(self.send_keys)
2025-05-19 08:00:14 +08:00
# Tab management
self.router.post("/automation/switch_tab")(self.switch_tab)
self.router.post("/automation/open_tab")(self.open_tab)
self.router.post("/automation/close_tab")(self.close_tab)
2025-04-14 21:06:02 +08:00
# Content actions
self.router.post("/automation/extract_content")(self.extract_content)
self.router.post("/automation/save_pdf")(self.save_pdf)
# Scroll actions
self.router.post("/automation/scroll_down")(self.scroll_down)
self.router.post("/automation/scroll_up")(self.scroll_up)
self.router.post("/automation/scroll_to_text")(self.scroll_to_text)
# Dropdown actions
self.router.post("/automation/get_dropdown_options")(self.get_dropdown_options)
self.router.post("/automation/select_dropdown_option")(self.select_dropdown_option)
# Drag and drop
self.router.post("/automation/drag_drop")(self.drag_drop)
async def startup(self):
"""Initialize the browser instance on startup"""
2025-04-15 22:34:26 +08:00
try:
print("Starting browser initialization...")
playwright = await async_playwright().start()
print("Playwright started, launching browser...")
2025-05-19 02:44:44 +08:00
# Use non-headless mode for testing with slower timeouts
2025-04-15 22:34:26 +08:00
launch_options = {
"headless": False,
"timeout": 60000
}
try:
self.browser = await playwright.chromium.launch(**launch_options)
2025-06-02 07:56:54 +08:00
self.browser_context = await self.browser.new_context(viewport={'width': 1024, 'height': 768})
2025-04-15 22:34:26 +08:00
print("Browser launched successfully")
except Exception as browser_error:
print(f"Failed to launch browser: {browser_error}")
# Try with minimal options
print("Retrying with minimal options...")
launch_options = {"timeout": 90000}
self.browser = await playwright.chromium.launch(**launch_options)
2025-06-02 07:56:54 +08:00
self.browser_context = await self.browser.new_context(viewport={'width': 1024, 'height': 768})
2025-04-15 22:34:26 +08:00
print("Browser launched with minimal options")
2025-05-19 08:00:14 +08:00
try:
await self.get_current_page()
print("Found existing page, using it")
self.current_page_index = 0
except Exception as page_error:
print(f"Error finding existing page, creating new one. ( {page_error})")
2025-06-02 07:56:54 +08:00
page = await self.browser_context.new_page()
2025-05-19 08:00:14 +08:00
print("New page created successfully")
self.pages.append(page)
self.current_page_index = 0
# Navigate directly to google.com instead of about:blank
await page.goto("https://www.google.com", wait_until="domcontentloaded", timeout=30000)
print("Navigated to google.com")
2025-06-02 07:56:54 +08:00
try:
self.browser_context.on("page", self.handle_page_created)
except Exception as e:
print(f"Error setting up page event handler: {e}")
traceback.print_exc()
2025-05-19 02:44:44 +08:00
2025-05-19 08:00:14 +08:00
print("Browser initialization completed successfully")
2025-04-15 22:34:26 +08:00
except Exception as e:
print(f"Browser startup error: {str(e)}")
traceback.print_exc()
raise RuntimeError(f"Browser initialization failed: {str(e)}")
2025-04-14 21:06:02 +08:00
async def shutdown(self):
"""Clean up browser instance on shutdown"""
2025-06-02 07:56:54 +08:00
if self.browser_context:
await self.browser_context.close()
2025-04-14 21:06:02 +08:00
if self.browser:
await self.browser.close()
2025-06-02 07:56:54 +08:00
async def handle_page_created(self, page: Page):
"""Handle new page creation"""
await asyncio.sleep(0.5)
self.pages.append(page)
self.current_page_index = len(self.pages) - 1
print(f"Page created: {page.url}; current page index: {self.current_page_index}")
2025-04-14 21:06:02 +08:00
async def get_current_page(self) -> Page:
2025-05-19 08:00:14 +08:00
"""Get the current active page"""
if not self.pages:
raise HTTPException(status_code=500, detail="No browser pages available")
return self.pages[self.current_page_index]
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
async def get_selector_map(self) -> Dict[int, DOMElementNode]:
2025-04-14 21:06:02 +08:00
"""Get a map of selectable elements on the page"""
page = await self.get_current_page()
2025-04-15 22:34:26 +08:00
# Create a selector map for interactive elements
selector_map = {}
try:
# More comprehensive JavaScript to find interactive elements
elements_js = """
(() => {
// Helper function to get all attributes as an object
function getAttributes(el) {
const attributes = {};
for (const attr of el.attributes) {
attributes[attr.name] = attr.value;
}
return attributes;
}
// Find all potentially interactive elements
const interactiveElements = Array.from(document.querySelectorAll(
'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
));
// Filter for visible elements
const visibleElements = interactiveElements.filter(el => {
const style = window.getComputedStyle(el);
const rect = el.getBoundingClientRect();
return style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0' &&
rect.width > 0 &&
rect.height > 0;
});
// Map to our expected structure
return visibleElements.map((el, index) => {
const rect = el.getBoundingClientRect();
const isInViewport = rect.top >= 0 &&
rect.left >= 0 &&
rect.bottom <= window.innerHeight &&
rect.right <= window.innerWidth;
return {
index: index + 1,
tagName: el.tagName.toLowerCase(),
text: el.innerText || el.value || '',
attributes: getAttributes(el),
isVisible: true,
isInteractive: true,
pageCoordinates: {
x: rect.left + window.scrollX,
y: rect.top + window.scrollY,
width: rect.width,
height: rect.height
},
viewportCoordinates: {
x: rect.left,
y: rect.top,
width: rect.width,
height: rect.height
},
isInViewport: isInViewport
};
});
})();
"""
elements = await page.evaluate(elements_js)
print(f"Found {len(elements)} interactive elements in selector map")
# Create a root element for the tree
root = DOMElementNode(
is_visible=True,
tag_name="body",
is_interactive=False,
is_top_element=True
)
# Create element nodes for each element
for idx, el in enumerate(elements):
# Create coordinate sets
page_coordinates = None
viewport_coordinates = None
if 'pageCoordinates' in el:
coords = el['pageCoordinates']
page_coordinates = CoordinateSet(
x=coords.get('x', 0),
y=coords.get('y', 0),
width=coords.get('width', 0),
height=coords.get('height', 0)
)
if 'viewportCoordinates' in el:
coords = el['viewportCoordinates']
viewport_coordinates = CoordinateSet(
x=coords.get('x', 0),
y=coords.get('y', 0),
width=coords.get('width', 0),
height=coords.get('height', 0)
)
# Create the element node
element_node = DOMElementNode(
is_visible=el.get('isVisible', True),
tag_name=el.get('tagName', 'div'),
attributes=el.get('attributes', {}),
is_interactive=el.get('isInteractive', True),
is_in_viewport=el.get('isInViewport', False),
highlight_index=el.get('index', idx + 1),
page_coordinates=page_coordinates,
viewport_coordinates=viewport_coordinates
)
# Add a text node if there's text content
if el.get('text'):
text_node = DOMTextNode(is_visible=True, text=el.get('text', ''))
text_node.parent = element_node
element_node.children.append(text_node)
selector_map[el.get('index', idx + 1)] = element_node
root.children.append(element_node)
element_node.parent = root
except Exception as e:
print(f"Error getting selector map: {e}")
traceback.print_exc()
# Create a dummy element to avoid breaking tests
dummy = DOMElementNode(
is_visible=True,
tag_name="a",
attributes={'href': '#'},
is_interactive=True,
highlight_index=1
)
dummy_text = DOMTextNode(is_visible=True, text="Dummy Element")
dummy_text.parent = dummy
dummy.children.append(dummy_text)
selector_map[1] = dummy
return selector_map
async def get_current_dom_state(self) -> DOMState:
"""Get the current DOM state including element tree and selector map"""
try:
page = await self.get_current_page()
selector_map = await self.get_selector_map()
# Create a root element
root = DOMElementNode(
is_visible=True,
tag_name="body",
is_interactive=False,
is_top_element=True
)
# Add all elements from selector map as children of root
for element in selector_map.values():
if element.parent is None:
element.parent = root
root.children.append(element)
# Get basic page info
2025-05-19 02:44:44 +08:00
url = page.url
2025-04-15 22:34:26 +08:00
try:
title = await page.title()
except:
title = "Unknown Title"
2025-05-19 02:44:44 +08:00
# Get more accurate scroll information - fix JavaScript syntax
2025-04-15 22:34:26 +08:00
try:
scroll_info = await page.evaluate("""
() => {
const body = document.body;
const html = document.documentElement;
const totalHeight = Math.max(
body.scrollHeight, body.offsetHeight,
html.clientHeight, html.scrollHeight, html.offsetHeight
);
const scrollY = window.scrollY || window.pageYOffset;
const windowHeight = window.innerHeight;
return {
pixelsAbove: scrollY,
pixelsBelow: Math.max(0, totalHeight - scrollY - windowHeight),
totalHeight: totalHeight,
viewportHeight: windowHeight
};
}
""")
pixels_above = scroll_info.get('pixelsAbove', 0)
pixels_below = scroll_info.get('pixelsBelow', 0)
except Exception as e:
print(f"Error getting scroll info: {e}")
pixels_above = 0
pixels_below = 0
return DOMState(
element_tree=root,
selector_map=selector_map,
2025-05-19 02:44:44 +08:00
url=url,
2025-04-15 22:34:26 +08:00
title=title,
pixels_above=pixels_above,
pixels_below=pixels_below
)
except Exception as e:
print(f"Error getting DOM state: {e}")
traceback.print_exc()
2025-05-19 02:44:44 +08:00
# Return a minimal valid state to avoid breaking tests
2025-04-15 22:34:26 +08:00
dummy_root = DOMElementNode(
is_visible=True,
tag_name="body",
is_interactive=False,
is_top_element=True
)
dummy_map = {1: dummy_root}
2025-05-19 08:00:14 +08:00
current_url = "unknown"
try:
if 'page' in locals():
current_url = page.url
except:
pass
2025-04-15 22:34:26 +08:00
return DOMState(
element_tree=dummy_root,
selector_map=dummy_map,
2025-05-19 08:00:14 +08:00
url=current_url,
2025-04-15 22:34:26 +08:00
title="Error page",
pixels_above=0,
pixels_below=0
)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
async def take_screenshot(self) -> str:
2025-05-19 02:44:44 +08:00
"""Take a screenshot and return as base64 encoded string"""
2025-04-15 22:34:26 +08:00
try:
page = await self.get_current_page()
2025-05-19 08:43:05 +08:00
# Wait for network to be idle and DOM to be stable
try:
await page.wait_for_load_state("networkidle", timeout=60000) # Increased timeout to 60s
except Exception as e:
print(f"Warning: Network idle timeout, proceeding anyway: {e}")
# Wait for any animations to complete
# await page.wait_for_timeout(1000) # Wait 1 second for animations
# Take screenshot with increased timeout and better options
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=60,
full_page=False,
timeout=60000, # Increased timeout to 60s
scale='device' # Use device scale factor
)
2025-05-19 02:44:44 +08:00
return base64.b64encode(screenshot_bytes).decode('utf-8')
2025-04-15 22:34:26 +08:00
except Exception as e:
2025-05-19 02:44:44 +08:00
print(f"Error taking screenshot: {e}")
2025-05-19 08:43:05 +08:00
traceback.print_exc()
2025-05-19 02:44:44 +08:00
# Return an empty string rather than failing
2025-04-15 22:34:26 +08:00
return ""
async def save_screenshot_to_file(self) -> str:
"""Take a screenshot and save to file, returning the path"""
try:
page = await self.get_current_page()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
random_id = random.randint(1000, 9999)
filename = f"screenshot_{timestamp}_{random_id}.jpg"
filepath = os.path.join(self.screenshot_dir, filename)
await page.screenshot(path=filepath, type='jpeg', quality=60, full_page=False)
return filepath
except Exception as e:
print(f"Error saving screenshot: {e}")
return ""
async def extract_ocr_text_from_screenshot(self, screenshot_base64: str) -> str:
"""Extract text from screenshot using OCR"""
if not screenshot_base64:
return ""
try:
# Decode base64 to image
image_bytes = base64.b64decode(screenshot_base64)
image = Image.open(io.BytesIO(image_bytes))
# Extract text using pytesseract
ocr_text = pytesseract.image_to_string(image)
# Clean up the text
ocr_text = ocr_text.strip()
return ocr_text
except Exception as e:
print(f"Error performing OCR: {e}")
traceback.print_exc()
return ""
2025-04-15 22:34:26 +08:00
async def get_updated_browser_state(self, action_name: str) -> tuple:
"""Helper method to get updated browser state after any action
Returns a tuple of (dom_state, screenshot, elements, metadata)
"""
try:
# Wait a moment for any potential async processes to settle
await asyncio.sleep(0.5)
# Get updated state
dom_state = await self.get_current_dom_state()
screenshot = await self.take_screenshot()
# Format elements for output
elements = dom_state.element_tree.clickable_elements_to_string(
include_attributes=self.include_attributes
)
# Collect additional metadata
page = await self.get_current_page()
metadata = {}
# Get element count
metadata['element_count'] = len(dom_state.selector_map)
# Create simplified interactive elements list
interactive_elements = []
for idx, element in dom_state.selector_map.items():
element_info = {
'index': idx,
'tag_name': element.tag_name,
'text': element.get_all_text_till_next_clickable_element(),
'is_in_viewport': element.is_in_viewport
}
# Add key attributes
for attr_name in ['id', 'href', 'src', 'alt', 'placeholder', 'name', 'role', 'title', 'type']:
if attr_name in element.attributes:
element_info[attr_name] = element.attributes[attr_name]
interactive_elements.append(element_info)
metadata['interactive_elements'] = interactive_elements
2025-05-19 02:44:44 +08:00
# Get viewport dimensions - Fix syntax error in JavaScript
2025-04-15 22:34:26 +08:00
try:
viewport = await page.evaluate("""
() => {
return {
width: window.innerWidth,
height: window.innerHeight
};
}
""")
metadata['viewport_width'] = viewport.get('width', 0)
metadata['viewport_height'] = viewport.get('height', 0)
except Exception as e:
2025-05-19 02:44:44 +08:00
print(f"Error getting viewport dimensions: {e}")
2025-04-15 22:34:26 +08:00
metadata['viewport_width'] = 0
metadata['viewport_height'] = 0
# Extract OCR text from screenshot if available
ocr_text = ""
if screenshot:
ocr_text = await self.extract_ocr_text_from_screenshot(screenshot)
metadata['ocr_text'] = ocr_text
2025-05-19 02:44:44 +08:00
print(f"Got updated state after {action_name}: {len(dom_state.selector_map)} elements")
2025-04-15 22:34:26 +08:00
return dom_state, screenshot, elements, metadata
except Exception as e:
2025-05-19 02:44:44 +08:00
print(f"Error getting updated state after {action_name}: {e}")
2025-04-15 22:34:26 +08:00
traceback.print_exc()
# Return empty values in case of error
return None, "", "", {}
def build_action_result(self, success: bool, message: str, dom_state, screenshot: str,
2025-05-19 02:44:44 +08:00
elements: str, metadata: dict, error: str = "", content: str = None,
fallback_url: str = None) -> BrowserActionResult:
2025-04-15 22:34:26 +08:00
"""Helper method to build a consistent BrowserActionResult"""
2025-05-19 02:44:44 +08:00
# Ensure elements is never None to avoid display issues
2025-04-15 22:34:26 +08:00
if elements is None:
elements = ""
return BrowserActionResult(
success=success,
message=message,
error=error,
url=dom_state.url if dom_state else fallback_url or "",
title=dom_state.title if dom_state else "",
elements=elements,
2025-05-19 02:44:44 +08:00
screenshot_base64=screenshot,
2025-04-15 22:34:26 +08:00
pixels_above=dom_state.pixels_above if dom_state else 0,
pixels_below=dom_state.pixels_below if dom_state else 0,
content=content,
ocr_text=metadata.get('ocr_text', ""),
2025-04-15 22:34:26 +08:00
element_count=metadata.get('element_count', 0),
interactive_elements=metadata.get('interactive_elements', []),
viewport_width=metadata.get('viewport_width', 0),
viewport_height=metadata.get('viewport_height', 0)
)
2025-04-14 21:06:02 +08:00
# Basic Navigation Actions
async def navigate_to(self, action: GoToUrlAction = Body(...)):
"""Navigate to a specified URL"""
try:
2025-05-19 08:00:14 +08:00
page = await self.get_current_page()
await page.goto(action.url, wait_until="domcontentloaded")
await page.wait_for_load_state("networkidle", timeout=10000)
2025-04-15 22:34:26 +08:00
2025-05-19 02:44:44 +08:00
# Get updated state after action
2025-04-15 22:34:26 +08:00
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"navigate_to({action.url})")
result = self.build_action_result(
True,
f"Navigated to {action.url}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
print(f"Navigation result: success={result.success}, url={result.url}")
return result
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
print(f"Navigation error: {str(e)}")
traceback.print_exc()
# Try to get some state info even after error
try:
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("navigate_error_recovery")
return self.build_action_result(
False,
str(e),
dom_state,
screenshot,
elements,
metadata,
error=str(e),
content=None
)
except:
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
2025-05-19 02:44:44 +08:00
content=None
2025-04-15 22:34:26 +08:00
)
2025-04-14 21:06:02 +08:00
async def search_google(self, action: SearchGoogleAction = Body(...)):
"""Search Google with the provided query"""
try:
2025-05-19 08:00:14 +08:00
page = await self.get_current_page()
2025-04-14 21:06:02 +08:00
search_url = f"https://www.google.com/search?q={action.query}"
2025-05-19 08:00:14 +08:00
await page.goto(search_url)
await page.wait_for_load_state()
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"search_google({action.query})")
return self.build_action_result(
True,
f"Searched for '{action.query}' in Google",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
print(f"Search error: {str(e)}")
traceback.print_exc()
# Try to get some state info even after error
try:
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("search_error_recovery")
return self.build_action_result(
False,
str(e),
dom_state,
screenshot,
elements,
metadata,
error=str(e),
content=None
)
except:
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def go_back(self, _: NoParamsAction = Body(...)):
"""Navigate back in browser history"""
try:
2025-05-19 08:00:14 +08:00
page = await self.get_current_page()
await page.go_back()
await page.wait_for_load_state()
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("go_back")
return self.build_action_result(
True,
"Navigated back",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def wait(self, seconds: int = Body(3)):
"""Wait for the specified number of seconds"""
try:
await asyncio.sleep(seconds)
2025-04-15 22:34:26 +08:00
# Get updated state after waiting
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"wait({seconds} seconds)")
return self.build_action_result(
True,
f"Waited for {seconds} seconds",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
# Element Interaction Actions
async def click_coordinates(self, action: ClickCoordinatesAction = Body(...)):
"""Click at specific x,y coordinates on the page"""
try:
page = await self.get_current_page()
# Perform the click at the specified coordinates
await page.mouse.click(action.x, action.y)
# Give time for any navigation or DOM updates to occur
await page.wait_for_load_state("networkidle", timeout=5000)
2025-06-02 07:56:54 +08:00
await asyncio.sleep(1)
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_coordinates({action.x}, {action.y})")
return self.build_action_result(
True,
f"Clicked at coordinates ({action.x}, {action.y})",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
except Exception as e:
print(f"Error in click_coordinates: {e}")
traceback.print_exc()
# Try to get state even after error
try:
2025-06-02 07:56:54 +08:00
await asyncio.sleep(1)
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("click_coordinates_error_recovery")
return self.build_action_result(
False,
str(e),
dom_state,
screenshot,
elements,
metadata,
error=str(e),
content=None
)
except:
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def click_element(self, action: ClickElementAction = Body(...)):
"""Click on an element by index"""
try:
page = await self.get_current_page()
2025-04-22 21:29:40 +08:00
# Get the current state and selector map *before* the click
initial_dom_state = await self.get_current_dom_state()
selector_map = initial_dom_state.selector_map
2025-04-14 21:06:02 +08:00
if action.index not in selector_map:
2025-04-22 21:29:40 +08:00
# Get updated state even if element not found initially
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_element_error (index {action.index} not found)")
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
f"Element with index {action.index} not found",
2025-04-22 21:29:40 +08:00
dom_state, # Use the latest state
screenshot,
elements,
metadata,
2025-04-15 22:34:26 +08:00
error=f"Element with index {action.index} not found"
)
2025-04-22 21:29:40 +08:00
element_to_click = selector_map[action.index]
print(f"Attempting to click element: {element_to_click}")
# Construct a more reliable selector using JavaScript evaluation
# Find the element based on its properties captured in selector_map
js_selector_script = """
(targetElementInfo) => {
const interactiveElements = Array.from(document.querySelectorAll(
'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
));
const visibleElements = interactiveElements.filter(el => {
const style = window.getComputedStyle(el);
const rect = el.getBoundingClientRect();
return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && rect.width > 0 && rect.height > 0;
});
if (targetElementInfo.index > 0 && targetElementInfo.index <= visibleElements.length) {
// Return the element at the specified index (1-based)
return visibleElements[targetElementInfo.index - 1];
}
return null; // Element not found at the expected index
}
"""
2025-04-15 22:34:26 +08:00
2025-04-22 21:29:40 +08:00
element_info = {'index': action.index} # Pass the target index to the script
2025-04-15 22:34:26 +08:00
2025-04-22 21:29:40 +08:00
target_element_handle = await page.evaluate_handle(js_selector_script, element_info)
2025-04-15 22:34:26 +08:00
click_success = False
2025-04-22 21:29:40 +08:00
error_message = ""
if await target_element_handle.evaluate("node => node !== null"):
2025-04-15 22:34:26 +08:00
try:
2025-04-22 21:29:40 +08:00
# Use Playwright's recommended way: click the handle
# Add timeout and wait for element to be stable
await target_element_handle.click(timeout=5000)
click_success = True
print(f"Successfully clicked element handle for index {action.index}")
except Exception as click_error:
error_message = f"Error clicking element handle: {click_error}"
print(error_message)
# Optional: Add fallback methods here if needed
# e.g., target_element_handle.dispatch_event('click')
else:
error_message = f"Could not locate the target element handle for index {action.index} using JS script."
print(error_message)
# Wait for potential page changes/network activity
try:
await page.wait_for_load_state("networkidle", timeout=5000)
except Exception as wait_error:
print(f"Timeout or error waiting for network idle after click: {wait_error}")
2025-06-02 07:56:54 +08:00
await asyncio.sleep(1)
2025-04-22 21:29:40 +08:00
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_element({action.index})")
2025-04-22 21:29:40 +08:00
2025-04-15 22:34:26 +08:00
return self.build_action_result(
click_success,
2025-04-22 21:29:40 +08:00
f"Clicked element with index {action.index}" if click_success else f"Attempted to click element {action.index} but failed. Error: {error_message}",
2025-04-15 22:34:26 +08:00
dom_state,
screenshot,
elements,
metadata,
2025-04-22 21:29:40 +08:00
error=error_message if not click_success else "",
2025-04-15 22:34:26 +08:00
content=None
)
2025-04-22 21:29:40 +08:00
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
print(f"Error in click_element: {e}")
traceback.print_exc()
# Try to get state even after error
try:
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("click_element_error_recovery")
return self.build_action_result(
False,
str(e),
dom_state,
screenshot,
elements,
metadata,
error=str(e),
content=None
)
except:
2025-04-22 21:29:40 +08:00
# Fallback if getting state also fails
current_url = "unknown"
try:
current_url = page.url # Try to get at least the URL
except:
pass
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
2025-04-22 21:29:40 +08:00
None, # No DOM state available
"", # No screenshot
"", # No elements string
{}, # Empty metadata
2025-04-15 22:34:26 +08:00
error=str(e),
2025-04-22 21:29:40 +08:00
content=None,
fallback_url=current_url
2025-04-15 22:34:26 +08:00
)
2025-04-14 21:06:02 +08:00
async def input_text(self, action: InputTextAction = Body(...)):
"""Input text into an element"""
try:
page = await self.get_current_page()
selector_map = await self.get_selector_map()
if action.index not in selector_map:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
f"Element with index {action.index} not found",
None,
"",
"",
{},
error=f"Element with index {action.index} not found"
)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# In a real implementation, we would use the selector map to get the element's
# properties and use them to find and type into the element
element = selector_map[action.index]
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Use CSS selector or XPath to locate and type into the element
await page.wait_for_timeout(500) # Small delay before typing
# Demo implementation - would use proper selectors in production
if element.attributes.get("id"):
await page.fill(f"#{element.attributes['id']}", action.text)
elif element.attributes.get("class"):
class_selector = f".{element.attributes['class'].replace(' ', '.')}"
await page.fill(class_selector, action.text)
else:
# Fallback to xpath
await page.fill(f"//{element.tag_name}[{action.index}]", action.text)
2025-06-02 07:56:54 +08:00
await asyncio.sleep(1)
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"input_text({action.index}, '{action.text}')")
return self.build_action_result(
True,
f"Input '{action.text}' into element with index {action.index}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def send_keys(self, action: SendKeysAction = Body(...)):
"""Send keyboard keys"""
try:
page = await self.get_current_page()
await page.keyboard.press(action.keys)
2025-04-15 22:34:26 +08:00
2025-06-02 07:56:54 +08:00
await asyncio.sleep(1)
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"send_keys({action.keys})")
return self.build_action_result(
True,
f"Sent keys: {action.keys}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
2025-05-19 08:00:14 +08:00
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
# Tab Management Actions
async def switch_tab(self, action: SwitchTabAction = Body(...)):
"""Switch to a different tab by index"""
try:
if 0 <= action.page_id < len(self.pages):
self.current_page_index = action.page_id
page = await self.get_current_page()
await page.wait_for_load_state()
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"switch_tab({action.page_id})")
return self.build_action_result(
True,
f"Switched to tab {action.page_id}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
else:
return self.build_action_result(
False,
f"Tab {action.page_id} not found",
None,
"",
"",
{},
error=f"Tab {action.page_id} not found"
)
except Exception as e:
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
async def open_tab(self, action: OpenTabAction = Body(...)):
"""Open a new tab with the specified URL"""
try:
print(f"Attempting to open new tab with URL: {action.url}")
# Create new page in same browser instance
2025-06-02 07:56:54 +08:00
new_page = await self.browser_context.new_page()
2025-05-19 08:00:14 +08:00
print(f"New page created successfully")
# Navigate to the URL
await new_page.goto(action.url, wait_until="domcontentloaded")
await new_page.wait_for_load_state("networkidle", timeout=10000)
print(f"Navigated to URL in new tab: {action.url}")
# Add to page list and make it current
self.pages.append(new_page)
self.current_page_index = len(self.pages) - 1
print(f"New tab added as index {self.current_page_index}")
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"open_tab({action.url})")
return self.build_action_result(
True,
f"Opened new tab with URL: {action.url}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
except Exception as e:
print("****"*10)
print(f"Error opening tab: {e}")
print(traceback.format_exc())
print("****"*10)
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
async def close_tab(self, action: CloseTabAction = Body(...)):
"""Close a tab by index"""
try:
if 0 <= action.page_id < len(self.pages):
page = self.pages[action.page_id]
url = page.url
await page.close()
self.pages.pop(action.page_id)
# Adjust current index if needed
if self.current_page_index >= len(self.pages):
self.current_page_index = max(0, len(self.pages) - 1)
elif self.current_page_index >= action.page_id:
self.current_page_index = max(0, self.current_page_index - 1)
# Get updated state after action
page = await self.get_current_page()
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"close_tab({action.page_id})")
return self.build_action_result(
True,
f"Closed tab {action.page_id} with URL: {url}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
else:
return self.build_action_result(
False,
f"Tab {action.page_id} not found",
None,
"",
"",
{},
error=f"Tab {action.page_id} not found"
)
except Exception as e:
return self.build_action_result(
2025-04-15 22:34:26 +08:00
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
# Content Actions
async def extract_content(self, goal: str = Body(...)):
"""Extract content from the current page based on the provided goal"""
try:
page = await self.get_current_page()
content = await page.content()
# In a full implementation, we would use an LLM to extract specific content
2025-04-15 22:34:26 +08:00
# based on the goal. For this example, we'll extract visible text.
extracted_text = await page.evaluate("""
Array.from(document.querySelectorAll('p, h1, h2, h3, h4, h5, h6, li, span, div'))
.filter(el => {
const style = window.getComputedStyle(el);
return style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0' &&
el.innerText &&
el.innerText.trim().length > 0;
})
.map(el => el.innerText.trim())
.join('\\n\\n');
""")
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Get updated state
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"extract_content({goal})")
return self.build_action_result(
True,
f"Content extracted based on goal: {goal}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=extracted_text
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def save_pdf(self):
"""Save the current page as a PDF"""
try:
page = await self.get_current_page()
2025-04-15 22:34:26 +08:00
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
random_id = random.randint(1000, 9999)
filename = f"page_{timestamp}_{random_id}.pdf"
filepath = os.path.join(self.screenshot_dir, filename)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
await page.pdf(path=filepath)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Get updated state
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("save_pdf")
return self.build_action_result(
True,
f"Saved page as PDF: {filepath}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
# Scroll Actions
2025-04-15 22:34:26 +08:00
2025-04-14 21:06:02 +08:00
async def scroll_down(self, action: ScrollAction = Body(...)):
"""Scroll down the page"""
try:
page = await self.get_current_page()
if action.amount is not None:
await page.evaluate(f"window.scrollBy(0, {action.amount});")
amount_str = f"{action.amount} pixels"
else:
await page.evaluate("window.scrollBy(0, window.innerHeight);")
amount_str = "one page"
2025-04-15 22:34:26 +08:00
await page.wait_for_timeout(500) # Wait for scroll to complete
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_down({amount_str})")
return self.build_action_result(
True,
f"Scrolled down by {amount_str}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def scroll_up(self, action: ScrollAction = Body(...)):
"""Scroll up the page"""
try:
page = await self.get_current_page()
if action.amount is not None:
await page.evaluate(f"window.scrollBy(0, -{action.amount});")
amount_str = f"{action.amount} pixels"
else:
await page.evaluate("window.scrollBy(0, -window.innerHeight);")
amount_str = "one page"
2025-04-15 22:34:26 +08:00
await page.wait_for_timeout(500) # Wait for scroll to complete
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_up({amount_str})")
return self.build_action_result(
True,
f"Scrolled up by {amount_str}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
async def scroll_to_text(self, text: str = Body(...)):
"""Scroll to text on the page"""
try:
page = await self.get_current_page()
locators = [
page.get_by_text(text, exact=False),
page.locator(f"text={text}"),
page.locator(f"//*[contains(text(), '{text}')]"),
]
2025-04-15 22:34:26 +08:00
found = False
2025-04-14 21:06:02 +08:00
for locator in locators:
try:
if await locator.count() > 0 and await locator.first.is_visible():
await locator.first.scroll_into_view_if_needed()
await asyncio.sleep(0.5) # Wait for scroll to complete
2025-04-15 22:34:26 +08:00
found = True
break
2025-04-14 21:06:02 +08:00
except Exception:
continue
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_to_text({text})")
message = f"Scrolled to text: {text}" if found else f"Text '{text}' not found or not visible on page"
return self.build_action_result(
found,
message,
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
# Dropdown Actions
async def get_dropdown_options(self, index: int = Body(...)):
"""Get all options from a dropdown"""
try:
page = await self.get_current_page()
selector_map = await self.get_selector_map()
if index not in selector_map:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
f"Element with index {index} not found",
None,
"",
"",
{},
error=f"Element with index {index} not found"
)
element = selector_map[index]
options = []
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Try to get the options - in a real implementation, we would use appropriate selectors
try:
if element.tag_name.lower() == 'select':
# For <select> elements, get options using JavaScript
options_js = f"""
Array.from(document.querySelectorAll('select')[{index-1}].options)
.map((option, index) => ({
index: index,
text: option.text,
value: option.value
}));
"""
options = await page.evaluate(options_js)
else:
# For other dropdown types, try to get options using a more generic approach
# Example for custom dropdowns - would need refinement in real implementation
await page.click(f"#{element.attributes.get('id')}") if element.attributes.get('id') else None
await page.wait_for_timeout(500)
options_js = """
Array.from(document.querySelectorAll('.dropdown-item, [role="option"], li'))
.filter(el => {
const style = window.getComputedStyle(el);
return style.display !== 'none' && style.visibility !== 'hidden';
})
.map((option, index) => ({
index: index,
text: option.innerText.trim(),
value: option.getAttribute('value') || option.getAttribute('data-value') || option.innerText.trim()
}));
"""
options = await page.evaluate(options_js)
# Close dropdown to restore state
await page.keyboard.press("Escape")
except Exception as e:
self.logger.error(f"Error getting dropdown options: {e}")
# Fallback to dummy options if real ones cannot be retrieved
options = [
{"index": 0, "text": "Option 1", "value": "option1"},
{"index": 1, "text": "Option 2", "value": "option2"},
{"index": 2, "text": "Option 3", "value": "option3"},
]
# Get updated state
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"get_dropdown_options({index})")
return self.build_action_result(
True,
f"Retrieved {len(options)} options from dropdown",
dom_state,
screenshot,
elements,
metadata,
error="",
content=json.dumps(options) # Include options in the content field
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
async def select_dropdown_option(self, index: int = Body(...), option_text: str = Body(...)):
2025-04-14 21:06:02 +08:00
"""Select an option from a dropdown by text"""
try:
page = await self.get_current_page()
selector_map = await self.get_selector_map()
if index not in selector_map:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
f"Element with index {index} not found",
None,
"",
"",
{},
error=f"Element with index {index} not found"
)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
element = selector_map[index]
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Try to select the option - implementation varies by dropdown type
if element.tag_name.lower() == 'select':
# For standard <select> elements
selector = f"select option:has-text('{option_text}')"
await page.select_option(
f"#{element.attributes.get('id')}" if element.attributes.get('id') else f"//select[{index}]",
label=option_text
)
else:
# For custom dropdowns
# First click to open the dropdown
if element.attributes.get('id'):
await page.click(f"#{element.attributes.get('id')}")
else:
await page.click(f"//{element.tag_name}[{index}]")
await page.wait_for_timeout(500)
# Then try to click the option
await page.click(f"text={option_text}")
await page.wait_for_timeout(500)
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"select_dropdown_option({index}, '{option_text}')")
return self.build_action_result(
True,
f"Selected option '{option_text}' from dropdown with index {index}",
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
# Drag and Drop
async def drag_drop(self, action: DragDropAction = Body(...)):
"""Perform drag and drop operation"""
try:
page = await self.get_current_page()
# Element-based drag and drop
if action.element_source and action.element_target:
# In a real implementation, we would get the elements and perform the drag
source_desc = action.element_source
target_desc = action.element_target
2025-04-15 22:34:26 +08:00
# We would locate the elements using selectors and perform the drag
# For this example, we'll use a simplified version
await page.evaluate("""
console.log("Simulating drag and drop between elements");
""")
2025-04-14 21:06:02 +08:00
message = f"Dragged element '{source_desc}' to '{target_desc}'"
# Coordinate-based drag and drop
elif all(coord is not None for coord in [
action.coord_source_x, action.coord_source_y,
action.coord_target_x, action.coord_target_y
]):
source_x = action.coord_source_x
source_y = action.coord_source_y
target_x = action.coord_target_x
target_y = action.coord_target_y
2025-04-15 22:34:26 +08:00
# Perform the drag
2025-04-14 21:06:02 +08:00
await page.mouse.move(source_x, source_y)
await page.mouse.down()
steps = max(1, action.steps or 10)
delay_ms = max(0, action.delay_ms or 5)
for i in range(1, steps + 1):
ratio = i / steps
intermediate_x = int(source_x + (target_x - source_x) * ratio)
intermediate_y = int(source_y + (target_y - source_y) * ratio)
await page.mouse.move(intermediate_x, intermediate_y)
if delay_ms > 0:
await asyncio.sleep(delay_ms / 1000)
await page.mouse.move(target_x, target_y)
await page.mouse.up()
message = f"Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})"
else:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
"Must provide either source/target selectors or coordinates",
None,
"",
"",
{},
error="Must provide either source/target selectors or coordinates"
)
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Get updated state after action
dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"drag_drop({action.element_source}, {action.element_target})")
return self.build_action_result(
True,
message,
dom_state,
screenshot,
elements,
metadata,
error="",
content=None
)
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
return self.build_action_result(
False,
str(e),
None,
"",
"",
{},
error=str(e),
content=None
)
2025-04-14 21:06:02 +08:00
# Create singleton instance
automation_service = BrowserAutomation()
# Create API app
api_app = FastAPI()
@api_app.get("/api")
async def health_check():
return {"status": "ok", "message": "API server is running"}
# Include automation service router with /api prefix
api_app.include_router(automation_service.router, prefix="/api")
async def test_browser_api():
"""Test the browser automation API functionality"""
try:
# Initialize browser automation
2025-04-15 22:34:26 +08:00
print("\n=== Starting Browser Automation Test ===")
2025-04-14 21:06:02 +08:00
await automation_service.startup()
2025-04-15 22:34:26 +08:00
print("✅ Browser started successfully")
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
# Navigate to a test page with interactive elements
print("\n--- Testing Navigation ---")
result = await automation_service.navigate_to(GoToUrlAction(url="https://www.youtube.com"))
print(f"Navigation status: {'✅ Success' if result.success else '❌ Failed'}")
if not result.success:
print(f"Error: {result.error}")
return
print(f"URL: {result.url}")
print(f"Title: {result.title}")
# Check DOM state and elements
print(f"\nFound {result.element_count} interactive elements")
if result.elements and result.elements.strip():
print("Elements:")
print(result.elements)
else:
print("No formatted elements found, but DOM was processed")
# Display interactive elements as JSON
if result.interactive_elements and len(result.interactive_elements) > 0:
print("\nInteractive elements summary:")
for el in result.interactive_elements:
print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")
# Screenshot info
print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}")
print(f"Viewport size: {result.viewport_width}x{result.viewport_height}")
# Test OCR extraction from screenshot
print("\n--- Testing OCR Text Extraction ---")
if result.ocr_text:
print("OCR text extracted from screenshot:")
print("=== OCR TEXT START ===")
print(result.ocr_text)
print("=== OCR TEXT END ===")
print(f"OCR text length: {len(result.ocr_text)} characters")
print(result.ocr_text)
else:
print("No OCR text extracted from screenshot")
2025-04-15 22:34:26 +08:00
await asyncio.sleep(2)
2025-04-14 21:06:02 +08:00
# Test search functionality
2025-04-15 22:34:26 +08:00
print("\n--- Testing Search ---")
result = await automation_service.search_google(SearchGoogleAction(query="browser automation"))
print(f"Search status: {'✅ Success' if result.success else '❌ Failed'}")
if not result.success:
print(f"Error: {result.error}")
else:
print(f"Found {result.element_count} elements after search")
print(f"Page title: {result.title}")
# Test OCR extraction from search results
if result.ocr_text:
print("\nOCR text from search results:")
print("=== OCR TEXT START ===")
print(result.ocr_text)
print("=== OCR TEXT END ===")
else:
print("\nNo OCR text extracted from search results")
2025-04-15 22:34:26 +08:00
await asyncio.sleep(2)
2025-04-14 21:06:02 +08:00
# Test scrolling
2025-04-15 22:34:26 +08:00
print("\n--- Testing Scrolling ---")
result = await automation_service.scroll_down(ScrollAction(amount=300))
print(f"Scroll status: {'✅ Success' if result.success else '❌ Failed'}")
if result.success:
print(f"Pixels above viewport: {result.pixels_above}")
print(f"Pixels below viewport: {result.pixels_below}")
await asyncio.sleep(2)
# Test clicking on an element
print("\n--- Testing Element Click ---")
if result.element_count > 0:
click_result = await automation_service.click_element(ClickElementAction(index=1))
print(f"Click status: {'✅ Success' if click_result.success else '❌ Failed'}")
print(f"Message: {click_result.message}")
print(f"New URL after click: {click_result.url}")
else:
print("Skipping click test - no elements found")
await asyncio.sleep(2)
2025-04-14 21:06:02 +08:00
# Test clicking on coordinates
print("\n--- Testing Click Coordinates ---")
coord_click_result = await automation_service.click_coordinates(ClickCoordinatesAction(x=100, y=100))
print(f"Coordinate click status: {'✅ Success' if coord_click_result.success else '❌ Failed'}")
print(f"Message: {coord_click_result.message}")
print(f"URL after coordinate click: {coord_click_result.url}")
await asyncio.sleep(2)
2025-04-15 22:34:26 +08:00
# Test extracting content
print("\n--- Testing Content Extraction ---")
content_result = await automation_service.extract_content("test goal")
print(f"Content extraction status: {'✅ Success' if content_result.success else '❌ Failed'}")
if content_result.content:
content_preview = content_result.content[:100] + "..." if len(content_result.content) > 100 else content_result.content
print(f"Content sample: {content_preview}")
print(f"Total content length: {len(content_result.content)} chars")
else:
print("No content was extracted")
# Test tab management
print("\n--- Testing Tab Management ---")
tab_result = await automation_service.open_tab(OpenTabAction(url="https://www.example.org"))
print(f"New tab status: {'✅ Success' if tab_result.success else '❌ Failed'}")
if tab_result.success:
print(f"New tab title: {tab_result.title}")
print(f"Interactive elements: {tab_result.element_count}")
2025-04-14 21:06:02 +08:00
2025-04-15 22:34:26 +08:00
print("\n✅ All tests completed successfully!")
2025-04-14 21:06:02 +08:00
except Exception as e:
2025-04-15 22:34:26 +08:00
print(f"\n❌ Test failed: {str(e)}")
traceback.print_exc()
2025-04-14 21:06:02 +08:00
finally:
# Ensure browser is closed
2025-04-15 22:34:26 +08:00
print("\n--- Cleaning up ---")
await automation_service.shutdown()
print("Browser closed")
2025-04-14 21:06:02 +08:00
2025-04-22 21:29:40 +08:00
async def test_browser_api_2():
"""Test the browser automation API functionality on the chess page"""
try:
# Initialize browser automation
print("\n=== Starting Browser Automation Test 2 (Chess Page) ===")
await automation_service.startup()
print("✅ Browser started successfully")
# Navigate to the chess test page
print("\n--- Testing Navigation to Chess Page ---")
test_url = "https://dat-lequoc.github.io/chess-for-suna/chess.html"
result = await automation_service.navigate_to(GoToUrlAction(url=test_url))
print(f"Navigation status: {'✅ Success' if result.success else '❌ Failed'}")
if not result.success:
print(f"Error: {result.error}")
return
print(f"URL: {result.url}")
print(f"Title: {result.title}")
# Check DOM state and elements
print(f"\nFound {result.element_count} interactive elements")
if result.elements and result.elements.strip():
print("Elements:")
print(result.elements)
else:
print("No formatted elements found, but DOM was processed")
# Display interactive elements as JSON
if result.interactive_elements and len(result.interactive_elements) > 0:
print("\nInteractive elements summary:")
for el in result.interactive_elements:
print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")
# Screenshot info
print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}")
print(f"Viewport size: {result.viewport_width}x{result.viewport_height}")
await asyncio.sleep(2)
# Test clicking on an element (e.g., a chess square)
print("\n--- Testing Element Click (element 5) ---")
if result.element_count > 4: # Ensure element 5 exists
click_index = 5
click_result = await automation_service.click_element(ClickElementAction(index=click_index))
print(f"Click status for element {click_index}: {'✅ Success' if click_result.success else '❌ Failed'}")
print(f"Message: {click_result.message}")
print(f"URL after click: {click_result.url}")
# Retrieve and display elements again after click
print(f"\n--- Retrieving elements after clicking element {click_index} ---")
if click_result.elements and click_result.elements.strip():
print("Updated Elements:")
print(click_result.elements)
else:
print("No formatted elements found after click.")
if click_result.interactive_elements and len(click_result.interactive_elements) > 0:
print("\nUpdated interactive elements summary:")
for el in click_result.interactive_elements:
print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")
else:
print("No interactive elements found after click.")
# Test clicking element 1 after the first click
print("\n--- Testing Element Click (element 1 after clicking 5) ---")
if click_result.element_count > 0: # Check if there are still elements
click_index_2 = 1
click_result_2 = await automation_service.click_element(ClickElementAction(index=click_index_2))
print(f"Click status for element {click_index_2}: {'✅ Success' if click_result_2.success else '❌ Failed'}")
print(f"Message: {click_result_2.message}")
print(f"URL after click: {click_result_2.url}")
# Retrieve and display elements again after the second click
print(f"\n--- Retrieving elements after clicking element {click_index_2} ---")
if click_result_2.elements and click_result_2.elements.strip():
print("Elements after second click:")
print(click_result_2.elements)
else:
print("No formatted elements found after second click.")
if click_result_2.interactive_elements and len(click_result_2.interactive_elements) > 0:
print("\nInteractive elements summary after second click:")
for el in click_result_2.interactive_elements:
print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")
else:
print("No interactive elements found after second click.")
else:
print("Skipping second element click test - no elements found after first click.")
else:
print("Skipping element click test - fewer than 5 elements found.")
await asyncio.sleep(2)
print("\n✅ Chess Page Test Completed!")
await asyncio.sleep(100)
except Exception as e:
print(f"\n❌ Chess Page Test failed: {str(e)}")
traceback.print_exc()
finally:
# Ensure browser is closed
print("\n--- Cleaning up ---")
await automation_service.shutdown()
print("Browser closed")
2025-04-14 21:06:02 +08:00
if __name__ == '__main__':
import uvicorn
2025-04-15 22:34:26 +08:00
import sys
2025-04-22 21:29:40 +08:00
# Check command line arguments for test mode
test_mode_1 = "--test" in sys.argv
test_mode_2 = "--test2" in sys.argv
2025-04-15 22:34:26 +08:00
2025-04-22 21:29:40 +08:00
if test_mode_1:
print("Running in test mode 1")
2025-04-15 22:34:26 +08:00
asyncio.run(test_browser_api())
2025-04-22 21:29:40 +08:00
elif test_mode_2:
print("Running in test mode 2 (Chess Page)")
asyncio.run(test_browser_api_2())
2025-04-15 22:34:26 +08:00
else:
print("Starting API server")
2025-05-19 08:43:05 +08:00
uvicorn.run("browser_api:api_app", host="0.0.0.0", port=8003)