From e0bf6f7634d4cba3b464cc67c1bca8bc7cc6c512 Mon Sep 17 00:00:00 2001 From: Josh XT Date: Fri, 29 Nov 2024 08:06:18 -0500 Subject: [PATCH] improve web scraping extension --- agixt/Globals.py | 6 + agixt/extensions/web_playwright.py | 120 ----- agixt/extensions/web_scraping.py | 812 +++++++++++++++++++++++++++++ 3 files changed, 818 insertions(+), 120 deletions(-) delete mode 100644 agixt/extensions/web_playwright.py create mode 100644 agixt/extensions/web_scraping.py diff --git a/agixt/Globals.py b/agixt/Globals.py index a66136f0b091..631d6b92661c 100644 --- a/agixt/Globals.py +++ b/agixt/Globals.py @@ -173,5 +173,11 @@ def get_agixt_training_urls(): ] +def get_output_url(path: str): + agixt_uri = getenv("AGIXT_URI") + new_path = path.split("/WORKSPACE/")[-1] + return f"{agixt_uri}/{new_path}" + + DEFAULT_USER = str(getenv("DEFAULT_USER")).lower() DEFAULT_SETTINGS = get_default_agent_settings() diff --git a/agixt/extensions/web_playwright.py b/agixt/extensions/web_playwright.py deleted file mode 100644 index 45d9ffe229d1..000000000000 --- a/agixt/extensions/web_playwright.py +++ /dev/null @@ -1,120 +0,0 @@ -from typing import List, Union -from requests.compat import urljoin -import logging -import subprocess -import sys - -try: - from bs4 import BeautifulSoup -except ImportError: - subprocess.check_call( - [sys.executable, "-m", "pip", "install", "beautifulsoup4==4.12.2"] - ) - from bs4 import BeautifulSoup -try: - from playwright.async_api import async_playwright -except ImportError: - subprocess.check_call([sys.executable, "-m", "pip", "install", "playwright"]) - from playwright.async_api import async_playwright -from Extensions import Extensions - - -class web_playwright(Extensions): - """ - The Web Playwright extension for AGiXT enables you to scrape webpages and take screenshots using Playwright. - """ - - def __init__(self, **kwargs): - self.commands = { - "Scrape Text with Playwright": self.scrape_text_with_playwright, - "Scrape Links with Playwright": self.scrape_links_with_playwright, - "Take Screenshot with Playwright": self.take_screenshot_with_playwright, - } - - async def scrape_text_with_playwright(self, url: str) -> str: - """ - Scrape the text content of a webpage using Playwright - - Args: - url (str): The URL of the webpage to scrape - - Returns: - str: The text content of the webpage - """ - try: - async with async_playwright() as p: - browser = await p.chromium.launch() - context = await browser.new_context() - page = await context.new_page() - await page.goto(url) - html_content = await page.content() - soup = BeautifulSoup(html_content, "html.parser") - for script in soup(["script", "style"]): - script.extract() - text = soup.get_text() - lines = (line.strip() for line in text.splitlines()) - chunks = ( - phrase.strip() for line in lines for phrase in line.split(" ") - ) - text = "\n".join(chunk for chunk in chunks if chunk) - await browser.close() - - except Exception as e: - text = f"Error: {str(e)}" - return text - - async def scrape_links_with_playwright(self, url: str) -> Union[str, List[str]]: - """ - Scrape the hyperlinks of a webpage using Playwright - - Args: - url (str): The URL of the webpage to scrape - - Returns: - Union[str, List[str]]: The hyperlinks of the webpage - """ - try: - async with async_playwright() as p: - browser = await p.chromium.launch() - context = await browser.new_context() - page = await context.new_page() - await page.goto(url) - html_content = await page.content() - soup = BeautifulSoup(html_content, "html.parser") - for script in soup(["script", "style"]): - script.extract() - hyperlinks = [ - (link.text, urljoin(url, link["href"])) - for link in soup.find_all("a", href=True) - ] - formatted_links = [ - f"{link_text} ({link_url})" for link_text, link_url in hyperlinks - ] - await browser.close() - - except Exception as e: - formatted_links = f"Error: {str(e)}" - return formatted_links - - async def take_screenshot_with_playwright(self, url: str, path: str): - """ - Take a screenshot of a webpage using Playwright - - Args: - url (str): The URL of the webpage to take a screenshot of - path (str): The path to save the screenshot - - Returns: - str: The result of taking the screenshot - """ - try: - async with async_playwright() as p: - browser = await p.chromium.launch() - context = await browser.new_context() - page = await context.new_page() - await page.goto(url) - await page.screenshot(path=path, full_page=True, type="png") - await browser.close() - except Exception as e: - logging.error(f"Playwright Error: {str(e)}") - return f"Error: {str(e)}" diff --git a/agixt/extensions/web_scraping.py b/agixt/extensions/web_scraping.py new file mode 100644 index 000000000000..6378401ba35d --- /dev/null +++ b/agixt/extensions/web_scraping.py @@ -0,0 +1,812 @@ +from typing import List, Union +from requests.compat import urljoin +import logging +import subprocess +import uuid +import sys +import os + +try: + from bs4 import BeautifulSoup +except ImportError: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "beautifulsoup4==4.12.2"] + ) + from bs4 import BeautifulSoup +try: + from playwright.async_api import async_playwright +except ImportError: + subprocess.check_call([sys.executable, "-m", "pip", "install", "playwright"]) + from playwright.async_api import async_playwright + +# Additional dependencies +try: + import pyotp +except ImportError: + subprocess.check_call([sys.executable, "-m", "pip", "install", "pyotp"]) + import pyotp +try: + import cv2 +except ImportError: + subprocess.check_call([sys.executable, "-m", "pip", "install", "opencv-python"]) + import cv2 +try: + import numpy as np +except ImportError: + subprocess.check_call([sys.executable, "-m", "pip", "install", "numpy"]) + import numpy as np +try: + from pyzbar.pyzbar import decode +except ImportError: + subprocess.check_call([sys.executable, "-m", "pip", "install", "pyzbar"]) + from pyzbar.pyzbar import decode +try: + import pytesseract +except ImportError: + subprocess.check_call([sys.executable, "-m", "pip", "install", "pytesseract"]) + import pytesseract + +from Extensions import Extensions +from Globals import getenv, get_output_url + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + + +class web_scraping(Extensions): + """ + The AGiXT Web Scraping extension enables you to interact with webpages using Playwright. + This includes scraping text and links, taking screenshots, interacting with page elements, + handling authentication flows, and more. + """ + + def __init__(self, **kwargs): + self.agent_name = kwargs.get("agent_name", "gpt4free") + self.conversation_name = kwargs.get("conversation_name", "") + self.WORKING_DIRECTORY = kwargs.get( + "conversation_directory", os.path.join(os.getcwd(), "WORKSPACE") + ) + os.makedirs(self.WORKING_DIRECTORY, exist_ok=True) + self.conversation_id = kwargs.get("conversation_id", "") + self.commands = { + "Scrape Text with Playwright": self.scrape_text_with_playwright, + "Scrape Links with Playwright": self.scrape_links_with_playwright, + "Take Screenshot with Playwright": self.take_screenshot_with_playwright, + "Navigate to URL with Playwright": self.navigate_to_url_with_playwright, + "Click Element with Playwright": self.click_element_with_playwright, + "Fill Input with Playwright": self.fill_input_with_playwright, + "Select Option with Playwright": self.select_option_with_playwright, + "Check Checkbox with Playwright": self.check_checkbox_with_playwright, + "Handle MFA with Playwright": self.handle_mfa_with_playwright, + "Handle Popup with Playwright": self.handle_popup_with_playwright, + "Upload File with Playwright": self.upload_file_with_playwright, + "Download File with Playwright": self.download_file_with_playwright, + "Go Back with Playwright": self.go_back_with_playwright, + "Go Forward with Playwright": self.go_forward_with_playwright, + "Wait for Selector with Playwright": self.wait_for_selector_with_playwright, + "Extract Table with Playwright": self.extract_table_with_playwright, + "Assert Element with Playwright": self.assert_element_with_playwright, + "Evaluate JavaScript with Playwright": self.evaluate_javascript_with_playwright, + "Close Browser with Playwright": self.close_browser_with_playwright, + # Additional features + "Set Viewport with Playwright": self.set_viewport_with_playwright, + "Emulate Device with Playwright": self.emulate_device_with_playwright, + "Get Cookies with Playwright": self.get_cookies_with_playwright, + "Set Cookies with Playwright": self.set_cookies_with_playwright, + "Handle Dialog with Playwright": self.handle_dialog_with_playwright, + "Intercept Requests with Playwright": self.intercept_requests_with_playwright, + "Take Screenshot with Highlight with Playwright": self.take_screenshot_with_highlight_with_playwright, + "Extract Text from Image with Playwright": self.extract_text_from_image_with_playwright, + } + # Initialize Playwright attributes + self.playwright = None + self.browser = None + self.context = None + self.page = None + self.popup = None + + async def scrape_text_with_playwright(self, url: str) -> str: + """ + Scrape the text content of a webpage using Playwright + + Args: + url (str): The URL of the webpage to scrape + + Returns: + str: The text content of the webpage + """ + try: + async with async_playwright() as p: + browser = await p.chromium.launch() + context = await browser.new_context() + page = await context.new_page() + await page.goto(url) + html_content = await page.content() + soup = BeautifulSoup(html_content, "html.parser") + for script in soup(["script", "style"]): + script.extract() + text = soup.get_text() + lines = (line.strip() for line in text.splitlines()) + chunks = ( + phrase.strip() for line in lines for phrase in line.split(" ") + ) + text = "\n".join(chunk for chunk in chunks if chunk) + await browser.close() + + except Exception as e: + text = f"Error: {str(e)}" + return text + + async def scrape_links_with_playwright(self, url: str) -> Union[str, List[str]]: + """ + Scrape the hyperlinks of a webpage using Playwright + + Args: + url (str): The URL of the webpage to scrape + + Returns: + Union[str, List[str]]: The hyperlinks of the webpage + """ + try: + async with async_playwright() as p: + browser = await p.chromium.launch() + context = await browser.new_context() + page = await context.new_page() + await page.goto(url) + html_content = await page.content() + soup = BeautifulSoup(html_content, "html.parser") + for script in soup(["script", "style"]): + script.extract() + hyperlinks = [ + (link.text, urljoin(url, link["href"])) + for link in soup.find_all("a", href=True) + ] + formatted_links = [ + f"{link_text} ({link_url})" for link_text, link_url in hyperlinks + ] + await browser.close() + + except Exception as e: + formatted_links = f"Error: {str(e)}" + return formatted_links + + async def take_screenshot_with_playwright(self, url: str): + """ + Take a screenshot of a webpage using Playwright + + Args: + url (str): The URL of the webpage to take a screenshot of + + Returns: + str: The URL of the screenshot + """ + path = os.path.join(self.WORKING_DIRECTORY, f"{uuid.uuid4()}.png") + output_url = get_output_url(path) + try: + async with async_playwright() as p: + browser = await p.chromium.launch() + context = await browser.new_context() + page = await context.new_page() + await page.goto(url) + await page.screenshot(path=path, full_page=True, type="png") + await browser.close() + return output_url + except Exception as e: + logging.error(f"Playwright Error: {str(e)}") + return f"Error: {str(e)}" + + async def navigate_to_url_with_playwright( + self, url: str, headless: bool = True + ) -> str: + """ + Navigate to a URL using Playwright + + Args: + url (str): The URL to navigate to + headless (bool): Whether to run the browser in headless mode + + Returns: + str: Confirmation message + """ + try: + if self.playwright is None: + self.playwright = await async_playwright().start() + self.browser = await self.playwright.chromium.launch(headless=headless) + self.context = await self.browser.new_context() + self.page = await self.context.new_page() + await self.page.goto(url) + logging.info(f"Navigated to {url}") + return f"Navigated to {url}" + except Exception as e: + logging.error(f"Error navigating to {url}: {str(e)}") + return f"Error: {str(e)}" + + async def click_element_with_playwright(self, selector: str) -> str: + """ + Click an element specified by a selector using Playwright + + Args: + selector (str): The CSS selector of the element to click + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + await self.page.click(selector) + logging.info(f"Clicked element with selector {selector}") + return f"Clicked element with selector {selector}" + except Exception as e: + logging.error(f"Error clicking element {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def fill_input_with_playwright(self, selector: str, text: str) -> str: + """ + Fill an input field specified by a selector using Playwright + + Args: + selector (str): The CSS selector of the input field + text (str): The text to fill into the input field + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + await self.page.fill(selector, text) + logging.info(f"Filled input {selector} with text '{text}'") + return f"Filled input {selector} with text '{text}'" + except Exception as e: + logging.error(f"Error filling input {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def select_option_with_playwright(self, selector: str, value: str) -> str: + """ + Select an option from a dropdown menu specified by a selector using Playwright + + Args: + selector (str): The CSS selector of the dropdown element + value (str): The value or label of the option to select + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + await self.page.select_option(selector, value) + logging.info(f"Selected option '{value}' in dropdown '{selector}'") + return f"Selected option '{value}' in dropdown '{selector}'" + except Exception as e: + logging.error(f"Error selecting option {value} in {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def check_checkbox_with_playwright(self, selector: str) -> str: + """ + Check a checkbox specified by a selector using Playwright + + Args: + selector (str): The CSS selector of the checkbox + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + await self.page.check(selector) + logging.info(f"Checked checkbox '{selector}'") + return f"Checked checkbox '{selector}'" + except Exception as e: + logging.error(f"Error checking checkbox {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def handle_mfa_with_playwright(self, otp_selector: str) -> str: + """ + Handle MFA by detecting QR code on the page, decoding it, and entering the TOTP code + + Args: + otp_selector (str): The CSS selector where the OTP code should be entered + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + + # Take a screenshot of the page + screenshot = await self.page.screenshot() + # Decode QR code from the screenshot + nparr = np.frombuffer(screenshot, np.uint8) + img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + decoded_objects = decode(img) + otp_uri = None + for obj in decoded_objects: + if obj.type == "QRCODE": + otp_uri = obj.data.decode("utf-8") + break + if not otp_uri: + return "Error: No QR code found on the page." + # Extract secret key from OTP URI + import re + + match = re.search(r"secret=([\w\d]+)", otp_uri) + if match: + secret_key = match.group(1) + totp = pyotp.TOTP(secret_key) + otp_token = totp.now() + # Enter the OTP token into the input field + await self.page.fill(otp_selector, otp_token) + # Submit the form (you might need to adjust the selector) + await self.page.click('button[type="submit"]') + logging.info("MFA handled successfully.") + return "MFA handled successfully." + else: + return "Error: Failed to extract secret key from OTP URI." + except Exception as e: + logging.error(f"Error handling MFA: {str(e)}") + return f"Error: {str(e)}" + + async def handle_popup_with_playwright(self) -> str: + """ + Handle a popup window, such as during an OAuth flow + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + + popup = await self.page.wait_for_event("popup") + self.popup = popup + logging.info(f"Popup opened with URL: {popup.url}") + # You can now interact with the popup as a new page + # Example: await self.popup.fill("#email", "user@example.com") + # Close the popup when done + await self.popup.close() + self.popup = None + logging.info("Popup handled and closed successfully.") + return "Popup handled successfully." + except Exception as e: + logging.error(f"Error handling popup: {str(e)}") + return f"Error: {str(e)}" + + async def upload_file_with_playwright(self, selector: str, file_path: str) -> str: + """ + Upload a file to a file input element specified by a selector using Playwright + + Args: + selector (str): The CSS selector of the file input element + file_path (str): The path to the file to upload + + Returns: + str: Confirmation message + """ + try: + if not os.path.isfile(file_path): + return f"Error: File '{file_path}' does not exist." + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + await self.page.set_input_files(selector, file_path) + logging.info(f"Uploaded file '{file_path}' to input '{selector}'") + return f"Uploaded file '{file_path}' to input '{selector}'" + except Exception as e: + logging.error(f"Error uploading file {file_path}: {str(e)}") + return f"Error: {str(e)}" + + async def download_file_with_playwright( + self, download_url: str, save_path: str + ) -> str: + """ + Download a file from a URL using Playwright + + Args: + download_url (str): The URL of the file to download + save_path (str): The path to save the downloaded file + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + await self.navigate_to_url_with_playwright(download_url) + # Start the download + download = await self.page.wait_for_event("download") + # Save the file to the specified path + await download.save_as(save_path) + logging.info(f"Downloaded file to '{save_path}'") + return f"Downloaded file to '{save_path}'" + except Exception as e: + logging.error(f"Error downloading file from {download_url}: {str(e)}") + return f"Error: {str(e)}" + + async def go_back_with_playwright(self) -> str: + """ + Navigate back in the browser history + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded." + await self.page.go_back() + logging.info("Navigated back in browser history.") + return "Navigated back in browser history." + except Exception as e: + logging.error(f"Error navigating back: {str(e)}") + return f"Error: {str(e)}" + + async def go_forward_with_playwright(self) -> str: + """ + Navigate forward in the browser history + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded." + await self.page.go_forward() + logging.info("Navigated forward in browser history.") + return "Navigated forward in browser history." + except Exception as e: + logging.error(f"Error navigating forward: {str(e)}") + return f"Error: {str(e)}" + + async def wait_for_selector_with_playwright( + self, selector: str, timeout: int = 30000 + ) -> str: + """ + Wait for an element to appear on the page + + Args: + selector (str): The CSS selector of the element + timeout (int): Maximum time to wait in milliseconds + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded." + await self.page.wait_for_selector(selector, timeout=timeout) + logging.info(f"Element '{selector}' appeared on the page.") + return f"Element '{selector}' appeared on the page." + except Exception as e: + logging.error(f"Error waiting for selector {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def extract_table_with_playwright( + self, selector: str + ) -> Union[str, List[List[str]]]: + """ + Extract data from a table element + + Args: + selector (str): The CSS selector of the table element + + Returns: + Union[str, List[List[str]]]: The extracted table data or error message + """ + try: + if self.page is None: + return "Error: No page loaded." + table = await self.page.query_selector(selector) + if not table: + return f"Error: Table '{selector}' not found." + rows = await table.query_selector_all("tr") + table_data = [] + for row in rows: + cells = await row.query_selector_all("th, td") + cell_texts = [await cell.inner_text() for cell in cells] + table_data.append(cell_texts) + logging.info(f"Extracted data from table '{selector}'.") + return table_data + except Exception as e: + logging.error(f"Error extracting table {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def assert_element_with_playwright( + self, selector: str, expected_text: str + ) -> str: + """ + Assert that an element contains the expected text + + Args: + selector (str): The CSS selector of the element + expected_text (str): The expected text content + + Returns: + str: Confirmation message or error message + """ + try: + if self.page is None: + return "Error: No page loaded." + element = await self.page.query_selector(selector) + if not element: + return f"Error: Element '{selector}' not found." + text_content = await element.inner_text() + if expected_text in text_content: + logging.info( + f"Assertion passed: '{expected_text}' is in element '{selector}'." + ) + return ( + f"Assertion passed: '{expected_text}' is in element '{selector}'." + ) + else: + logging.warning( + f"Assertion failed: '{expected_text}' not found in element '{selector}'." + ) + return f"Assertion failed: '{expected_text}' not found in element '{selector}'." + except Exception as e: + logging.error(f"Error asserting element {selector}: {str(e)}") + return f"Error: {str(e)}" + + async def evaluate_javascript_with_playwright(self, script: str) -> str: + """ + Evaluate JavaScript code on the page using Playwright + + Args: + script (str): The JavaScript code to evaluate + + Returns: + str: The result of the script evaluation + """ + try: + if self.page is None: + return "Error: No page loaded. Please navigate to a URL first." + result = await self.page.evaluate(script) + logging.info(f"Evaluated script: {script}") + return f"Script result: {result}" + except Exception as e: + logging.error(f"Error evaluating script: {str(e)}") + return f"Error: {str(e)}" + + async def close_browser_with_playwright(self) -> str: + """ + Close the Playwright browser instance + + Returns: + str: Confirmation message + """ + try: + if self.browser is not None: + await self.browser.close() + self.browser = None + self.context = None + self.page = None + if self.playwright is not None: + await self.playwright.stop() + self.playwright = None + logging.info("Browser closed successfully.") + return "Browser closed successfully." + else: + logging.info("Browser is already closed.") + return "Browser is already closed." + except Exception as e: + logging.error(f"Error closing browser: {str(e)}") + return f"Error: {str(e)}" + + # Additional features + + async def set_viewport_with_playwright(self, width: int, height: int) -> str: + """ + Set the viewport size of the browser + + Args: + width (int): The width of the viewport + height (int): The height of the viewport + + Returns: + str: Confirmation message + """ + try: + if self.context is None: + return "Error: Browser context not initialized." + await self.context.set_viewport_size({"width": width, "height": height}) + logging.info(f"Viewport size set to {width}x{height}.") + return f"Viewport size set to {width}x{height}." + except Exception as e: + logging.error(f"Error setting viewport size: {str(e)}") + return f"Error: {str(e)}" + + async def emulate_device_with_playwright(self, device_name: str) -> str: + """ + Emulate a device using predefined device settings + + Args: + device_name (str): The name of the device to emulate (e.g., 'iPhone 12') + + Returns: + str: Confirmation message + """ + try: + if self.playwright is None: + return "Error: Playwright not started." + device = self.playwright.devices.get(device_name) + if not device: + return f"Error: Device '{device_name}' not found." + if self.context is not None: + await self.context.close() + self.context = await self.browser.new_context(**device) + self.page = await self.context.new_page() + logging.info(f"Emulating device '{device_name}'.") + return f"Emulating device '{device_name}'." + except Exception as e: + logging.error(f"Error emulating device {device_name}: {str(e)}") + return f"Error: {str(e)}" + + async def get_cookies_with_playwright(self) -> Union[str, List[dict]]: + """ + Get cookies from the current page + + Returns: + Union[str, List[dict]]: List of cookies or error message + """ + try: + if self.context is None: + return "Error: Browser context not initialized." + cookies = await self.context.cookies() + logging.info("Retrieved cookies from the browser context.") + return cookies + except Exception as e: + logging.error(f"Error getting cookies: {str(e)}") + return f"Error: {str(e)}" + + async def set_cookies_with_playwright(self, cookies: List[dict]) -> str: + """ + Set cookies in the browser context + + Args: + cookies (List[dict]): List of cookie dictionaries + + Returns: + str: Confirmation message + """ + try: + if self.context is None: + return "Error: Browser context not initialized." + await self.context.add_cookies(cookies) + logging.info("Cookies set in the browser context.") + return "Cookies set successfully." + except Exception as e: + logging.error(f"Error setting cookies: {str(e)}") + return f"Error: {str(e)}" + + async def handle_dialog_with_playwright( + self, action: str = "accept", prompt_text: str = "" + ) -> str: + """ + Handle JavaScript dialogs (alerts, confirms, prompts) + + Args: + action (str): The action to perform ('accept' or 'dismiss') + prompt_text (str): Text to enter into a prompt dialog + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded." + + async def dialog_handler(dialog): + if action == "accept": + await dialog.accept(prompt_text) + logging.info(f"Dialog accepted with text '{prompt_text}'.") + else: + await dialog.dismiss() + logging.info("Dialog dismissed.") + + self.page.on("dialog", dialog_handler) + return f"Dialog will be handled with action '{action}'." + except Exception as e: + logging.error(f"Error handling dialog: {str(e)}") + return f"Error: {str(e)}" + + async def intercept_requests_with_playwright( + self, url_pattern: str, action: str = "block" + ) -> str: + """ + Intercept network requests matching a pattern + + Args: + url_pattern (str): The URL pattern to match + action (str): The action to perform ('block', 'modify', 'continue') + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded." + + async def route_handler(route, request): + if action == "block": + await route.abort() + logging.info(f"Blocked request to {request.url}") + elif action == "continue": + await route.continue_() + logging.info(f"Allowed request to {request.url}") + # Add 'modify' action as needed + + await self.page.route(url_pattern, route_handler) + return f"Requests matching '{url_pattern}' will be handled with action '{action}'." + except Exception as e: + logging.error(f"Error intercepting requests: {str(e)}") + return f"Error: {str(e)}" + + async def take_screenshot_with_highlight_with_playwright( + self, selector: str, save_path: str + ) -> str: + """ + Take a screenshot of the page with a specific element highlighted + + Args: + selector (str): The CSS selector of the element to highlight + save_path (str): The path to save the screenshot + + Returns: + str: Confirmation message + """ + try: + if self.page is None: + return "Error: No page loaded." + # Add a highlight style to the element + await self.page.evaluate( + f""" + const element = document.querySelector('{selector}'); + if (element) {{ + element.style.border = '2px solid red'; + }} + """ + ) + await self.page.screenshot(path=save_path, full_page=True) + logging.info( + f"Screenshot saved to '{save_path}' with element '{selector}' highlighted." + ) + return f"Screenshot saved to '{save_path}' with element '{selector}' highlighted." + except Exception as e: + logging.error(f"Error taking screenshot with highlight: {str(e)}") + return f"Error: {str(e)}" + + async def extract_text_from_image_with_playwright( + self, image_selector: str + ) -> Union[str, List[str]]: + """ + Extract text from an image element using OCR + + Args: + image_selector (str): The CSS selector of the image element + + Returns: + Union[str, List[str]]: Extracted text or error message + """ + try: + if self.page is None: + return "Error: No page loaded." + element = await self.page.query_selector(image_selector) + if not element: + return f"Error: Image '{image_selector}' not found." + # Take a screenshot of the image element + image_bytes = await element.screenshot() + # Use pytesseract to extract text + from PIL import Image + import io + + image = Image.open(io.BytesIO(image_bytes)) + text = pytesseract.image_to_string(image) + logging.info(f"Extracted text from image '{image_selector}'.") + return text + except Exception as e: + logging.error( + f"Error extracting text from image {image_selector}: {str(e)}" + ) + return f"Error: {str(e)}"