mirror of https://github.com/kortix-ai/suna.git
2160 lines
84 KiB
Python
2160 lines
84 KiB
Python
from fastapi import FastAPI, APIRouter, HTTPException, Body
|
|
from playwright.async_api import async_playwright, Browser, Page
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List, Dict, Any
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import base64
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
import os
|
|
import random
|
|
from functools import cached_property
|
|
import traceback
|
|
import pytesseract
|
|
from PIL import Image
|
|
import io
|
|
from utils.logger import logger
|
|
from services.supabase import DBConnection
|
|
|
|
#######################################################
|
|
# Action model definitions
|
|
#######################################################
|
|
|
|
class Position(BaseModel):
    """Integer 2-D point.

    Used by ``DragDropAction`` as the type of its element offset fields.
    """

    x: int
    y: int
|
|
|
|
class ClickElementAction(BaseModel):
    """Request body for ``click_element``: the selector-map index to click."""

    index: int
|
|
|
|
class ClickCoordinatesAction(BaseModel):
    """Request body for ``click_coordinates``.

    ``x`` and ``y`` are handed unchanged to the page's mouse click call.
    """

    x: int
    y: int
|
|
|
|
class GoToUrlAction(BaseModel):
    """Request body for ``navigate_to``: the URL the current page should load."""

    url: str
|
|
|
|
class InputTextAction(BaseModel):
    """Request body carrying an element index and the text to enter.

    NOTE(review): presumably ``index`` is a selector-map index like the one
    used by ``ClickElementAction`` - confirm against the ``input_text`` handler.
    """

    index: int
    text: str
|
|
|
|
class ScrollAction(BaseModel):
    """Request body for the scroll handlers.

    ``amount`` is optional and defaults to ``None``.
    NOTE(review): unit (presumably pixels) and the meaning of ``None`` are
    decided by the scroll handlers - confirm there.
    """

    amount: Optional[int] = None
|
|
|
|
class SendKeysAction(BaseModel):
    """Request body holding the key string for the ``send_keys`` handler."""

    keys: str
|
|
|
|
class SearchGoogleAction(BaseModel):
    """Request body for ``search_google``: the search terms to look up."""

    query: str
|
|
|
|
class SwitchTabAction(BaseModel):
    """Request body identifying the tab to switch to.

    NOTE(review): presumably ``page_id`` indexes the automation's page list -
    confirm against the ``switch_tab`` handler.
    """

    page_id: int
|
|
|
|
class OpenTabAction(BaseModel):
    """Request body holding the URL for the ``open_tab`` handler."""

    url: str
|
|
|
|
class CloseTabAction(BaseModel):
    """Request body identifying the tab to close.

    NOTE(review): presumably ``page_id`` indexes the automation's page list -
    confirm against the ``close_tab`` handler.
    """

    page_id: int
|
|
|
|
class NoParamsAction(BaseModel):
    """Empty request body for actions that take no parameters (e.g. ``go_back``)."""
|
|
|
|
class DragDropAction(BaseModel):
    """Request body for the ``drag_drop`` handler.

    Every field is optional. Two groups of fields describe the drag endpoints:
    element-based (``element_*``) and coordinate-based (``coord_*``).
    NOTE(review): which group wins when both are supplied is decided by the
    handler, which is not shown next to this model - confirm there.
    """

    # Element-based endpoints, with optional offsets.
    element_source: Optional[str] = None
    element_target: Optional[str] = None
    element_source_offset: Optional[Position] = None
    element_target_offset: Optional[Position] = None

    # Coordinate-based endpoints.
    coord_source_x: Optional[int] = None
    coord_source_y: Optional[int] = None
    coord_target_x: Optional[int] = None
    coord_target_y: Optional[int] = None

    # Motion tuning: defaults are 10 steps and a 5 ms delay.
    steps: Optional[int] = 10
    delay_ms: Optional[int] = 5
|
|
|
|
class DoneAction(BaseModel):
    """Completion payload: a success flag (default ``True``) and free text (default empty)."""

    success: bool = True
    text: str = ""
|
|
|
|
#######################################################
|
|
# DOM Structure Models
|
|
#######################################################
|
|
|
|
@dataclass
class CoordinateSet:
    """Rectangle described by its origin (``x``, ``y``) and its ``width``/``height``.

    All four components default to ``0``.
    """

    x: int = 0
    y: int = 0
    width: int = 0
    height: int = 0
|
|
|
|
@dataclass
class ViewportInfo:
    """Viewport size (``width``, ``height``) and scroll offsets (``scroll_x``, ``scroll_y``).

    All four components default to ``0``.
    """

    width: int = 0
    height: int = 0
    scroll_x: int = 0
    scroll_y: int = 0
|
|
|
|
@dataclass
class HashedDomElement:
    """Snapshot of the fields that identify a DOM element.

    Built by ``DOMElementNode.hash`` from the node's tag name, attributes,
    visibility flag and page coordinates.
    """

    tag_name: str
    attributes: Dict[str, str]
    is_visible: bool
    page_coordinates: Optional[CoordinateSet] = None
|
|
|
|
@dataclass
class DOMBaseNode:
    """Common base of all nodes in the simplified DOM tree.

    Holds the visibility flag and a back-reference to the parent element
    (``None`` for a node that has not been attached to a parent).
    """

    is_visible: bool
    parent: Optional['DOMElementNode'] = None
|
|
|
|
@dataclass
class DOMTextNode(DOMBaseNode):
    """Leaf node of the simplified DOM tree that carries a piece of text."""

    text: str = ""
    type: str = 'TEXT_NODE'

    def has_parent_with_highlight_index(self) -> bool:
        """Return True when any ancestor element has a ``highlight_index`` set.

        Walks the ``parent`` chain upwards and stops at the first ancestor
        whose ``highlight_index`` is not ``None``, or at the top of the tree.
        """
        ancestor = self.parent
        while ancestor is not None and ancestor.highlight_index is None:
            ancestor = ancestor.parent
        return ancestor is not None
|
|
|
|
@dataclass
class DOMElementNode(DOMBaseNode):
    """Element node of the simplified DOM tree.

    Besides the tag name, attributes and children, the node records
    interaction flags and geometry. ``highlight_index`` is the number under
    which the element is listed as clickable; ``None`` means it is not listed.
    """

    tag_name: str = field(default="")
    xpath: str = field(default="")
    attributes: Dict[str, str] = field(default_factory=dict)
    children: List['DOMBaseNode'] = field(default_factory=list)

    is_interactive: bool = False
    is_top_element: bool = False
    is_in_viewport: bool = False
    shadow_root: bool = False
    highlight_index: Optional[int] = None
    viewport_coordinates: Optional[CoordinateSet] = None
    page_coordinates: Optional[CoordinateSet] = None
    viewport_info: Optional[ViewportInfo] = None

    def __repr__(self) -> str:
        """Render the opening tag with its attributes plus a bracketed flag list."""
        rendered_attrs = ''.join(
            f' {key}="{value}"' for key, value in self.attributes.items()
        )
        tag_str = f'<{self.tag_name}{rendered_attrs}>'

        extras = []
        if self.is_interactive:
            extras.append('interactive')
        if self.is_top_element:
            extras.append('top')
        if self.highlight_index is not None:
            extras.append(f'highlight:{self.highlight_index}')

        if extras:
            tag_str += f' [{", ".join(extras)}]'

        return tag_str

    @cached_property
    def hash(self) -> HashedDomElement:
        """Identifying snapshot of this element, computed once per instance."""
        return HashedDomElement(
            tag_name=self.tag_name,
            attributes=self.attributes,
            is_visible=self.is_visible,
            page_coordinates=self.page_coordinates
        )

    def get_all_text_till_next_clickable_element(self, max_depth: int = -1) -> str:
        """Collect the text below this element, stopping at nested clickable elements.

        Args:
            max_depth: Maximum depth to descend below this element; ``-1``
                (the default) means unlimited.

        Returns:
            The collected text fragments joined with newlines and stripped.
        """
        text_parts: List[str] = []

        def collect_text(node: DOMBaseNode, current_depth: int) -> None:
            if max_depth != -1 and current_depth > max_depth:
                return

            # Identity, not equality: the dataclass-generated ``__eq__``
            # compares every field (including ``parent`` and ``children``),
            # which is both costly and not what "is this the start node" means.
            if (
                isinstance(node, DOMElementNode)
                and node is not self
                and node.highlight_index is not None
            ):
                return

            if isinstance(node, DOMTextNode):
                text_parts.append(node.text)
            elif isinstance(node, DOMElementNode):
                for child in node.children:
                    collect_text(child, current_depth + 1)

        collect_text(self, 0)
        return '\n'.join(text_parts).strip()

    def clickable_elements_to_string(self, include_attributes: list[str] | None = None) -> str:
        """Render the subtree as one line per clickable element plus loose text.

        Each element with a ``highlight_index`` becomes
        ``[index]<tag id=".." href=".." ...> label </>`` where the label is,
        in order of preference: the element's text, the ``;``-joined values of
        the attributes named in ``include_attributes``, or the upper-cased
        tag name. Visible text nodes without a highlighted ancestor are
        emitted verbatim on their own line.

        Args:
            include_attributes: Attribute names whose values may serve as a
                label when the element has no text. ``None`` or empty
                disables the attribute fallback.

        Returns:
            The rendered lines joined with newlines, or the literal
            ``"No interactive elements found"`` when nothing was rendered.
        """
        formatted_text: List[str] = []
        # Built once so the per-attribute membership test is O(1).
        wanted_attributes = frozenset(include_attributes or ())

        def process_node(node: DOMBaseNode) -> None:
            if isinstance(node, DOMElementNode):
                if node.highlight_index is not None:
                    text = node.get_all_text_till_next_clickable_element()

                    # Fallback label: values of the requested attributes that
                    # are not already part of the text and do not merely
                    # repeat the tag name.
                    display_attributes = []
                    for key, value in node.attributes.items():
                        if key not in wanted_attributes or not value or value == node.tag_name:
                            continue
                        if text and value in text:
                            continue
                        display_attributes.append(str(value))
                    attributes_str = ';'.join(display_attributes)

                    line = f'[{node.highlight_index}]<{node.tag_name}'

                    # Always show the attributes that identify the element.
                    for attr_name in ['id', 'href', 'name', 'value', 'type']:
                        attr_value = node.attributes.get(attr_name)
                        if attr_value:
                            line += f' {attr_name}="{attr_value}"'

                    if text:
                        line += f'> {text}'
                    elif attributes_str:
                        line += f'> {attributes_str}'
                    else:
                        line += f'> {node.tag_name.upper()}'

                    line += ' </>'
                    formatted_text.append(line)

                # Children are visited even below a highlighted element so
                # nested clickable elements get their own line.
                for child in node.children:
                    process_node(child)

            elif isinstance(node, DOMTextNode):
                if not node.has_parent_with_highlight_index() and node.is_visible:
                    if node.text and node.text.strip():
                        formatted_text.append(node.text)

        process_node(self)
        result = '\n'.join(formatted_text)
        return result if result.strip() else "No interactive elements found"
|
|
|
|
@dataclass
class DOMState:
    """Snapshot of a page as seen by the automation layer.

    Combines the element tree, the index-to-element selector map, the page
    URL and title, and how many pixels lie above and below the viewport.
    """

    element_tree: DOMElementNode
    selector_map: Dict[int, DOMElementNode]
    url: str = ""
    title: str = ""
    pixels_above: int = 0
    pixels_below: int = 0
|
|
|
|
#######################################################
|
|
# Browser Action Result Model
|
|
#######################################################
|
|
|
|
class BrowserActionResult(BaseModel):
    """Response model returned by every browser action endpoint.

    Carries the outcome (``success``, ``message``, ``error``) together with a
    description of the page after the action.
    """

    # Outcome of the action.
    success: bool = True
    message: str = ""
    error: str = ""

    # Page state after the action.
    url: Optional[str] = None
    title: Optional[str] = None
    elements: Optional[str] = None  # Formatted string of clickable elements
    screenshot_base64: Optional[str] = None  # For backward compatibility
    screenshot_url: Optional[str] = None
    pixels_above: int = 0
    pixels_below: int = 0
    content: Optional[str] = None
    ocr_text: Optional[str] = None

    # Additional metadata about the page.
    element_count: int = 0
    interactive_elements: Optional[List[Dict[str, Any]]] = None
    viewport_width: Optional[int] = None
    viewport_height: Optional[int] = None

    class Config:
        arbitrary_types_allowed = True
|
|
|
|
#######################################################
|
|
# Browser Automation Implementation
|
|
#######################################################
|
|
|
|
class BrowserAutomation:
|
|
def __init__(self):
    """Set up instance state and register every automation route.

    Creates the router, the (still empty) browser/page bookkeeping, the local
    screenshot directory under the current working directory, the database
    connection wrapper, the startup/shutdown hooks and one POST route per
    action handler under ``/automation/``.
    """
    self.router = APIRouter()
    self.browser: Browser = None
    self.pages: List[Page] = []
    self.current_page_index: int = 0
    self.logger = logging.getLogger("browser_automation")
    self.include_attributes = ["id", "href", "src", "alt", "aria-label", "placeholder", "name", "role", "title", "value"]

    self.screenshot_dir = os.path.join(os.getcwd(), "screenshots")
    os.makedirs(self.screenshot_dir, exist_ok=True)

    self.db = DBConnection()  # Initialize DB connection

    # Lifecycle hooks
    self.router.on_startup.append(self.startup)
    self.router.on_shutdown.append(self.shutdown)

    # (path, handler) pairs, registered in this order.
    post_routes = (
        # Basic navigation
        ("/automation/navigate_to", self.navigate_to),
        ("/automation/search_google", self.search_google),
        ("/automation/go_back", self.go_back),
        ("/automation/wait", self.wait),
        # Element interaction
        ("/automation/click_element", self.click_element),
        ("/automation/click_coordinates", self.click_coordinates),
        ("/automation/input_text", self.input_text),
        ("/automation/send_keys", self.send_keys),
        # Tab management
        ("/automation/switch_tab", self.switch_tab),
        ("/automation/open_tab", self.open_tab),
        ("/automation/close_tab", self.close_tab),
        # Content actions
        ("/automation/extract_content", self.extract_content),
        ("/automation/save_pdf", self.save_pdf),
        # Scroll actions
        ("/automation/scroll_down", self.scroll_down),
        ("/automation/scroll_up", self.scroll_up),
        ("/automation/scroll_to_text", self.scroll_to_text),
        # Dropdown actions
        ("/automation/get_dropdown_options", self.get_dropdown_options),
        ("/automation/select_dropdown_option", self.select_dropdown_option),
        # Drag and drop
        ("/automation/drag_drop", self.drag_drop),
    )
    for route_path, handler in post_routes:
        self.router.post(route_path)(handler)
|
|
|
|
async def startup(self):
    """Initialize the browser instance on startup.

    Starts Playwright, launches Chromium (first non-headless with a 60 s
    timeout, then once more with only a 90 s timeout if that fails) and makes
    sure there is a current page, creating one when none exists yet.

    Raises:
        RuntimeError: If any step fails. The original exception is chained
            as the cause, and the Playwright driver started by this call is
            stopped again so a failed startup does not leak it.
    """
    playwright = None
    try:
        print("Starting browser initialization...")
        playwright = await async_playwright().start()
        print("Playwright started, launching browser...")

        # Use non-headless mode for testing with slower timeouts
        launch_options = {
            "headless": False,
            "timeout": 60000
        }

        try:
            self.browser = await playwright.chromium.launch(**launch_options)
            print("Browser launched successfully")
        except Exception as browser_error:
            print(f"Failed to launch browser: {browser_error}")
            # Try with minimal options (this leaves ``headless`` at
            # Playwright's default).
            print("Retrying with minimal options...")
            launch_options = {"timeout": 90000}
            self.browser = await playwright.chromium.launch(**launch_options)
            print("Browser launched with minimal options")

        try:
            # get_current_page raises when no page is registered yet, which
            # is the signal to create the first one.
            await self.get_current_page()
            print("Found existing page, using it")
            self.current_page_index = 0
        except Exception as page_error:
            print(f"Error finding existing page, creating new one. ( {page_error})")
            page = await self.browser.new_page()
            print("New page created successfully")
            self.pages.append(page)
            self.current_page_index = 0
            # No initial navigation is performed here; the first action
            # decides where the page goes.

        print("Browser initialization completed successfully")
    except Exception as e:
        print(f"Browser startup error: {str(e)}")
        traceback.print_exc()

        # Best-effort cleanup so the driver does not outlive a failed startup.
        if playwright is not None:
            try:
                await playwright.stop()
            except Exception as stop_error:
                print(f"Error stopping Playwright after failed startup: {stop_error}")
        self.browser = None

        raise RuntimeError(f"Browser initialization failed: {str(e)}") from e
|
|
|
|
async def shutdown(self):
    """Clean up the browser instance on shutdown.

    Closes the browser when one is open and then drops the browser and page
    references, so closed handles cannot be handed out afterwards and calling
    this method a second time is a no-op.
    """
    if not self.browser:
        return

    try:
        await self.browser.close()
    finally:
        # Forget the handles even if close() failed; they are unusable either way.
        self.browser = None
        self.pages = []
        self.current_page_index = 0
|
|
|
|
async def get_current_page(self) -> Page:
    """Return the page selected by ``current_page_index``.

    Raises:
        HTTPException: With status 500 when no page has been registered.
    """
    if self.pages:
        return self.pages[self.current_page_index]
    raise HTTPException(status_code=500, detail="No browser pages available")
|
|
|
|
async def get_selector_map(self) -> Dict[int, DOMElementNode]:
    """Get a map of selectable elements on the page.

    Runs a script in the current page that collects the visible interactive
    elements (links, buttons, form controls, elements with an interactive
    ARIA role or a non-negative tabindex) and converts each one into a
    ``DOMElementNode`` keyed by its 1-based index.

    The returned nodes are left without a parent so that whoever builds a
    tree from the map can attach them to its own root.

    Returns:
        Mapping from element index to node. If collecting fails, the map
        contains a single placeholder link at index 1 instead of raising.
    """
    page = await self.get_current_page()

    selector_map: Dict[int, DOMElementNode] = {}

    def to_coordinate_set(raw: Dict[str, Any]) -> CoordinateSet:
        # NOTE(review): the script reports getBoundingClientRect values, which
        # may be fractional, while CoordinateSet is annotated with int.
        return CoordinateSet(
            x=raw.get('x', 0),
            y=raw.get('y', 0),
            width=raw.get('width', 0),
            height=raw.get('height', 0)
        )

    try:
        # More comprehensive JavaScript to find interactive elements
        elements_js = """
        (() => {
            // Helper function to get all attributes as an object
            function getAttributes(el) {
                const attributes = {};
                for (const attr of el.attributes) {
                    attributes[attr.name] = attr.value;
                }
                return attributes;
            }

            // Find all potentially interactive elements
            const interactiveElements = Array.from(document.querySelectorAll(
                'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
            ));

            // Filter for visible elements
            const visibleElements = interactiveElements.filter(el => {
                const style = window.getComputedStyle(el);
                const rect = el.getBoundingClientRect();
                return style.display !== 'none' &&
                       style.visibility !== 'hidden' &&
                       style.opacity !== '0' &&
                       rect.width > 0 &&
                       rect.height > 0;
            });

            // Map to our expected structure
            return visibleElements.map((el, index) => {
                const rect = el.getBoundingClientRect();
                const isInViewport = rect.top >= 0 &&
                                    rect.left >= 0 &&
                                    rect.bottom <= window.innerHeight &&
                                    rect.right <= window.innerWidth;

                return {
                    index: index + 1,
                    tagName: el.tagName.toLowerCase(),
                    text: el.innerText || el.value || '',
                    attributes: getAttributes(el),
                    isVisible: true,
                    isInteractive: true,
                    pageCoordinates: {
                        x: rect.left + window.scrollX,
                        y: rect.top + window.scrollY,
                        width: rect.width,
                        height: rect.height
                    },
                    viewportCoordinates: {
                        x: rect.left,
                        y: rect.top,
                        width: rect.width,
                        height: rect.height
                    },
                    isInViewport: isInViewport
                };
            });
        })();
        """

        elements = await page.evaluate(elements_js)
        print(f"Found {len(elements)} interactive elements in selector map")

        for idx, el in enumerate(elements):
            page_coordinates = None
            viewport_coordinates = None

            if 'pageCoordinates' in el:
                page_coordinates = to_coordinate_set(el['pageCoordinates'])

            if 'viewportCoordinates' in el:
                viewport_coordinates = to_coordinate_set(el['viewportCoordinates'])

            # The script numbers elements itself; fall back to the position
            # in the list if an entry lacks its index.
            element_index = el.get('index', idx + 1)

            element_node = DOMElementNode(
                is_visible=el.get('isVisible', True),
                tag_name=el.get('tagName', 'div'),
                attributes=el.get('attributes', {}),
                is_interactive=el.get('isInteractive', True),
                is_in_viewport=el.get('isInViewport', False),
                highlight_index=element_index,
                page_coordinates=page_coordinates,
                viewport_coordinates=viewport_coordinates
            )

            # Add a text node if there's text content
            element_text = el.get('text')
            if element_text:
                text_node = DOMTextNode(is_visible=True, text=element_text)
                text_node.parent = element_node
                element_node.children.append(text_node)

            selector_map[element_index] = element_node

    except Exception as e:
        print(f"Error getting selector map: {e}")
        traceback.print_exc()
        # Create a dummy element to avoid breaking tests
        # NOTE(review): callers cannot tell this placeholder from a real
        # element; consider returning an empty map instead.
        dummy = DOMElementNode(
            is_visible=True,
            tag_name="a",
            attributes={'href': '#'},
            is_interactive=True,
            highlight_index=1
        )
        dummy_text = DOMTextNode(is_visible=True, text="Dummy Element")
        dummy_text.parent = dummy
        dummy.children.append(dummy_text)
        selector_map[1] = dummy

    return selector_map
|
|
|
|
async def get_current_dom_state(self) -> DOMState:
    """Get the current DOM state including element tree and selector map.

    Builds a synthetic ``body`` root, hangs the selector-map elements below
    it, and adds the page URL, title and scroll position.

    Returns:
        The assembled ``DOMState``. On any failure a minimal placeholder
        state (title ``"Error page"``) is returned instead of raising.
    """
    page = None
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        root = DOMElementNode(
            is_visible=True,
            tag_name="body",
            is_interactive=False,
            is_top_element=True
        )

        # Attach every element that is not nested below another mapped
        # element. Checking only ``parent is None`` is not enough: elements
        # that still point at some other, unrelated root would be skipped and
        # the returned tree would stay empty.
        mapped_ids = {id(node) for node in selector_map.values()}
        for element in selector_map.values():
            if element.parent is None or id(element.parent) not in mapped_ids:
                element.parent = root
                root.children.append(element)

        url = page.url
        try:
            title = await page.title()
        except Exception:
            title = "Unknown Title"

        # Scroll position: pixels scrolled past above the viewport and pixels
        # of document remaining below it.
        try:
            scroll_info = await page.evaluate("""
            () => {
                const body = document.body;
                const html = document.documentElement;
                const totalHeight = Math.max(
                    body.scrollHeight, body.offsetHeight,
                    html.clientHeight, html.scrollHeight, html.offsetHeight
                );
                const scrollY = window.scrollY || window.pageYOffset;
                const windowHeight = window.innerHeight;

                return {
                    pixelsAbove: scrollY,
                    pixelsBelow: Math.max(0, totalHeight - scrollY - windowHeight),
                    totalHeight: totalHeight,
                    viewportHeight: windowHeight
                };
            }
            """)
            pixels_above = scroll_info.get('pixelsAbove', 0)
            pixels_below = scroll_info.get('pixelsBelow', 0)
        except Exception as e:
            print(f"Error getting scroll info: {e}")
            pixels_above = 0
            pixels_below = 0

        return DOMState(
            element_tree=root,
            selector_map=selector_map,
            url=url,
            title=title,
            pixels_above=pixels_above,
            pixels_below=pixels_below
        )
    except Exception as e:
        print(f"Error getting DOM state: {e}")
        traceback.print_exc()
        # Return a minimal valid state to avoid breaking tests
        dummy_root = DOMElementNode(
            is_visible=True,
            tag_name="body",
            is_interactive=False,
            is_top_element=True
        )
        dummy_map = {1: dummy_root}
        return DOMState(
            element_tree=dummy_root,
            selector_map=dummy_map,
            # ``page`` is still None when looking up the page itself failed.
            url=page.url if page is not None else "about:blank",
            title="Error page",
            pixels_above=0,
            pixels_below=0
        )
|
|
|
|
async def take_screenshot(self) -> str | Dict[str, Any]:
    """Take a viewport screenshot and upload it when storage is available.

    Returns:
        * the value returned by ``upload_to_storage`` when a storage client
          is available - on a verified upload this is a dict with the keys
          ``"url"`` and ``"is_s3"``;
        * the base64-encoded JPEG when there is no storage client or the
          uploaded file cannot be found afterwards;
        * an empty string when taking the screenshot fails.
    """
    try:
        page = await self.get_current_page()
        screenshot_bytes = await page.screenshot(type='jpeg', quality=60, full_page=False)

        client = await self.db.client

        if not client:
            logger.warning("No Supabase client available, falling back to base64")
            return base64.b64encode(screenshot_bytes).decode('utf-8')

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        random_id = random.randint(1000, 9999)
        filename = f"screenshot_{timestamp}_{random_id}.jpg"

        logger.info(f"Attempting to upload screenshot: {filename}")
        result = await self.upload_to_storage(client, screenshot_bytes, filename)

        # Only a dict result claims a successful upload; double-check that
        # the file really is in the bucket before handing out its URL.
        if isinstance(result, dict) and result.get("is_s3") and result.get("url"):
            if await self.verify_file_exists(client, filename):
                logger.info(f"Screenshot upload verified: {filename}")
            else:
                logger.error(f"Screenshot upload failed verification: {filename}")
                return base64.b64encode(screenshot_bytes).decode('utf-8')

        return result
    except Exception as e:
        logger.error(f"Error taking screenshot: {str(e)}")
        traceback.print_exc()
        return ""
|
|
|
|
async def upload_to_storage(self, client, file_bytes: bytes, filename: str) -> str | Dict[str, Any]:
    """Upload a JPEG to the ``screenshots`` storage bucket.

    Creates the bucket first when it is not in the bucket listing.

    Args:
        client: Storage-capable client; its ``storage`` attribute is used.
        file_bytes: Raw image bytes to upload.
        filename: Object path inside the bucket.

    Returns:
        ``{"url": <public url>, "is_s3": True}`` on success, or the
        base64-encoded ``file_bytes`` when any step fails.
    """
    try:
        bucket_name = 'screenshots'

        buckets = client.storage.list_buckets()
        if not any(bucket.name == bucket_name for bucket in buckets):
            logger.info(f"Creating bucket: {bucket_name}")
            try:
                client.storage.create_bucket(bucket_name)
                logger.info("Bucket created successfully")
            except Exception as e:
                logger.error(f"Failed to create bucket: {str(e)}")
                raise

        logger.info(f"Uploading file: {filename}")
        try:
            client.storage.from_(bucket_name).upload(
                path=filename,
                file=file_bytes,
                file_options={"content-type": "image/jpeg"}
            )
            logger.info("File upload successful")
        except Exception as e:
            logger.error(f"Failed to upload file: {str(e)}")
            raise

        file_url = client.storage.from_(bucket_name).get_public_url(filename)
        logger.info(f"Generated URL: {file_url}")

        return {"url": file_url, "is_s3": True}
    except Exception as e:
        # Any failure above (including the re-raised ones) ends here: fall
        # back to returning the image inline.
        logger.error(f"Error in upload_to_storage: {str(e)}")
        traceback.print_exc()
        return base64.b64encode(file_bytes).decode('utf-8')
|
|
|
|
async def verify_file_exists(self, client, filename: str) -> bool:
    """Check whether ``filename`` appears in the ``screenshots`` bucket listing.

    Returns:
        True when an entry with that name is listed; False when it is not
        listed or when the listing itself fails.
    """
    logger.info(f"=== Verifying file exists: {filename} ===")
    try:
        listing = client.storage.from_('screenshots').list()

        found = False
        for entry in listing:
            if entry['name'] == filename:
                found = True
                break

        logger.info(f"File verification result: {'exists' if found else 'not found'}")
        return found
    except Exception as e:
        logger.error(f"Error verifying file: {str(e)}")
        return False
|
|
|
|
async def save_screenshot_to_file(self) -> str:
    """Write a viewport screenshot into ``screenshot_dir`` and return its path.

    The file name combines the current timestamp with a random four-digit
    number. Returns an empty string when anything fails.
    """
    try:
        page = await self.get_current_page()

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        suffix = random.randint(1000, 9999)
        target = os.path.join(self.screenshot_dir, f"screenshot_{stamp}_{suffix}.jpg")

        await page.screenshot(path=target, type='jpeg', quality=60, full_page=False)
    except Exception as e:
        print(f"Error saving screenshot: {e}")
        return ""
    return target
|
|
|
|
async def extract_ocr_text_from_screenshot(self, screenshot_base64: str) -> str:
    """Extract text from a base64-encoded screenshot using OCR.

    Args:
        screenshot_base64: Base64-encoded image data. Anything that is not a
            non-empty string (for example the dict describing an uploaded
            screenshot) cannot be decoded and yields an empty result.

    Returns:
        The recognized text with surrounding whitespace removed, or an empty
        string when there is nothing to decode or OCR fails.
    """
    if not screenshot_base64 or not isinstance(screenshot_base64, str):
        return ""

    try:
        image_bytes = base64.b64decode(screenshot_base64)
        image = Image.open(io.BytesIO(image_bytes))

        # NOTE(review): image_to_string is a synchronous call inside a
        # coroutine, so the event loop is blocked while OCR runs.
        ocr_text = pytesseract.image_to_string(image)

        return ocr_text.strip()
    except Exception as e:
        print(f"Error performing OCR: {e}")
        traceback.print_exc()
        return ""
|
|
|
|
async def get_updated_browser_state(self, action_name: str) -> tuple:
    """Helper method to get updated browser state after any action.

    Waits half a second, then gathers the DOM state, a screenshot, the
    formatted clickable-element listing and a metadata dict (element count,
    simplified element list, viewport size and, when a base64 screenshot is
    available, OCR text).

    Args:
        action_name: Label of the triggering action; used only in log lines.

    Returns:
        A tuple ``(dom_state, screenshot, elements, metadata)``. On failure
        the tuple is ``(None, "", "", {})``.
    """
    logger.info(f"=== Starting get_updated_browser_state for action: {action_name} ===")
    try:
        # Wait a moment for any potential async processes to settle
        logger.info("Waiting for async processes to settle")
        await asyncio.sleep(0.5)

        logger.info("Getting current DOM state")
        dom_state = await self.get_current_dom_state()
        logger.info(f"DOM state retrieved - URL: {dom_state.url}, Title: {dom_state.title}")

        logger.info("Taking screenshot")
        screenshot = await self.take_screenshot()
        logger.info(f"Screenshot result type: {'dict' if isinstance(screenshot, dict) else 'base64 string'}")
        if isinstance(screenshot, dict) and screenshot.get("url"):
            logger.info(f"Screenshot URL: {screenshot['url']}")

        logger.info("Formatting clickable elements")
        elements = dom_state.element_tree.clickable_elements_to_string(
            include_attributes=self.include_attributes
        )
        logger.info(f"Found {len(dom_state.selector_map)} clickable elements")

        logger.info("Collecting metadata")
        page = await self.get_current_page()
        metadata = {}

        metadata['element_count'] = len(dom_state.selector_map)

        # Simplified, JSON-friendly description of every mapped element.
        interactive_elements = []
        for idx, element in dom_state.selector_map.items():
            element_info = {
                'index': idx,
                'tag_name': element.tag_name,
                'text': element.get_all_text_till_next_clickable_element(),
                'is_in_viewport': element.is_in_viewport
            }

            for attr_name in ['id', 'href', 'src', 'alt', 'placeholder', 'name', 'role', 'title', 'type']:
                if attr_name in element.attributes:
                    element_info[attr_name] = element.attributes[attr_name]

            interactive_elements.append(element_info)

        metadata['interactive_elements'] = interactive_elements

        try:
            logger.info("Getting viewport dimensions")
            viewport = await page.evaluate("""
            () => {
                return {
                    width: window.innerWidth,
                    height: window.innerHeight
                };
            }
            """)
            metadata['viewport_width'] = viewport.get('width', 0)
            metadata['viewport_height'] = viewport.get('height', 0)
            logger.info(f"Viewport dimensions: {metadata['viewport_width']}x{metadata['viewport_height']}")
        except Exception as e:
            logger.error(f"Error getting viewport dimensions: {e}")
            metadata['viewport_width'] = 0
            metadata['viewport_height'] = 0

        # OCR needs the image bytes, which are only at hand when the
        # screenshot came back as a base64 string. An uploaded screenshot is
        # a dict holding a URL; feeding that to the decoder would only raise.
        ocr_text = ""
        if isinstance(screenshot, str) and screenshot:
            logger.info("Extracting OCR text from screenshot")
            ocr_text = await self.extract_ocr_text_from_screenshot(screenshot)
            metadata['ocr_text'] = ocr_text
            logger.info(f"OCR text length: {len(ocr_text)} characters")
        elif screenshot:
            logger.info("Skipping OCR: screenshot is not available as base64 data")
            metadata['ocr_text'] = ocr_text

        logger.info(f"=== Completed get_updated_browser_state for {action_name} ===")
        return dom_state, screenshot, elements, metadata
    except Exception as e:
        logger.error(f"Error in get_updated_browser_state for {action_name}: {e}")
        traceback.print_exc()
        # Return empty values in case of error
        return None, "", "", {}
|
|
|
|
def build_action_result(self, success: bool, message: str, dom_state, screenshot: str,
                        elements: str, metadata: dict, error: str = "", content: str = None,
                        fallback_url: str = None) -> BrowserActionResult:
    """Assemble a ``BrowserActionResult`` from the pieces of a browser state.

    Args:
        success: Whether the action succeeded.
        message: Human-readable outcome.
        dom_state: DOM state after the action, or a falsy value when none
            could be obtained.
        screenshot: Either a dict flagged with ``"is_s3"`` (its ``"url"`` is
            reported as ``screenshot_url``) or any other value, which is
            reported unchanged as ``screenshot_base64``.
        elements: Formatted element listing; ``None`` is reported as "".
        metadata: Dict read for ``ocr_text``, ``element_count``,
            ``interactive_elements``, ``viewport_width``, ``viewport_height``.
        error: Error text, empty on success.
        content: Optional extracted content.
        fallback_url: URL reported when ``dom_state`` is falsy.
    """
    stored_remotely = isinstance(screenshot, dict) and screenshot.get("is_s3")
    if stored_remotely:
        remote_url, inline_image = screenshot.get("url"), None
    else:
        remote_url, inline_image = None, screenshot

    if dom_state:
        page_url = dom_state.url
        page_title = dom_state.title
        above = dom_state.pixels_above
        below = dom_state.pixels_below
    else:
        page_url = fallback_url or ""
        page_title = ""
        above = 0
        below = 0

    return BrowserActionResult(
        success=success,
        message=message,
        error=error,
        url=page_url,
        title=page_title,
        elements="" if elements is None else elements,
        screenshot_base64=inline_image,
        screenshot_url=remote_url,
        pixels_above=above,
        pixels_below=below,
        content=content,
        ocr_text=metadata.get('ocr_text', ""),
        element_count=metadata.get('element_count', 0),
        interactive_elements=metadata.get('interactive_elements', []),
        viewport_width=metadata.get('viewport_width', 0),
        viewport_height=metadata.get('viewport_height', 0)
    )
|
|
|
|
# Basic Navigation Actions
|
|
|
|
async def navigate_to(self, action: GoToUrlAction = Body(...)):
    """Navigate the current page to ``action.url``.

    Waits for ``domcontentloaded`` and then up to 10 s for the network to go
    idle before capturing the resulting browser state.

    Returns:
        A ``BrowserActionResult``. On failure it carries the error text and,
        when it can still be captured, the browser state after the error.
    """
    try:
        page = await self.get_current_page()
        await page.goto(action.url, wait_until="domcontentloaded")
        await page.wait_for_load_state("networkidle", timeout=10000)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"navigate_to({action.url})")

        result = self.build_action_result(
            True,
            f"Navigated to {action.url}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )

        print(f"Navigation result: success={result.success}, url={result.url}")
        return result
    except Exception as e:
        print(f"Navigation error: {str(e)}")
        traceback.print_exc()
        # Try to get some state info even after error
        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("navigate_error_recovery")
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None
            )
        # ``Exception`` rather than a bare except, so task cancellation and
        # interpreter shutdown are not swallowed by the recovery path.
        except Exception:
            return self.build_action_result(
                False,
                str(e),
                None,
                "",
                "",
                {},
                error=str(e),
                content=None
            )
|
|
|
|
async def search_google(self, action: SearchGoogleAction = Body(...)):
    """Open the Google results page for ``action.query``.

    The query is URL-encoded before it is placed in the ``q`` parameter, so
    characters such as ``&``, ``#``, ``+`` or spaces are searched for
    literally instead of altering the URL.

    Returns:
        A ``BrowserActionResult``. On failure it carries the error text and,
        when it can still be captured, the browser state after the error.
    """
    from urllib.parse import quote_plus

    try:
        page = await self.get_current_page()
        search_url = f"https://www.google.com/search?q={quote_plus(action.query)}"
        await page.goto(search_url)
        await page.wait_for_load_state()

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"search_google({action.query})")

        return self.build_action_result(
            True,
            f"Searched for '{action.query}' in Google",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        print(f"Search error: {str(e)}")
        traceback.print_exc()
        # Try to get some state info even after error
        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("search_error_recovery")
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None
            )
        # ``Exception`` rather than a bare except, so task cancellation and
        # interpreter shutdown are not swallowed by the recovery path.
        except Exception:
            return self.build_action_result(
                False,
                str(e),
                None,
                "",
                "",
                {},
                error=str(e),
                content=None
            )
|
|
|
|
async def go_back(self, _: NoParamsAction = Body(...)):
    """Go one step back in the current page's history.

    Returns:
        A ``BrowserActionResult`` with the state after going back, or a
        failure result carrying only the error text when anything raises.
    """
    try:
        current = await self.get_current_page()
        await current.go_back()
        await current.wait_for_load_state()

        # (dom_state, screenshot, elements, metadata), in the order
        # build_action_result expects them.
        state = await self.get_updated_browser_state("go_back")
        return self.build_action_result(True, "Navigated back", *state, error="", content=None)
    except Exception as e:
        failure = str(e)
        return self.build_action_result(False, failure, None, "", "", {}, error=failure, content=None)
|
|
|
|
async def wait(self, seconds: int = Body(3)):
    """Sleep for ``seconds`` (default 3) and report the browser state afterwards.

    Returns:
        A ``BrowserActionResult`` with the state after waiting, or a failure
        result carrying only the error text when anything raises.
    """
    try:
        await asyncio.sleep(seconds)

        # (dom_state, screenshot, elements, metadata), in the order
        # build_action_result expects them.
        state = await self.get_updated_browser_state(f"wait({seconds} seconds)")
        return self.build_action_result(True, f"Waited for {seconds} seconds", *state, error="", content=None)
    except Exception as e:
        failure = str(e)
        return self.build_action_result(False, failure, None, "", "", {}, error=failure, content=None)
|
|
|
|
# Element Interaction Actions
|
|
|
|
async def click_coordinates(self, action: ClickCoordinatesAction = Body(...)):
    """Click at specific x,y coordinates on the page.

    Args:
        action: carries the ``x`` and ``y`` coordinates handed to
            ``page.mouse.click``.

    Returns:
        The action result built from the browser state captured after the
        click. If the click itself (or state capture) raises, a failure
        result is returned; it includes freshly captured state when that is
        still obtainable and empty placeholders otherwise.
    """
    try:
        page = await self.get_current_page()

        # Perform the click at the specified coordinates
        await page.mouse.click(action.x, action.y)

        # Give time for any navigation or DOM updates to occur. A page that
        # never reaches network idle (polling, streaming) must not turn a
        # click that already happened into a reported failure, so a timeout
        # here only triggers a short fallback pause, as click_element does.
        try:
            await page.wait_for_load_state("networkidle", timeout=5000)
        except Exception as wait_error:
            print(f"Timeout or error waiting for network idle after click: {wait_error}")
            await asyncio.sleep(1)  # Fallback wait

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"click_coordinates({action.x}, {action.y})"
        )

        return self.build_action_result(
            True,
            f"Clicked at coordinates ({action.x}, {action.y})",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None,
        )
    except Exception as e:
        print(f"Error in click_coordinates: {e}")
        traceback.print_exc()

        # Try to get state even after error
        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
                "click_coordinates_error_recovery"
            )
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None,
            )
        except Exception:
            # `except Exception` (not a bare except) so task cancellation
            # is not swallowed while recovering.
            return self.build_action_result(
                False, str(e), None, "", "", {}, error=str(e), content=None
            )
|
|
|
|
async def click_element(self, action: ClickElementAction = Body(...)):
    """Click on an element by index.

    The index is first checked against the selector map of the current DOM
    state. The element is then located in the page by a script that lists
    visible interactive elements and picks the one at ``index`` (1-based),
    and the resulting handle is clicked.

    Args:
        action: carries the ``index`` of the element to click.

    Returns:
        The action result; ``success`` reflects whether the handle click
        went through. Unexpected exceptions produce a failure result with
        recovered state when possible, or with only ``fallback_url``.
    """
    # Bound up front so the last-resort error path can test it instead of
    # depending on a NameError when get_current_page() itself failed.
    page = None
    try:
        page = await self.get_current_page()

        # Get the current state and selector map *before* the click
        initial_dom_state = await self.get_current_dom_state()
        selector_map = initial_dom_state.selector_map

        if action.index not in selector_map:
            # Get updated state even if element not found initially
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
                f"click_element_error (index {action.index} not found)"
            )
            return self.build_action_result(
                False,
                f"Element with index {action.index} not found",
                dom_state,  # Use the latest state
                screenshot,
                elements,
                metadata,
                error=f"Element with index {action.index} not found",
            )

        element_to_click = selector_map[action.index]
        print(f"Attempting to click element: {element_to_click}")

        # Construct a more reliable selector using JavaScript evaluation
        # Find the element based on its properties captured in selector_map
        js_selector_script = """
        (targetElementInfo) => {
            const interactiveElements = Array.from(document.querySelectorAll(
                'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
            ));

            const visibleElements = interactiveElements.filter(el => {
                const style = window.getComputedStyle(el);
                const rect = el.getBoundingClientRect();
                return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && rect.width > 0 && rect.height > 0;
            });

            if (targetElementInfo.index > 0 && targetElementInfo.index <= visibleElements.length) {
                // Return the element at the specified index (1-based)
                return visibleElements[targetElementInfo.index - 1];
            }
            return null; // Element not found at the expected index
        }
        """

        element_info = {'index': action.index}  # Pass the target index to the script

        target_element_handle = await page.evaluate_handle(js_selector_script, element_info)

        click_success = False
        error_message = ""

        if await target_element_handle.evaluate("node => node !== null"):
            try:
                # Use Playwright's recommended way: click the handle
                await target_element_handle.click(timeout=5000)
                click_success = True
                print(f"Successfully clicked element handle for index {action.index}")
            except Exception as click_error:
                error_message = f"Error clicking element handle: {click_error}"
                print(error_message)
        else:
            error_message = f"Could not locate the target element handle for index {action.index} using JS script."
            print(error_message)

        # Wait for potential page changes/network activity
        try:
            await page.wait_for_load_state("networkidle", timeout=5000)
        except Exception as wait_error:
            print(f"Timeout or error waiting for network idle after click: {wait_error}")
            await asyncio.sleep(1)  # Fallback wait

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"click_element({action.index})"
        )

        return self.build_action_result(
            click_success,
            f"Clicked element with index {action.index}" if click_success else f"Attempted to click element {action.index} but failed. Error: {error_message}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error=error_message if not click_success else "",
            content=None,
        )

    except Exception as e:
        print(f"Error in click_element: {e}")
        traceback.print_exc()
        # Try to get state even after error
        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
                "click_element_error_recovery"
            )
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None,
            )
        except Exception:
            # Fallback if getting state also fails. `except Exception`
            # rather than a bare except so cancellation still propagates.
            current_url = "unknown"
            if page is not None:
                try:
                    current_url = page.url  # Try to get at least the URL
                except Exception:
                    pass
            return self.build_action_result(
                False,
                str(e),
                None,  # No DOM state available
                "",  # No screenshot
                "",  # No elements string
                {},  # Empty metadata
                error=str(e),
                content=None,
                fallback_url=current_url,
            )
|
|
|
|
async def input_text(self, action: InputTextAction = Body(...)):
    """Input text into an element.

    The element is looked up by ``action.index`` in the selector map and
    then addressed by, in order of preference: its ``id`` attribute, its
    ``class`` attribute, or an XPath built from its tag name and the index.

    Args:
        action: carries the element ``index`` and the ``text`` to fill in.

    Returns:
        The action result with refreshed browser state on success, or a
        failure result when the index is unknown or any step raises.
    """
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        if action.index not in selector_map:
            return self.build_action_result(
                False,
                f"Element with index {action.index} not found",
                None,
                "",
                "",
                {},
                error=f"Element with index {action.index} not found",
            )

        element = selector_map[action.index]

        await page.wait_for_timeout(500)  # Small delay before typing

        element_id = element.attributes.get("id")
        class_names = (element.attributes.get("class") or "").split()

        if element_id:
            # An attribute selector works for any id value; `#id` is only
            # valid CSS when the id is a plain identifier (no ':' or '.',
            # no leading digit).
            escaped_id = element_id.replace("\\", "\\\\").replace('"', '\\"')
            selector = f'[id="{escaped_id}"]'
        elif class_names:
            # split() drops leading/trailing/repeated whitespace, which
            # would otherwise yield an invalid selector such as ".a..b".
            # NOTE(review): class names with CSS-special characters
            # (e.g. "md:flex") are still not escaped here.
            selector = "." + ".".join(class_names)
        else:
            # Fallback to xpath
            # NOTE(review): the index comes from the selector map, not from
            # the element's position among same-tag siblings - confirm this
            # fallback ever targets the intended node.
            selector = f"//{element.tag_name}[{action.index}]"

        await page.fill(selector, action.text)

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"input_text({action.index}, '{action.text}')"
        )

        return self.build_action_result(
            True,
            f"Input '{action.text}' into element with index {action.index}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None,
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
async def send_keys(self, action: SendKeysAction = Body(...)):
    """Press the key (or key combination) named by ``action.keys``.

    The value is handed unchanged to ``page.keyboard.press``. The result
    carries the browser state captured afterwards, or the exception text
    if pressing the key or capturing state raised.
    """
    try:
        current_tab = await self.get_current_page()
        await current_tab.keyboard.press(action.keys)

        # Snapshot the page once the key press has been delivered.
        label = f"send_keys({action.keys})"
        dom, shot, element_text, meta = await self.get_updated_browser_state(label)

        return self.build_action_result(
            True,
            f"Sent keys: {action.keys}",
            dom,
            shot,
            element_text,
            meta,
            error="",
            content=None,
        )
    except Exception as exc:
        return self.build_action_result(
            False, str(exc), None, "", "", {}, error=str(exc), content=None
        )
|
|
|
|
# Tab Management Actions
|
|
|
|
async def switch_tab(self, action: SwitchTabAction = Body(...)):
    """Make the tab at position ``action.page_id`` the current one.

    ``page_id`` is a zero-based index into ``self.pages``. An out-of-range
    index yields a "not found" failure result; any exception is reported
    as a failure result carrying the exception text.
    """
    try:
        # Guard clause: reject indexes outside the list of open tabs.
        if not (0 <= action.page_id < len(self.pages)):
            return self.build_action_result(
                False,
                f"Tab {action.page_id} not found",
                None,
                "",
                "",
                {},
                error=f"Tab {action.page_id} not found",
            )

        self.current_page_index = action.page_id
        active_tab = await self.get_current_page()
        await active_tab.wait_for_load_state()

        # Snapshot the newly selected tab.
        label = f"switch_tab({action.page_id})"
        dom, shot, element_text, meta = await self.get_updated_browser_state(label)

        return self.build_action_result(
            True,
            f"Switched to tab {action.page_id}",
            dom,
            shot,
            element_text,
            meta,
            error="",
            content=None,
        )
    except Exception as exc:
        return self.build_action_result(
            False, str(exc), None, "", "", {}, error=str(exc), content=None
        )
|
|
|
|
async def open_tab(self, action: OpenTabAction = Body(...)):
    """Open a new tab with the specified URL.

    A new page is created on ``self.browser``, navigated to ``action.url``,
    appended to ``self.pages`` and made the current tab.

    Args:
        action: carries the ``url`` to load in the new tab.

    Returns:
        The action result with the new tab's state, or a failure result
        carrying the exception text. When the failure happens before the
        page was added to ``self.pages``, the page is closed so it does not
        linger untracked in the browser.
    """
    new_page = None
    registered = False
    try:
        print(f"Attempting to open new tab with URL: {action.url}")
        # Create new page in same browser instance
        new_page = await self.browser.new_page()
        print(f"New page created successfully")

        # Navigate to the URL
        await new_page.goto(action.url, wait_until="domcontentloaded")
        try:
            await new_page.wait_for_load_state("networkidle", timeout=10000)
        except Exception as wait_error:
            # Pages that keep the network busy never reach "networkidle";
            # the document has already loaded, so carry on with the tab
            # (same tolerance click_element applies after a click).
            print(f"Timeout or error waiting for network idle in new tab: {wait_error}")
        print(f"Navigated to URL in new tab: {action.url}")

        # Add to page list and make it current
        self.pages.append(new_page)
        registered = True
        self.current_page_index = len(self.pages) - 1
        print(f"New tab added as index {self.current_page_index}")

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"open_tab({action.url})"
        )

        return self.build_action_result(
            True,
            f"Opened new tab with URL: {action.url}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None,
        )
    except Exception as e:
        print("****" * 10)
        print(f"Error opening tab: {e}")
        print(traceback.format_exc())
        print("****" * 10)

        # A page that was created but never registered is unreachable
        # through self.pages; close it instead of leaking it.
        if new_page is not None and not registered:
            try:
                await new_page.close()
            except Exception as close_error:
                print(f"Error closing orphaned tab: {close_error}")

        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
async def close_tab(self, action: CloseTabAction = Body(...)):
    """Close the tab at position ``action.page_id`` and fix up the current index.

    ``page_id`` is a zero-based index into ``self.pages``. After the page is
    closed and removed, ``self.current_page_index`` is clamped to the
    shortened list, or moved back by one when it pointed at or after the
    removed slot. Out-of-range indexes and exceptions both produce failure
    results.
    """
    try:
        tab_id = action.page_id

        # Guard clause: nothing to close for an index outside the tab list.
        if not (0 <= tab_id < len(self.pages)):
            return self.build_action_result(
                False,
                f"Tab {tab_id} not found",
                None,
                "",
                "",
                {},
                error=f"Tab {tab_id} not found",
            )

        closing_tab = self.pages[tab_id]
        closed_url = closing_tab.url  # read before close() for the message
        await closing_tab.close()
        self.pages.pop(tab_id)

        # Keep the current-tab pointer valid for the shortened list.
        remaining = len(self.pages)
        if self.current_page_index >= remaining:
            self.current_page_index = max(0, remaining - 1)
        elif self.current_page_index >= tab_id:
            self.current_page_index = max(0, self.current_page_index - 1)

        # Resolve the now-current page before snapshotting it.
        await self.get_current_page()
        label = f"close_tab({tab_id})"
        dom, shot, element_text, meta = await self.get_updated_browser_state(label)

        return self.build_action_result(
            True,
            f"Closed tab {tab_id} with URL: {closed_url}",
            dom,
            shot,
            element_text,
            meta,
            error="",
            content=None,
        )
    except Exception as exc:
        return self.build_action_result(
            False, str(exc), None, "", "", {}, error=str(exc), content=None
        )
|
|
|
|
# Content Actions
|
|
|
|
async def extract_content(self, goal: str = Body(...)):
    """Extract content from the current page based on the provided goal.

    The goal is only echoed in the result message and state label; the
    extraction itself collects the ``innerText`` of visible ``p``,
    ``h1``-``h6``, ``li``, ``span`` and ``div`` elements, joined by blank
    lines.

    Args:
        goal: free-text description of what the caller is looking for.

    Returns:
        The action result whose ``content`` is the extracted text, or a
        failure result carrying the exception text.
    """
    try:
        page = await self.get_current_page()

        # The previous version also fetched page.content() into an unused
        # variable; that full-HTML round trip is dropped.

        # In a full implementation, we would use an LLM to extract specific content
        # based on the goal. For this example, we'll extract visible text.
        # NOTE(review): nested elements each contribute their own innerText,
        # so text inside e.g. div > p appears more than once in the output.
        extracted_text = await page.evaluate("""
        Array.from(document.querySelectorAll('p, h1, h2, h3, h4, h5, h6, li, span, div'))
            .filter(el => {
                const style = window.getComputedStyle(el);
                return style.display !== 'none' &&
                       style.visibility !== 'hidden' &&
                       style.opacity !== '0' &&
                       el.innerText &&
                       el.innerText.trim().length > 0;
            })
            .map(el => el.innerText.trim())
            .join('\\n\\n');
        """)

        # Get updated state
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"extract_content({goal})"
        )

        return self.build_action_result(
            True,
            f"Content extracted based on goal: {goal}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=extracted_text,
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
async def save_pdf(self):
    """Write the current page to a PDF file under ``self.screenshot_dir``.

    The file name combines a second-resolution timestamp with a random
    four-digit number. The result message contains the full path; any
    exception is reported as a failure result.
    """
    try:
        current_tab = await self.get_current_page()

        # Timestamp plus random suffix keeps names from colliding when
        # several PDFs are saved within the same second.
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        suffix = random.randint(1000, 9999)
        filepath = os.path.join(self.screenshot_dir, f"page_{stamp}_{suffix}.pdf")

        await current_tab.pdf(path=filepath)

        dom, shot, element_text, meta = await self.get_updated_browser_state("save_pdf")

        return self.build_action_result(
            True,
            f"Saved page as PDF: {filepath}",
            dom,
            shot,
            element_text,
            meta,
            error="",
            content=None,
        )
    except Exception as exc:
        return self.build_action_result(
            False, str(exc), None, "", "", {}, error=str(exc), content=None
        )
|
|
|
|
# Scroll Actions
|
|
|
|
async def scroll_down(self, action: ScrollAction = Body(...)):
    """Scroll the page downwards.

    With ``action.amount`` set, the window is scrolled by that many pixels;
    without it, by one viewport height. The result carries the state
    captured after a short settling delay, or the exception text on failure.
    """
    try:
        current_tab = await self.get_current_page()
        pixels = action.amount

        if pixels is None:
            # No explicit distance: move by one full viewport.
            await current_tab.evaluate("window.scrollBy(0, window.innerHeight);")
            amount_str = "one page"
        else:
            await current_tab.evaluate(f"window.scrollBy(0, {pixels});")
            amount_str = f"{pixels} pixels"

        await current_tab.wait_for_timeout(500)  # let the scroll settle

        label = f"scroll_down({amount_str})"
        dom, shot, element_text, meta = await self.get_updated_browser_state(label)

        return self.build_action_result(
            True,
            f"Scrolled down by {amount_str}",
            dom,
            shot,
            element_text,
            meta,
            error="",
            content=None,
        )
    except Exception as exc:
        return self.build_action_result(
            False, str(exc), None, "", "", {}, error=str(exc), content=None
        )
|
|
|
|
async def scroll_up(self, action: ScrollAction = Body(...)):
    """Scroll up the page.

    Args:
        action: ``amount`` is the number of pixels to scroll up; when it is
            ``None`` the page is scrolled up by one viewport height.

    Returns:
        The action result with the state captured after a short settling
        delay, or a failure result carrying the exception text.
    """
    try:
        page = await self.get_current_page()
        if action.amount is not None:
            # Pass the offset as an argument instead of splicing it into
            # the script: a negative amount used to render as
            # "window.scrollBy(0, --N)", which is a JavaScript syntax error.
            await page.evaluate("(dy) => window.scrollBy(0, dy)", -action.amount)
            amount_str = f"{action.amount} pixels"
        else:
            await page.evaluate("window.scrollBy(0, -window.innerHeight);")
            amount_str = "one page"

        await page.wait_for_timeout(500)  # Wait for scroll to complete

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"scroll_up({amount_str})"
        )

        return self.build_action_result(
            True,
            f"Scrolled up by {amount_str}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None,
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
async def scroll_to_text(self, text: str = Body(...)):
    """Scroll to text on the page.

    Three locator strategies are tried in order (``get_by_text``, the
    ``text=`` selector engine, and an XPath ``contains(text(), ...)``); the
    first one with a visible match is scrolled into view.

    Args:
        text: the text to look for.

    Returns:
        The action result; ``success`` is ``False`` and ``error`` holds the
        "not found" message when no strategy found a visible match.
        Unexpected exceptions produce a failure result with the exception
        text.
    """
    try:
        page = await self.get_current_page()

        # Build a valid XPath string literal whatever quotes `text` holds;
        # interpolating it between single quotes produced a broken
        # expression for text containing an apostrophe.
        if "'" not in text:
            xpath_literal = f"'{text}'"
        elif '"' not in text:
            xpath_literal = f'"{text}"'
        else:
            pieces = [f"'{piece}'" for piece in text.split("'")]
            xpath_literal = "concat(" + ", \"'\", ".join(pieces) + ")"

        locators = [
            page.get_by_text(text, exact=False),
            page.locator(f"text={text}"),
            page.locator(f"//*[contains(text(), {xpath_literal})]"),
        ]

        found = False
        for locator in locators:
            try:
                if await locator.count() > 0 and await locator.first.is_visible():
                    await locator.first.scroll_into_view_if_needed()
                    await asyncio.sleep(0.5)  # Wait for scroll to complete
                    found = True
                    break
            except Exception:
                # A strategy that errors out simply yields to the next one.
                continue

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"scroll_to_text({text})"
        )

        message = f"Scrolled to text: {text}" if found else f"Text '{text}' not found or not visible on page"

        return self.build_action_result(
            found,
            message,
            dom_state,
            screenshot,
            elements,
            metadata,
            # A failed lookup now carries its reason in `error`, like the
            # other failure results in this service.
            error="" if found else message,
            content=None,
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
# Dropdown Actions
|
|
|
|
async def get_dropdown_options(self, index: int = Body(...)):
    """Get all options from a dropdown.

    For a ``<select>`` element the options are read in the page; the
    element is found by its ``id`` when it has one, otherwise as the
    ``index``-th (1-based) ``<select>`` in the document. For any other
    element the dropdown is clicked open (only when it has an ``id``) and
    visible ``.dropdown-item`` / ``[role="option"]`` / ``li`` nodes are
    listed.

    Args:
        index: selector-map index of the dropdown element.

    Returns:
        The action result whose ``content`` is a JSON list of
        ``{"index", "text", "value"}`` objects, or a failure result when
        the index is unknown or an unexpected exception occurs.
    """
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        if index not in selector_map:
            return self.build_action_result(
                False,
                f"Element with index {index} not found",
                None,
                "",
                "",
                {},
                error=f"Element with index {index} not found",
            )

        element = selector_map[index]
        element_id = element.attributes.get('id')
        options = []

        # Try to get the options - in a real implementation, we would use appropriate selectors
        try:
            if element.tag_name.lower() == 'select':
                # For <select> elements, get options using JavaScript.
                # The script is a plain string and receives its inputs as
                # an argument: as an f-string, the JS object-literal braces
                # were parsed as replacement fields and this branch always
                # raised before reaching the page.
                options_js = """
                ({id, position}) => {
                    const byId = id ? document.getElementById(id) : null;
                    const select = (byId && byId.tagName === 'SELECT')
                        ? byId
                        : document.querySelectorAll('select')[position];
                    if (!select) {
                        throw new Error('select element not found');
                    }
                    return Array.from(select.options).map((option, index) => ({
                        index: index,
                        text: option.text,
                        value: option.value
                    }));
                }
                """
                options = await page.evaluate(
                    options_js, {"id": element_id, "position": index - 1}
                )
            else:
                # For other dropdown types, try to get options using a more generic approach
                # Example for custom dropdowns - would need refinement in real implementation
                if element_id:
                    # Attribute selector: valid for any id value, unlike "#id".
                    escaped_id = element_id.replace("\\", "\\\\").replace('"', '\\"')
                    await page.click(f'[id="{escaped_id}"]')
                await page.wait_for_timeout(500)

                options_js = """
                Array.from(document.querySelectorAll('.dropdown-item, [role="option"], li'))
                    .filter(el => {
                        const style = window.getComputedStyle(el);
                        return style.display !== 'none' && style.visibility !== 'hidden';
                    })
                    .map((option, index) => ({
                        index: index,
                        text: option.innerText.trim(),
                        value: option.getAttribute('value') || option.getAttribute('data-value') || option.innerText.trim()
                    }));
                """
                options = await page.evaluate(options_js)

                # Close dropdown to restore state
                await page.keyboard.press("Escape")
        except Exception as e:
            self.logger.error(f"Error getting dropdown options: {e}")
            # Fallback to dummy options if real ones cannot be retrieved
            # NOTE(review): these placeholders are returned with
            # success=True, so callers cannot tell them from real options -
            # confirm this demo behaviour is still wanted.
            options = [
                {"index": 0, "text": "Option 1", "value": "option1"},
                {"index": 1, "text": "Option 2", "value": "option2"},
                {"index": 2, "text": "Option 3", "value": "option3"},
            ]

        # Get updated state
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"get_dropdown_options({index})"
        )

        return self.build_action_result(
            True,
            f"Retrieved {len(options)} options from dropdown",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=json.dumps(options),  # Include options in the content field
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
async def select_dropdown_option(self, index: int = Body(...), option_text: str = Body(...)):
    """Select an option from a dropdown by text.

    A ``<select>`` element is driven through ``page.select_option`` using
    the option label. Any other element is treated as a custom dropdown:
    it is clicked open and the entry matching ``option_text`` is clicked.

    Args:
        index: selector-map index of the dropdown element.
        option_text: visible text (label) of the option to choose.

    Returns:
        The action result with refreshed browser state on success, or a
        failure result when the index is unknown or any step raises.
    """
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        if index not in selector_map:
            return self.build_action_result(
                False,
                f"Element with index {index} not found",
                None,
                "",
                "",
                {},
                error=f"Element with index {index} not found",
            )

        element = selector_map[index]

        element_id = element.attributes.get('id')
        id_selector = None
        if element_id:
            # Attribute selector: valid for any id value, whereas "#id"
            # breaks on ids with ':' or '.' or a leading digit.
            escaped_id = element_id.replace("\\", "\\\\").replace('"', '\\"')
            id_selector = f'[id="{escaped_id}"]'

        # Try to select the option - implementation varies by dropdown type
        if element.tag_name.lower() == 'select':
            # For standard <select> elements.
            # Without an id, address the index-th <select> in document
            # order; "//select[N]" instead means "N-th <select> child of
            # its parent" and disagreed with get_dropdown_options.
            await page.select_option(
                id_selector if id_selector else f"xpath=(//select)[{index}]",
                label=option_text,
            )
        else:
            # For custom dropdowns
            # First click to open the dropdown
            if id_selector:
                await page.click(id_selector)
            else:
                # NOTE(review): index is a selector-map index, not a
                # position among same-tag siblings - confirm this fallback.
                await page.click(f"//{element.tag_name}[{index}]")

            await page.wait_for_timeout(500)

            # Then try to click the option
            await page.click(f"text={option_text}")

        await page.wait_for_timeout(500)

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"select_dropdown_option({index}, '{option_text}')"
        )

        return self.build_action_result(
            True,
            f"Selected option '{option_text}' from dropdown with index {index}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None,
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
# Drag and Drop
|
|
|
|
async def drag_drop(self, action: DragDropAction = Body(...)):
    """Perform drag and drop operation.

    Two modes are supported:

    * element mode - ``element_source`` and ``element_target`` are
      selectors; the drag is delegated to ``page.drag_and_drop``, with the
      optional ``element_*_offset`` positions forwarded.
    * coordinate mode - all four ``coord_*`` values are given; the mouse is
      pressed at the source, moved in ``steps`` interpolated moves (pausing
      ``delay_ms`` between them) and released at the target.

    Args:
        action: the drag description; see the two modes above.

    Returns:
        The action result with refreshed browser state on success, a
        failure result when neither mode is fully specified, or a failure
        result carrying the exception text.
    """
    try:
        page = await self.get_current_page()

        # Element-based drag and drop
        if action.element_source and action.element_target:
            source_desc = action.element_source
            target_desc = action.element_target

            # Perform a real drag; the earlier version only wrote to the
            # page console and still reported success.
            drag_options = {}
            if action.element_source_offset is not None:
                drag_options["source_position"] = {
                    "x": action.element_source_offset.x,
                    "y": action.element_source_offset.y,
                }
            if action.element_target_offset is not None:
                drag_options["target_position"] = {
                    "x": action.element_target_offset.x,
                    "y": action.element_target_offset.y,
                }
            await page.drag_and_drop(source_desc, target_desc, **drag_options)

            message = f"Dragged element '{source_desc}' to '{target_desc}'"

        # Coordinate-based drag and drop
        elif all(coord is not None for coord in [
            action.coord_source_x, action.coord_source_y,
            action.coord_target_x, action.coord_target_y
        ]):
            source_x = action.coord_source_x
            source_y = action.coord_source_y
            target_x = action.coord_target_x
            target_y = action.coord_target_y

            # Perform the drag
            await page.mouse.move(source_x, source_y)
            await page.mouse.down()

            # From here on the button is held: release it even when a move
            # fails, so later actions do not inherit a pressed mouse button.
            try:
                steps = max(1, action.steps or 10)
                delay_ms = max(0, action.delay_ms or 5)

                for i in range(1, steps + 1):
                    ratio = i / steps
                    intermediate_x = int(source_x + (target_x - source_x) * ratio)
                    intermediate_y = int(source_y + (target_y - source_y) * ratio)
                    await page.mouse.move(intermediate_x, intermediate_y)
                    if delay_ms > 0:
                        await asyncio.sleep(delay_ms / 1000)

                await page.mouse.move(target_x, target_y)
            finally:
                await page.mouse.up()

            message = f"Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})"
        else:
            return self.build_action_result(
                False,
                "Must provide either source/target selectors or coordinates",
                None,
                "",
                "",
                {},
                error="Must provide either source/target selectors or coordinates",
            )

        # Get updated state after action
        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(
            f"drag_drop({action.element_source}, {action.element_target})"
        )

        return self.build_action_result(
            True,
            message,
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None,
        )
    except Exception as e:
        return self.build_action_result(
            False, str(e), None, "", "", {}, error=str(e), content=None
        )
|
|
|
|
# Single shared BrowserAutomation instance used by the routes and the tests
automation_service = BrowserAutomation()

# FastAPI application that fronts the automation service
api_app = FastAPI()


@api_app.get("/api")
async def health_check():
    """Liveness probe: return a fixed payload saying the API server is up."""
    payload = {"status": "ok", "message": "API server is running"}
    return payload


# Mount the automation service's routes under the /api prefix
api_app.include_router(automation_service.router, prefix="/api")
|
|
|
|
async def test_browser_api():
    """Test the browser automation API functionality.

    Manual smoke test that drives ``automation_service`` end to end against
    live sites: navigation, OCR output, search, scrolling, element and
    coordinate clicks, content extraction and opening a tab. Progress is
    printed to stdout. A failed navigation aborts the run; other failed
    steps are collected and listed in the final summary. The browser is
    always shut down at the end.
    """
    failed_steps = []

    def report(label, step_result):
        """Print the status line for a step and remember it if it failed."""
        ok = bool(step_result.success)
        print(f"{label} status: {'✅ Success' if ok else '❌ Failed'}")
        if not ok:
            failed_steps.append(label)
        return ok

    try:
        # Initialize browser automation
        print("\n=== Starting Browser Automation Test ===")
        await automation_service.startup()
        print("✅ Browser started successfully")

        # Navigate to a test page with interactive elements
        print("\n--- Testing Navigation ---")
        result = await automation_service.navigate_to(GoToUrlAction(url="https://www.youtube.com"))
        if not report("Navigation", result):
            # Nothing else can be exercised without a loaded page.
            print(f"Error: {result.error}")
            return

        print(f"URL: {result.url}")
        print(f"Title: {result.title}")

        # Check DOM state and elements
        print(f"\nFound {result.element_count} interactive elements")
        if result.elements and result.elements.strip():
            print("Elements:")
            print(result.elements)
        else:
            print("No formatted elements found, but DOM was processed")

        # Display interactive elements as JSON
        if result.interactive_elements and len(result.interactive_elements) > 0:
            print("\nInteractive elements summary:")
            for el in result.interactive_elements:
                print(f"  [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")

        # Screenshot info
        print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}")
        print(f"Viewport size: {result.viewport_width}x{result.viewport_height}")

        # Test OCR extraction from screenshot
        print("\n--- Testing OCR Text Extraction ---")
        if result.ocr_text:
            print("OCR text extracted from screenshot:")
            print("=== OCR TEXT START ===")
            print(result.ocr_text)
            print("=== OCR TEXT END ===")
            # The text used to be dumped a second time after this line.
            print(f"OCR text length: {len(result.ocr_text)} characters")
        else:
            print("No OCR text extracted from screenshot")

        await asyncio.sleep(2)

        # Test search functionality
        print("\n--- Testing Search ---")
        result = await automation_service.search_google(SearchGoogleAction(query="browser automation"))
        if not report("Search", result):
            print(f"Error: {result.error}")
        else:
            print(f"Found {result.element_count} elements after search")
            print(f"Page title: {result.title}")

            # Test OCR extraction from search results
            if result.ocr_text:
                print("\nOCR text from search results:")
                print("=== OCR TEXT START ===")
                print(result.ocr_text)
                print("=== OCR TEXT END ===")
            else:
                print("\nNo OCR text extracted from search results")

        await asyncio.sleep(2)

        # Test scrolling
        print("\n--- Testing Scrolling ---")
        result = await automation_service.scroll_down(ScrollAction(amount=300))
        if report("Scroll", result):
            print(f"Pixels above viewport: {result.pixels_above}")
            print(f"Pixels below viewport: {result.pixels_below}")

        await asyncio.sleep(2)

        # Test clicking on an element
        print("\n--- Testing Element Click ---")
        if result.element_count > 0:
            click_result = await automation_service.click_element(ClickElementAction(index=1))
            report("Click", click_result)
            print(f"Message: {click_result.message}")
            print(f"New URL after click: {click_result.url}")
        else:
            print("Skipping click test - no elements found")

        await asyncio.sleep(2)

        # Test clicking on coordinates
        print("\n--- Testing Click Coordinates ---")
        coord_click_result = await automation_service.click_coordinates(ClickCoordinatesAction(x=100, y=100))
        report("Coordinate click", coord_click_result)
        print(f"Message: {coord_click_result.message}")
        print(f"URL after coordinate click: {coord_click_result.url}")

        await asyncio.sleep(2)

        # Test extracting content
        print("\n--- Testing Content Extraction ---")
        content_result = await automation_service.extract_content("test goal")
        report("Content extraction", content_result)
        if content_result.content:
            content_preview = content_result.content[:100] + "..." if len(content_result.content) > 100 else content_result.content
            print(f"Content sample: {content_preview}")
            print(f"Total content length: {len(content_result.content)} chars")
        else:
            print("No content was extracted")

        # Test tab management
        print("\n--- Testing Tab Management ---")
        tab_result = await automation_service.open_tab(OpenTabAction(url="https://www.example.org"))
        if report("New tab", tab_result):
            print(f"New tab title: {tab_result.title}")
            print(f"Interactive elements: {tab_result.element_count}")

        # Only claim overall success when every step actually succeeded.
        if failed_steps:
            print(f"\n❌ Tests completed with failures: {', '.join(failed_steps)}")
        else:
            print("\n✅ All tests completed successfully!")

    except Exception as e:
        print(f"\n❌ Test failed: {str(e)}")
        traceback.print_exc()
    finally:
        # Ensure browser is closed
        print("\n--- Cleaning up ---")
        await automation_service.shutdown()
        print("Browser closed")
|
|
|
|
def _print_element_report(result, elements_header, no_elements_msg,
                          summary_header, no_summary_msg=None):
    """Print the formatted element listing and a per-element summary of ``result``.

    Args:
        result: Action result exposing ``elements`` (formatted string) and
            ``interactive_elements`` (iterable of dicts with ``index``,
            ``tag_name`` and optionally ``text``).
        elements_header: Line printed before the formatted element listing.
        no_elements_msg: Line printed when the listing is missing or blank.
        summary_header: Line printed before the per-element summary.
        no_summary_msg: Line printed when there are no interactive elements;
            ``None`` prints nothing in that case.
    """
    if result.elements and result.elements.strip():
        print(elements_header)
        print(result.elements)
    else:
        print(no_elements_msg)

    if result.interactive_elements:
        print(summary_header)
        for el in result.interactive_elements:
            # ``or ''`` guards against an explicit ``text: None`` entry, which
            # a plain ``.get('text', '')`` would pass straight to the slice.
            text_preview = (el.get('text') or '')[:30]
            print(f" [{el['index']}] <{el['tag_name']}> {text_preview}")
    elif no_summary_msg is not None:
        print(no_summary_msg)


async def test_browser_api_2(hold_open_seconds: float = 100):
    """Test the browser automation API functionality on the chess page.

    Starts the browser, navigates to the chess test page, prints the detected
    elements, clicks element 5 and then element 1 (re-printing the elements
    after each click), and finally shuts the browser down.

    Args:
        hold_open_seconds: How long to keep the browser session open after the
            test has finished, before cleanup. Defaults to 100 seconds (the
            previously hard-coded value); pass 0 to shut down immediately.
    """
    try:
        # Initialize browser automation
        print("\n=== Starting Browser Automation Test 2 (Chess Page) ===")
        await automation_service.startup()
        print("✅ Browser started successfully")

        # Navigate to the chess test page
        print("\n--- Testing Navigation to Chess Page ---")
        test_url = "https://dat-lequoc.github.io/chess-for-suna/chess.html"
        result = await automation_service.navigate_to(GoToUrlAction(url=test_url))
        print(f"Navigation status: {'✅ Success' if result.success else '❌ Failed'}")
        if not result.success:
            print(f"Error: {result.error}")
            # The ``finally`` clause below still shuts the browser down.
            return

        print(f"URL: {result.url}")
        print(f"Title: {result.title}")

        # Check DOM state and list the interactive elements
        print(f"\nFound {result.element_count} interactive elements")
        _print_element_report(
            result,
            elements_header="Elements:",
            no_elements_msg="No formatted elements found, but DOM was processed",
            summary_header="\nInteractive elements summary:",
        )

        # Screenshot info
        print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}")
        print(f"Viewport size: {result.viewport_width}x{result.viewport_height}")

        await asyncio.sleep(2)

        # Test clicking on an element (e.g., a chess square)
        print("\n--- Testing Element Click (element 5) ---")
        if result.element_count > 4:  # Ensure element 5 exists
            click_index = 5
            click_result = await automation_service.click_element(ClickElementAction(index=click_index))
            print(f"Click status for element {click_index}: {'✅ Success' if click_result.success else '❌ Failed'}")
            print(f"Message: {click_result.message}")
            print(f"URL after click: {click_result.url}")

            # Display the elements reported by the click result
            print(f"\n--- Retrieving elements after clicking element {click_index} ---")
            _print_element_report(
                click_result,
                elements_header="Updated Elements:",
                no_elements_msg="No formatted elements found after click.",
                summary_header="\nUpdated interactive elements summary:",
                no_summary_msg="No interactive elements found after click.",
            )

            # Test clicking element 1 after the first click
            print("\n--- Testing Element Click (element 1 after clicking 5) ---")
            if click_result.element_count > 0:  # Check if there are still elements
                click_index_2 = 1
                click_result_2 = await automation_service.click_element(ClickElementAction(index=click_index_2))
                print(f"Click status for element {click_index_2}: {'✅ Success' if click_result_2.success else '❌ Failed'}")
                print(f"Message: {click_result_2.message}")
                print(f"URL after click: {click_result_2.url}")

                # Display the elements reported by the second click result
                print(f"\n--- Retrieving elements after clicking element {click_index_2} ---")
                _print_element_report(
                    click_result_2,
                    elements_header="Elements after second click:",
                    no_elements_msg="No formatted elements found after second click.",
                    summary_header="\nInteractive elements summary after second click:",
                    no_summary_msg="No interactive elements found after second click.",
                )
            else:
                print("Skipping second element click test - no elements found after first click.")
        else:
            print("Skipping element click test - fewer than 5 elements found.")

        await asyncio.sleep(2)

        print("\n✅ Chess Page Test Completed!")
        # Keep the session alive for a while before cleanup; configurable so
        # callers are not forced to wait out the full default delay.
        if hold_open_seconds > 0:
            await asyncio.sleep(hold_open_seconds)

    except Exception as e:
        print(f"\n❌ Chess Page Test failed: {str(e)}")
        traceback.print_exc()
    finally:
        # Ensure browser is closed
        print("\n--- Cleaning up ---")
        await automation_service.shutdown()
        print("Browser closed")
|
|
|
|
if __name__ == '__main__':
    import uvicorn
    import sys

    # Script entry point: the command-line flags select which of the two
    # browser tests to run; with neither flag the API server is started.
    cli_args = sys.argv
    run_first_test = "--test" in cli_args
    run_chess_test = "--test2" in cli_args

    if run_first_test:
        # "--test" takes precedence when both flags are supplied.
        print("Running in test mode 1")
        asyncio.run(test_browser_api())
    elif run_chess_test:
        print("Running in test mode 2 (Chess Page)")
        asyncio.run(test_browser_api_2())
    else:
        print("Starting API server")
        uvicorn.run("browser_api:api_app", host="0.0.0.0", port=8002)