From 7c23ca549b797829470c9dbee2b3e36d9ba21b23 Mon Sep 17 00:00:00 2001 From: xhguo Date: Fri, 12 Dec 2025 17:58:20 +0800 Subject: [PATCH] feat: Added iOS support --- .claude/settings.local.json | 12 + .venv | 1 + ios.py | 550 +++++++++++++++++++++++++++++ main.py | 0 phone_agent/__init__.py | 5 +- phone_agent/actions/handler_ios.py | 280 +++++++++++++++ phone_agent/agent_ios.py | 277 +++++++++++++++ phone_agent/config/__init__.py | 2 + phone_agent/config/apps.py | 2 +- phone_agent/config/apps_ios.py | 339 ++++++++++++++++++ phone_agent/xctest/__init__.py | 47 +++ phone_agent/xctest/connection.py | 382 ++++++++++++++++++++ phone_agent/xctest/device.py | 458 ++++++++++++++++++++++++ phone_agent/xctest/input.py | 299 ++++++++++++++++ phone_agent/xctest/screenshot.py | 230 ++++++++++++ requirements.txt | 3 + 16 files changed, 2884 insertions(+), 3 deletions(-) create mode 100644 .claude/settings.local.json create mode 100644 .venv create mode 100755 ios.py mode change 100644 => 100755 main.py create mode 100644 phone_agent/actions/handler_ios.py create mode 100644 phone_agent/agent_ios.py create mode 100644 phone_agent/config/apps_ios.py create mode 100644 phone_agent/xctest/__init__.py create mode 100644 phone_agent/xctest/connection.py create mode 100644 phone_agent/xctest/device.py create mode 100644 phone_agent/xctest/input.py create mode 100644 phone_agent/xctest/screenshot.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..7ee4033 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,12 @@ +{ + "permissions": { + "allow": [ + "Bash(tree:*)", + "Bash(chmod:*)", + "Bash(ls:*)", + "Bash(python:*)" + ], + "deny": [], + "ask": [] + } +} diff --git a/.venv b/.venv new file mode 100644 index 0000000..b9d8d9b --- /dev/null +++ b/.venv @@ -0,0 +1 @@ +Open-AutoGLM diff --git a/ios.py b/ios.py new file mode 100755 index 0000000..78dfc5e --- /dev/null +++ b/ios.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python3 +""" 
+Phone Agent iOS CLI - AI-powered iOS phone automation. + +Usage: + python ios.py [OPTIONS] + +Environment Variables: + PHONE_AGENT_BASE_URL: Model API base URL (default: http://localhost:8000/v1) + PHONE_AGENT_MODEL: Model name (default: autoglm-phone-9b) + PHONE_AGENT_MAX_STEPS: Maximum steps per task (default: 100) + PHONE_AGENT_WDA_URL: WebDriverAgent URL (default: http://localhost:8100) + PHONE_AGENT_DEVICE_ID: iOS device UDID for multi-device setups +""" + +import argparse +import os +import shutil +import subprocess +import sys +from urllib.parse import urlparse + +from openai import OpenAI + +from phone_agent.agent_ios import IOSAgentConfig, IOSPhoneAgent +from phone_agent.config.apps_ios import list_supported_apps +from phone_agent.model import ModelConfig +from phone_agent.xctest import XCTestConnection, list_devices + + +def check_system_requirements(wda_url: str = "http://localhost:8100") -> bool: + """ + Check system requirements before running the agent. + + Checks: + 1. libimobiledevice tools installed + 2. At least one iOS device connected + 3. WebDriverAgent is running + + Args: + wda_url: WebDriverAgent URL to check. + + Returns: + True if all checks pass, False otherwise. + """ + print("🔍 Checking system requirements...") + print("-" * 50) + + all_passed = True + + # Check 1: libimobiledevice installed + print("1. 
Checking libimobiledevice installation...", end=" ") + if shutil.which("idevice_id") is None: + print("❌ FAILED") + print(" Error: libimobiledevice is not installed or not in PATH.") + print(" Solution: Install libimobiledevice:") + print(" - macOS: brew install libimobiledevice") + print(" - Linux: sudo apt-get install libimobiledevice-utils") + all_passed = False + else: + # Double check by running idevice_id + try: + result = subprocess.run( + ["idevice_id", "-ln"], capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + print("✅ OK") + else: + print("❌ FAILED") + print(" Error: idevice_id command failed to run.") + all_passed = False + except FileNotFoundError: + print("❌ FAILED") + print(" Error: idevice_id command not found.") + all_passed = False + except subprocess.TimeoutExpired: + print("❌ FAILED") + print(" Error: idevice_id command timed out.") + all_passed = False + + # If libimobiledevice is not installed, skip remaining checks + if not all_passed: + print("-" * 50) + print("❌ System check failed. Please fix the issues above.") + return False + + # Check 2: iOS Device connected + print("2. Checking connected iOS devices...", end=" ") + try: + devices = list_devices() + + if not devices: + print("❌ FAILED") + print(" Error: No iOS devices connected.") + print(" Solution:") + print(" 1. Connect your iOS device via USB") + print(" 2. Unlock the device and tap 'Trust This Computer'") + print(" 3. Verify connection: idevice_id -l") + print(" 4. Or connect via WiFi using device IP") + all_passed = False + else: + device_names = [ + d.device_name or d.device_id[:8] + "..." for d in devices + ] + print(f"✅ OK ({len(devices)} device(s): {', '.join(device_names)})") + except Exception as e: + print("❌ FAILED") + print(f" Error: {e}") + all_passed = False + + # If no device connected, skip WebDriverAgent check + if not all_passed: + print("-" * 50) + print("❌ System check failed. 
Please fix the issues above.") + return False + + # Check 3: WebDriverAgent running + print(f"3. Checking WebDriverAgent ({wda_url})...", end=" ") + try: + conn = XCTestConnection(wda_url=wda_url) + + if conn.is_wda_ready(): + print("✅ OK") + # Get WDA status for additional info + status = conn.get_wda_status() + if status: + session_id = status.get("sessionId", "N/A") + print(f" Session ID: {session_id}") + else: + print("❌ FAILED") + print(" Error: WebDriverAgent is not running or not accessible.") + print(" Solution:") + print(" 1. Run WebDriverAgent on your iOS device via Xcode") + print(" 2. For USB: Set up port forwarding: iproxy 8100 8100") + print( + " 3. For WiFi: Use device IP, e.g., --wda-url http://192.168.1.100:8100" + ) + print(" 4. Verify in browser: open http://localhost:8100/status") + print("\n Quick setup guide:") + print( + " git clone https://github.com/appium/WebDriverAgent.git && cd WebDriverAgent" + ) + print(" ./Scripts/bootstrap.sh") + print(" open WebDriverAgent.xcodeproj") + print(" # Configure signing, then Product > Test (Cmd+U)") + all_passed = False + except Exception as e: + print("❌ FAILED") + print(f" Error: {e}") + all_passed = False + + print("-" * 50) + + if all_passed: + print("✅ All system checks passed!\n") + else: + print("❌ System check failed. Please fix the issues above.") + + return all_passed + + +def check_model_api(base_url: str, api_key: str, model_name: str) -> bool: + """ + Check if the model API is accessible and the specified model exists. + + Checks: + 1. Network connectivity to the API endpoint + 2. Model exists in the available models list + + Args: + base_url: The API base URL + model_name: The model name to check + + Returns: + True if all checks pass, False otherwise. + """ + print("🔍 Checking model API...") + print("-" * 50) + + all_passed = True + + # Check 1: Network connectivity + print(f"1. 
Checking API connectivity ({base_url})...", end=" ") + try: + # Parse the URL to get host and port + parsed = urlparse(base_url) + + # Create OpenAI client + client = OpenAI(base_url=base_url, api_key=api_key, timeout=10.0) + + # Try to list models (this tests connectivity) + models_response = client.models.list() + available_models = [model.id for model in models_response.data] + + print("✅ OK") + + # Check 2: Model exists + print(f"2. Checking model '{model_name}'...", end=" ") + if model_name in available_models: + print("✅ OK") + else: + print("❌ FAILED") + print(f" Error: Model '{model_name}' not found.") + print(f" Available models:") + for m in available_models[:10]: # Show first 10 models + print(f" - {m}") + if len(available_models) > 10: + print(f" ... and {len(available_models) - 10} more") + all_passed = False + + except Exception as e: + print("❌ FAILED") + error_msg = str(e) + + # Provide more specific error messages + if "Connection refused" in error_msg or "Connection error" in error_msg: + print(f" Error: Cannot connect to {base_url}") + print(" Solution:") + print(" 1. Check if the model server is running") + print(" 2. Verify the base URL is correct") + print(f" 3. Try: curl {base_url}/models") + elif "timed out" in error_msg.lower() or "timeout" in error_msg.lower(): + print(f" Error: Connection to {base_url} timed out") + print(" Solution:") + print(" 1. Check your network connection") + print(" 2. Verify the server is responding") + elif ( + "Name or service not known" in error_msg + or "nodename nor servname" in error_msg + ): + print(f" Error: Cannot resolve hostname") + print(" Solution:") + print(" 1. Check the URL is correct") + print(" 2. Verify DNS settings") + else: + print(f" Error: {error_msg}") + + all_passed = False + + print("-" * 50) + + if all_passed: + print("✅ Model API checks passed!\n") + else: + print("❌ Model API check failed. 
Please fix the issues above.") + + return all_passed + + +def parse_args() -> argparse.Namespace: + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Phone Agent iOS - AI-powered iOS phone automation", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Run with default settings + python ios.py + + # Specify model endpoint + python ios.py --base-url http://localhost:8000/v1 + + # Run with specific device + python ios.py --device-id + + # Use WiFi connection + python ios.py --wda-url http://192.168.1.100:8100 + + # List connected devices + python ios.py --list-devices + + # Check device pairing status + python ios.py --pair + + # List supported apps + python ios.py --list-apps + + # Run a specific task + python ios.py "Open Safari and search for iPhone tips" + """, + ) + + # Model options + parser.add_argument( + "--base-url", + type=str, + default=os.getenv("PHONE_AGENT_BASE_URL", "http://localhost:8000/v1"), + help="Model API base URL", + ) + + parser.add_argument( + "--api-key", + type=str, + default="EMPTY", + help="Model API KEY", + ) + + parser.add_argument( + "--model", + type=str, + default=os.getenv("PHONE_AGENT_MODEL", "autoglm-phone-9b"), + help="Model name", + ) + + parser.add_argument( + "--max-steps", + type=int, + default=int(os.getenv("PHONE_AGENT_MAX_STEPS", "100")), + help="Maximum steps per task", + ) + + # iOS Device options + parser.add_argument( + "--device-id", + "-d", + type=str, + default=os.getenv("PHONE_AGENT_DEVICE_ID"), + help="iOS device UDID", + ) + + parser.add_argument( + "--wda-url", + type=str, + default=os.getenv("PHONE_AGENT_WDA_URL", "http://localhost:8100"), + help="WebDriverAgent URL (default: http://localhost:8100)", + ) + + parser.add_argument( + "--list-devices", action="store_true", help="List connected iOS devices and exit" + ) + + parser.add_argument( + "--pair", + action="store_true", + help="Pair with iOS device (required for some operations)", + ) + 
+ parser.add_argument( + "--wda-status", + action="store_true", + help="Show WebDriverAgent status and exit", + ) + + # Other options + parser.add_argument( + "--quiet", "-q", action="store_true", help="Suppress verbose output" + ) + + parser.add_argument( + "--list-apps", action="store_true", help="List supported apps and exit" + ) + + parser.add_argument( + "--lang", + type=str, + choices=["cn", "en"], + default=os.getenv("PHONE_AGENT_LANG", "cn"), + help="Language for system prompt (cn or en, default: cn)", + ) + + parser.add_argument( + "task", + nargs="?", + type=str, + help="Task to execute (interactive mode if not provided)", + ) + + return parser.parse_args() + + +def handle_device_commands(args) -> bool: + """ + Handle iOS device-related commands. + + Returns: + True if a device command was handled (should exit), False otherwise. + """ + conn = XCTestConnection(wda_url=args.wda_url) + + # Handle --list-devices + if args.list_devices: + devices = list_devices() + if not devices: + print("No iOS devices connected.") + print("\nTroubleshooting:") + print(" 1. Connect device via USB") + print(" 2. Unlock device and trust this computer") + print(" 3. 
Run: idevice_id -l") + else: + print("Connected iOS devices:") + print("-" * 70) + for device in devices: + conn_type = device.connection_type.value + model_info = f"{device.model}" if device.model else "Unknown" + ios_info = f"iOS {device.ios_version}" if device.ios_version else "" + name_info = device.device_name or "Unnamed" + + print(f" ✓ {name_info}") + print(f" UDID: {device.device_id}") + print(f" Model: {model_info}") + print(f" OS: {ios_info}") + print(f" Connection: {conn_type}") + print("-" * 70) + return True + + # Handle --pair + if args.pair: + print("Pairing with iOS device...") + success, message = conn.pair_device(args.device_id) + print(f"{'✓' if success else '✗'} {message}") + return True + + # Handle --wda-status + if args.wda_status: + print(f"Checking WebDriverAgent status at {args.wda_url}...") + print("-" * 50) + + if conn.is_wda_ready(): + print("✓ WebDriverAgent is running") + + status = conn.get_wda_status() + if status: + print(f"\nStatus details:") + value = status.get("value", {}) + print(f" Session ID: {status.get('sessionId', 'N/A')}") + print(f" Build: {value.get('build', {}).get('time', 'N/A')}") + + current_app = value.get("currentApp", {}) + if current_app: + print(f"\nCurrent App:") + print(f" Bundle ID: {current_app.get('bundleId', 'N/A')}") + print(f" Process ID: {current_app.get('pid', 'N/A')}") + else: + print("✗ WebDriverAgent is not running") + print("\nPlease start WebDriverAgent on your iOS device:") + print(" 1. Open WebDriverAgent.xcodeproj in Xcode") + print(" 2. Select your device") + print(" 3. Run WebDriverAgentRunner (Product > Test or Cmd+U)") + print(f" 4. 
For USB: Run port forwarding: iproxy 8100 8100") + + return True + + return False + + +def main(): + """Main entry point.""" + args = parse_args() + + # Handle --list-apps (no system check needed) + if args.list_apps: + print("Supported iOS apps:") + print("\nNote: For iOS apps, Bundle IDs are configured in:") + print(" phone_agent/config/apps_ios.py") + print("\nCurrently configured apps:") + for app in sorted(list_supported_apps()): + print(f" - {app}") + print( + "\nTo add iOS apps, find the Bundle ID and add to APP_PACKAGES_IOS dictionary." + ) + return + + # Handle device commands (these may need partial system checks) + if handle_device_commands(args): + return + + # Run system requirements check before proceeding + if not check_system_requirements(wda_url=args.wda_url): + sys.exit(1) + + # Check model API connectivity and model availability + # if not check_model_api(args.base_url, args.api_key, args.model): + # sys.exit(1) + + # Create configurations + model_config = ModelConfig( + base_url=args.base_url, + model_name=args.model, + api_key=args.api_key + ) + + agent_config = IOSAgentConfig( + max_steps=args.max_steps, + wda_url=args.wda_url, + device_id=args.device_id, + verbose=not args.quiet, + lang=args.lang, + ) + + # Create iOS agent + agent = IOSPhoneAgent( + model_config=model_config, + agent_config=agent_config, + ) + + # Print header + print("=" * 50) + print("Phone Agent iOS - AI-powered iOS automation") + print("=" * 50) + print(f"Model: {model_config.model_name}") + print(f"Base URL: {model_config.base_url}") + print(f"WDA URL: {args.wda_url}") + print(f"Max Steps: {agent_config.max_steps}") + print(f"Language: {agent_config.lang}") + + # Show device info + devices = list_devices() + if agent_config.device_id: + print(f"Device: {agent_config.device_id}") + elif devices: + device = devices[0] + print(f"Device: {device.device_name or device.device_id[:16]}") + print(f" {device.model}, iOS {device.ios_version}") + + print("=" * 50) + + # Run with 
provided task or enter interactive mode + if args.task: + print(f"\nTask: {args.task}\n") + result = agent.run(args.task) + print(f"\nResult: {result}") + else: + # Interactive mode + print("\nEntering interactive mode. Type 'quit' to exit.\n") + + while True: + try: + task = input("Enter your task: ").strip() + + if task.lower() in ("quit", "exit", "q"): + print("Goodbye!") + break + + if not task: + continue + + print() + result = agent.run(task) + print(f"\nResult: {result}\n") + agent.reset() + + except KeyboardInterrupt: + print("\n\nInterrupted. Goodbye!") + break + except Exception as e: + print(f"\nError: {e}\n") + + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py old mode 100644 new mode 100755 diff --git a/phone_agent/__init__.py b/phone_agent/__init__.py index 0bb1fb2..f39d7e0 100644 --- a/phone_agent/__init__.py +++ b/phone_agent/__init__.py @@ -1,11 +1,12 @@ """ Phone Agent - An AI-powered phone automation framework. -This package provides tools for automating Android phone interactions +This package provides tools for automating Android and iOS phone interactions using AI models for visual understanding and decision making. 
""" from phone_agent.agent import PhoneAgent +from phone_agent.agent_ios import IOSPhoneAgent __version__ = "0.1.0" -__all__ = ["PhoneAgent"] +__all__ = ["PhoneAgent", "IOSPhoneAgent"] diff --git a/phone_agent/actions/handler_ios.py b/phone_agent/actions/handler_ios.py new file mode 100644 index 0000000..c37f50d --- /dev/null +++ b/phone_agent/actions/handler_ios.py @@ -0,0 +1,280 @@ +"""Action handler for iOS automation using WebDriverAgent.""" + +import time +from dataclasses import dataclass +from typing import Any, Callable + +from phone_agent.xctest import ( + back, + double_tap, + home, + launch_app, + long_press, + swipe, + tap, +) +from phone_agent.xctest.input import clear_text, hide_keyboard, type_text + + +@dataclass +class ActionResult: + """Result of an action execution.""" + + success: bool + should_finish: bool + message: str | None = None + requires_confirmation: bool = False + + +class IOSActionHandler: + """ + Handles execution of actions from AI model output for iOS devices. + + Args: + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + confirmation_callback: Optional callback for sensitive action confirmation. + Should return True to proceed, False to cancel. + takeover_callback: Optional callback for takeover requests (login, captcha). + """ + + def __init__( + self, + wda_url: str = "http://localhost:8100", + session_id: str | None = None, + confirmation_callback: Callable[[str], bool] | None = None, + takeover_callback: Callable[[str], None] | None = None, + ): + self.wda_url = wda_url + self.session_id = session_id + self.confirmation_callback = confirmation_callback or self._default_confirmation + self.takeover_callback = takeover_callback or self._default_takeover + + def execute( + self, action: dict[str, Any], screen_width: int, screen_height: int + ) -> ActionResult: + """ + Execute an action from the AI model. + + Args: + action: The action dictionary from the model. + screen_width: Current screen width in pixels. 
+ screen_height: Current screen height in pixels. + + Returns: + ActionResult indicating success and whether to finish. + """ + action_type = action.get("_metadata") + + if action_type == "finish": + return ActionResult( + success=True, should_finish=True, message=action.get("message") + ) + + if action_type != "do": + return ActionResult( + success=False, + should_finish=True, + message=f"Unknown action type: {action_type}", + ) + + action_name = action.get("action") + handler_method = self._get_handler(action_name) + + if handler_method is None: + return ActionResult( + success=False, + should_finish=False, + message=f"Unknown action: {action_name}", + ) + + try: + return handler_method(action, screen_width, screen_height) + except Exception as e: + return ActionResult( + success=False, should_finish=False, message=f"Action failed: {e}" + ) + + def _get_handler(self, action_name: str) -> Callable | None: + """Get the handler method for an action.""" + handlers = { + "Launch": self._handle_launch, + "Tap": self._handle_tap, + "Type": self._handle_type, + "Type_Name": self._handle_type, + "Swipe": self._handle_swipe, + "Back": self._handle_back, + "Home": self._handle_home, + "Double Tap": self._handle_double_tap, + "Long Press": self._handle_long_press, + "Wait": self._handle_wait, + "Take_over": self._handle_takeover, + "Note": self._handle_note, + "Call_API": self._handle_call_api, + "Interact": self._handle_interact, + } + return handlers.get(action_name) + + def _convert_relative_to_absolute( + self, element: list[int], screen_width: int, screen_height: int + ) -> tuple[int, int]: + """Convert relative coordinates (0-1000) to absolute pixels.""" + x = int(element[0] / 1000 * screen_width) + y = int(element[1] / 1000 * screen_height) + return x, y + + def _handle_launch(self, action: dict, width: int, height: int) -> ActionResult: + """Handle app launch action.""" + app_name = action.get("app") + if not app_name: + return ActionResult(False, False, "No app name 
specified") + + success = launch_app( + app_name, wda_url=self.wda_url, session_id=self.session_id + ) + if success: + return ActionResult(True, False) + return ActionResult(False, False, f"App not found: {app_name}") + + def _handle_tap(self, action: dict, width: int, height: int) -> ActionResult: + """Handle tap action.""" + element = action.get("element") + if not element: + return ActionResult(False, False, "No element coordinates") + + x, y = self._convert_relative_to_absolute(element, width, height) + + print(f"Physically tap on ({x}, {y})") + + # Check for sensitive operation + if "message" in action: + if not self.confirmation_callback(action["message"]): + return ActionResult( + success=False, + should_finish=True, + message="User cancelled sensitive operation", + ) + + tap(x, y, wda_url=self.wda_url, session_id=self.session_id) + return ActionResult(True, False) + + def _handle_type(self, action: dict, width: int, height: int) -> ActionResult: + """Handle text input action.""" + text = action.get("text", "") + + # Clear existing text and type new text + clear_text(wda_url=self.wda_url, session_id=self.session_id) + time.sleep(0.5) + + type_text(text, wda_url=self.wda_url, session_id=self.session_id) + time.sleep(0.5) + + # Hide keyboard after typing + hide_keyboard(wda_url=self.wda_url, session_id=self.session_id) + time.sleep(0.5) + + return ActionResult(True, False) + + def _handle_swipe(self, action: dict, width: int, height: int) -> ActionResult: + """Handle swipe action.""" + start = action.get("start") + end = action.get("end") + + if not start or not end: + return ActionResult(False, False, "Missing swipe coordinates") + + start_x, start_y = self._convert_relative_to_absolute(start, width, height) + end_x, end_y = self._convert_relative_to_absolute(end, width, height) + + print(f"Physically scroll from ({start_x}, {start_y}) to ({end_x}, {end_y})") + + swipe( + start_x, + start_y, + end_x, + end_y, + wda_url=self.wda_url, + 
session_id=self.session_id, + ) + return ActionResult(True, False) + + def _handle_back(self, action: dict, width: int, height: int) -> ActionResult: + """Handle back gesture (swipe from left edge).""" + back(wda_url=self.wda_url, session_id=self.session_id) + return ActionResult(True, False) + + def _handle_home(self, action: dict, width: int, height: int) -> ActionResult: + """Handle home button action.""" + home(wda_url=self.wda_url, session_id=self.session_id) + return ActionResult(True, False) + + def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult: + """Handle double tap action.""" + element = action.get("element") + if not element: + return ActionResult(False, False, "No element coordinates") + + x, y = self._convert_relative_to_absolute(element, width, height) + double_tap(x, y, wda_url=self.wda_url, session_id=self.session_id) + return ActionResult(True, False) + + def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult: + """Handle long press action.""" + element = action.get("element") + if not element: + return ActionResult(False, False, "No element coordinates") + + x, y = self._convert_relative_to_absolute(element, width, height) + long_press( + x, + y, + duration=3.0, + wda_url=self.wda_url, + session_id=self.session_id, + ) + return ActionResult(True, False) + + def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult: + """Handle wait action.""" + duration_str = action.get("duration", "1 seconds") + try: + duration = float(duration_str.replace("seconds", "").strip()) + except ValueError: + duration = 1.0 + + time.sleep(duration) + return ActionResult(True, False) + + def _handle_takeover(self, action: dict, width: int, height: int) -> ActionResult: + """Handle takeover request (login, captcha, etc.).""" + message = action.get("message", "User intervention required") + self.takeover_callback(message) + return ActionResult(True, False) + + def _handle_note(self, action: 
dict, width: int, height: int) -> ActionResult: + """Handle note action (placeholder for content recording).""" + # This action is typically used for recording page content + # Implementation depends on specific requirements + return ActionResult(True, False) + + def _handle_call_api(self, action: dict, width: int, height: int) -> ActionResult: + """Handle API call action (placeholder for summarization).""" + # This action is typically used for content summarization + # Implementation depends on specific requirements + return ActionResult(True, False) + + def _handle_interact(self, action: dict, width: int, height: int) -> ActionResult: + """Handle interaction request (user choice needed).""" + # This action signals that user input is needed + return ActionResult(True, False, message="User interaction required") + + @staticmethod + def _default_confirmation(message: str) -> bool: + """Default confirmation callback using console input.""" + response = input(f"Sensitive operation: {message}\nConfirm? 
(Y/N): ") + return response.upper() == "Y" + + @staticmethod + def _default_takeover(message: str) -> None: + """Default takeover callback using console input.""" + input(f"{message}\nPress Enter after completing manual operation...") diff --git a/phone_agent/agent_ios.py b/phone_agent/agent_ios.py new file mode 100644 index 0000000..a3b20d9 --- /dev/null +++ b/phone_agent/agent_ios.py @@ -0,0 +1,277 @@ +"""iOS PhoneAgent class for orchestrating iOS phone automation.""" + +import json +import traceback +from dataclasses import dataclass +from typing import Any, Callable + +from phone_agent.actions.handler import do, finish, parse_action +from phone_agent.actions.handler_ios import IOSActionHandler +from phone_agent.config import get_messages, get_system_prompt +from phone_agent.model import ModelClient, ModelConfig +from phone_agent.model.client import MessageBuilder +from phone_agent.xctest import XCTestConnection, get_current_app, get_screenshot + + +@dataclass +class IOSAgentConfig: + """Configuration for the iOS PhoneAgent.""" + + max_steps: int = 100 + wda_url: str = "http://localhost:8100" + session_id: str | None = None + device_id: str | None = None # iOS device UDID + lang: str = "cn" + system_prompt: str | None = None + verbose: bool = True + + def __post_init__(self): + if self.system_prompt is None: + self.system_prompt = get_system_prompt(self.lang) + + +@dataclass +class StepResult: + """Result of a single agent step.""" + + success: bool + finished: bool + action: dict[str, Any] | None + thinking: str + message: str | None = None + + +class IOSPhoneAgent: + """ + AI-powered agent for automating iOS phone interactions. + + The agent uses a vision-language model to understand screen content + and decide on actions to complete user tasks via WebDriverAgent. + + Args: + model_config: Configuration for the AI model. + agent_config: Configuration for the iOS agent behavior. + confirmation_callback: Optional callback for sensitive action confirmation. 
+ takeover_callback: Optional callback for takeover requests. + + Example: + >>> from phone_agent.agent_ios import IOSPhoneAgent, IOSAgentConfig + >>> from phone_agent.model import ModelConfig + >>> + >>> model_config = ModelConfig(base_url="http://localhost:8000/v1") + >>> agent_config = IOSAgentConfig(wda_url="http://localhost:8100") + >>> agent = IOSPhoneAgent(model_config, agent_config) + >>> agent.run("Open Safari and search for Apple") + """ + + def __init__( + self, + model_config: ModelConfig | None = None, + agent_config: IOSAgentConfig | None = None, + confirmation_callback: Callable[[str], bool] | None = None, + takeover_callback: Callable[[str], None] | None = None, + ): + self.model_config = model_config or ModelConfig() + self.agent_config = agent_config or IOSAgentConfig() + + self.model_client = ModelClient(self.model_config) + + # Initialize WDA connection and create session if needed + self.wda_connection = XCTestConnection(wda_url=self.agent_config.wda_url) + + # Auto-create session if not provided + if self.agent_config.session_id is None: + success, session_id = self.wda_connection.start_wda_session() + if success and session_id != "session_started": + self.agent_config.session_id = session_id + if self.agent_config.verbose: + print(f"✅ Created WDA session: {session_id}") + elif self.agent_config.verbose: + print(f"⚠️ Using default WDA session (no explicit session ID)") + + self.action_handler = IOSActionHandler( + wda_url=self.agent_config.wda_url, + session_id=self.agent_config.session_id, + confirmation_callback=confirmation_callback, + takeover_callback=takeover_callback, + ) + + self._context: list[dict[str, Any]] = [] + self._step_count = 0 + + def run(self, task: str) -> str: + """ + Run the agent to complete a task. + + Args: + task: Natural language description of the task. + + Returns: + Final message from the agent. 
+ """ + self._context = [] + self._step_count = 0 + + # First step with user prompt + result = self._execute_step(task, is_first=True) + + if result.finished: + return result.message or "Task completed" + + # Continue until finished or max steps reached + while self._step_count < self.agent_config.max_steps: + result = self._execute_step(is_first=False) + + if result.finished: + return result.message or "Task completed" + + return "Max steps reached" + + def step(self, task: str | None = None) -> StepResult: + """ + Execute a single step of the agent. + + Useful for manual control or debugging. + + Args: + task: Task description (only needed for first step). + + Returns: + StepResult with step details. + """ + is_first = len(self._context) == 0 + + if is_first and not task: + raise ValueError("Task is required for the first step") + + return self._execute_step(task, is_first) + + def reset(self) -> None: + """Reset the agent state for a new task.""" + self._context = [] + self._step_count = 0 + + def _execute_step( + self, user_prompt: str | None = None, is_first: bool = False + ) -> StepResult: + """Execute a single step of the agent loop.""" + self._step_count += 1 + + # Capture current screen state + screenshot = get_screenshot( + wda_url=self.agent_config.wda_url, + session_id=self.agent_config.session_id, + device_id=self.agent_config.device_id, + ) + current_app = get_current_app( + wda_url=self.agent_config.wda_url, session_id=self.agent_config.session_id + ) + + # Build messages + if is_first: + self._context.append( + MessageBuilder.create_system_message(self.agent_config.system_prompt) + ) + + screen_info = MessageBuilder.build_screen_info(current_app) + text_content = f"{user_prompt}\n\n{screen_info}" + + self._context.append( + MessageBuilder.create_user_message( + text=text_content, image_base64=screenshot.base64_data + ) + ) + else: + screen_info = MessageBuilder.build_screen_info(current_app) + text_content = f"** Screen Info **\n\n{screen_info}" + + 
self._context.append( + MessageBuilder.create_user_message( + text=text_content, image_base64=screenshot.base64_data + ) + ) + + # Get model response + try: + response = self.model_client.request(self._context) + except Exception as e: + if self.agent_config.verbose: + traceback.print_exc() + return StepResult( + success=False, + finished=True, + action=None, + thinking="", + message=f"Model error: {e}", + ) + + # Parse action from response + try: + action = parse_action(response.action) + except ValueError: + if self.agent_config.verbose: + traceback.print_exc() + action = finish(message=response.action) + + if self.agent_config.verbose: + # Print thinking process + msgs = get_messages(self.agent_config.lang) + print("\n" + "=" * 50) + print(f"💭 {msgs['thinking']}:") + print("-" * 50) + print(response.thinking) + print("-" * 50) + print(f"🎯 {msgs['action']}:") + print(json.dumps(action, ensure_ascii=False, indent=2)) + print("=" * 50 + "\n") + + # Remove image from context to save space + self._context[-1] = MessageBuilder.remove_images_from_message(self._context[-1]) + + # Execute action + try: + result = self.action_handler.execute( + action, screenshot.width, screenshot.height + ) + except Exception as e: + if self.agent_config.verbose: + traceback.print_exc() + result = self.action_handler.execute( + finish(message=str(e)), screenshot.width, screenshot.height + ) + + # Add assistant response to context + self._context.append( + MessageBuilder.create_assistant_message( + f"{response.thinking}{response.action}" + ) + ) + + # Check if finished + finished = action.get("_metadata") == "finish" or result.should_finish + + if finished and self.agent_config.verbose: + msgs = get_messages(self.agent_config.lang) + print("\n" + "🎉 " + "=" * 48) + print( + f"✅ {msgs['task_completed']}: {result.message or action.get('message', msgs['done'])}" + ) + print("=" * 50 + "\n") + + return StepResult( + success=result.success, + finished=finished, + action=action, + 
@property
def context(self) -> list[dict[str, Any]]:
    """Return a shallow copy of the current conversation context."""
    # A new list is built so callers cannot add or remove entries on the
    # agent's own list; the message dicts inside are still shared.
    return list(self._context)
# Display name -> iOS bundle ID.
#
# Insertion order matters: reverse lookups (bundle ID -> name) scan this dict
# and return the first matching name, so when several names share one bundle
# ID the preferred display name must come first.
APP_PACKAGES_IOS: dict[str, str] = {
    # Tencent Apps
    "微信": "com.tencent.xin",
    "企业微信": "com.tencent.ww",
    "微信读书": "com.tencent.weread",
    "微信听书": "com.tencent.wehear",
    "QQ": "com.tencent.mqq",
    "QQ音乐": "com.tencent.QQMusic",
    "QQ阅读": "com.tencent.qqreaderiphone",
    "QQ邮箱": "com.tencent.qqmail",
    "QQ浏览器": "com.tencent.mttlite",
    "TIM": "com.tencent.tim",
    "微视": "com.tencent.microvision",
    "腾讯新闻": "com.tencent.info",
    "腾讯视频": "com.tencent.live4iphone",
    "腾讯动漫": "com.tencent.ied.app.comic",
    "腾讯微云": "com.tencent.weiyun",
    "腾讯体育": "com.tencent.sportskbs",
    "腾讯文档": "com.tencent.txdocs",
    "腾讯翻译君": "com.tencent.qqtranslator",
    "腾讯课堂": "com.tencent.edu",
    "腾讯地图": "com.tencent.sosomap",
    "小鹅拼拼": "com.tencent.dwdcoco",
    "全民k歌": "com.tencent.QQKSong",
    # Alibaba Apps
    "支付宝": "com.alipay.iphoneclient",
    "钉钉": "com.laiwang.DingTalk",
    "闲鱼": "com.taobao.fleamarket",
    "淘宝": "com.taobao.taobao4iphone",
    "斗鱼": "tv.douyu.live",
    "天猫": "com.taobao.tmall",
    "口碑": "com.taobao.kbmeishi",
    "饿了么": "me.ele.ios.eleme",
    "高德地图": "com.autonavi.amap",
    "UC浏览器": "com.ucweb.iphone.lowversion",
    "一淘": "com.taobao.etaocoupon",
    "飞猪": "com.taobao.travel",
    "虾米音乐": "com.xiami.spark",
    "淘票票": "com.taobao.movie.MoviePhoneClient",
    "优酷": "com.youku.YouKu",
    "菜鸟裹裹": "com.cainiao.cnwireless",
    "土豆视频": "com.tudou.tudouiphone",
    # ByteDance Apps
    "抖音": "com.ss.iphone.ugc.Aweme",
    "抖音极速版": "com.ss.iphone.ugc.aweme.lite",
    "抖音火山版": "com.ss.iphone.ugc.Live",
    "Tiktok": "com.zhiliaoapp.musically",
    "飞书": "com.bytedance.ee.lark",
    "今日头条": "com.ss.iphone.article.News",
    "西瓜视频": "com.ss.iphone.article.Video",
    "皮皮虾": "com.bd.iphone.super",
    # Meituan Apps
    "美团": "com.meituan.imeituan",
    "美团外卖": "com.meituan.itakeaway",
    "大众点评": "com.dianping.dpscope",
    "美团优选": "com.meituan.iyouxuan",
    "美团优选团长": "com.meituan.igrocery.gh",
    "美团骑手": "com.meituan.banma.homebrew",
    "美团开店宝": "com.meituan.imerchantbiz",
    "美团拍店": "com.meituan.pai",
    "美团众包": "com.meituan.banma.crowdsource",
    "美团买菜": "com.baobaoaichi.imaicai",
    # JD Apps
    "京东": "com.360buy.jdmobile",
    "京东读书": "com.jd.reader",
    # NetEase Apps
    "网易新闻": "com.netease.news",
    "网易云音乐": "com.netease.cloudmusic",
    "网易邮箱大师": "com.netease.macmail",
    "网易严选": "com.netease.yanxuan",
    "网易公开课": "com.netease.videoHD",
    "网易有道词典": "youdaoPro",
    "有道云笔记": "com.youdao.note.YoudaoNoteMac",
    # Baidu Apps
    "百度": "com.baidu.BaiduMobile",
    "百度网盘": "com.baidu.netdisk",
    "百度贴吧": "com.baidu.tieba",
    "百度地图": "com.baidu.map",
    "百度阅读": "com.baidu.yuedu",
    "百度翻译": "com.baidu.translate",
    "百度文库": "com.baidu.Wenku",
    "百度视频": "com.baidu.videoiphone",
    "百度输入法": "com.baidu.inputMethod",
    # Kuaishou Apps
    "快手": "com.jiangjia.gif",
    "快手极速版": "com.kuaishou.nebula",
    # Other Popular Apps
    "哔哩哔哩": "tv.danmaku.bilianime",
    "芒果TV": "com.hunantv.imgotv",
    "苏宁易购": "SuningEMall",
    "微博": "com.sina.weibo",
    "微博极速版": "com.sina.weibolite",
    "微博国际": "com.weibo.international",
    "墨客": "com.moke.moke.iphone",
    "豆瓣": "com.douban.frodo",
    "知乎": "com.zhihu.ios",
    "小红书": "com.xingin.discover",
    "喜马拉雅": "com.gemd.iting",
    "得到": "com.luojilab.LuoJiFM-IOS",
    "得物": "com.siwuai.duapp",
    "起点读书": "m.qidian.QDReaderAppStore",
    "番茄小说": "com.dragon.read",
    "书旗小说": "com.shuqicenter.reader",
    "拼多多": "com.xunmeng.pinduoduo",
    "多点": "com.dmall.dmall",
    "便利蜂": "com.bianlifeng.customer.ios",
    "亿通行": "com.ruubypay.yitongxing",
    "云闪付": "com.unionpay.chsp",
    "大都会Metro": "com.DDH.SHSubway",
    "爱奇艺视频": "com.qiyi.iphone",
    "搜狐视频": "com.sohu.iPhoneVideo",
    "搜狐新闻": "com.sohu.newspaper",
    "搜狗浏览器": "com.sogou.SogouExplorerMobile",
    "虎牙": "com.yy.kiwi",
    "比心": "com.yitan.bixin",
    "转转": "com.wuba.zhuanzhuan",
    "YY": "yyvoice",
    "绿洲": "com.sina.oasis",
    "陌陌": "com.wemomo.momoappdemo1",
    "什么值得买": "com.smzdm.client.ios",
    # Meitu XiuXiu: the correctly spelled name comes first so reverse lookups
    # report it; the misspelled legacy key is kept so existing callers that
    # already use it keep resolving.
    "美图秀秀": "com.meitu.mtxx",
    "美团秀秀": "com.meitu.mtxx",
    "唯品会": "com.vipshop.iphone",
    "唱吧": "com.changba.ktv",
    "酷狗音乐": "com.kugou.kugou1002",
    "CSDN": "net.csdn.CsdnPlus",
    "多抓鱼": "com.duozhuyu.dejavu",
    "自如": "com.ziroom.ZiroomProject",
    "携程": "ctrip.com",
    "去哪儿旅行": "com.qunar.iphoneclient8",
    "Xmind": "net.xmind.brownieapp",
    "印象笔记": "com.yinxiang.iPhone",
    "欧陆词典": "eusoft.eudic.pro",
    "115": "com.115.personal",
    "名片全能王": "com.intsig.camcard.lite",
    "中国银行": "com.boc.BOCMBCI",
    "58同城": "com.taofang.iphone",
    # International Apps
    "Google Chrome": "com.google.chrome.ios",
    "Gmail": "com.google.Gmail",
    "Facebook": "com.facebook.Facebook",
    "Firefox": "org.mozilla.ios.Firefox",
    "Messenger": "com.facebook.Messenger",
    "Instagram": "com.burbn.instagram",
    "Starbucks": "com.starbucks.mystarbucks",
    "Luckin Coffee": "com.bjlc.luckycoffee",
    "Line": "jp.naver.line",
    "Linkedin": "com.linkedin.LinkedIn",
    "Dcard": "com.dcard.app.Dcard",
    "Youtube": "com.google.ios.youtube",
    "Spotify": "com.spotify.client",
    "Netflix": "com.netflix.Netflix",
    "Twitter": "com.atebits.Tweetie2",
    "WhatsApp": "net.whatsapp.WhatsApp",
    # Apple Native Apps
    "Safari": "com.apple.mobilesafari",
    "App Store": "com.apple.AppStore",
    "设置": "com.apple.Preferences",
    "相机": "com.apple.camera",
    "照片": "com.apple.mobileslideshow",
    "时钟": "com.apple.mobiletimer",
    "闹钟": "com.apple.mobiletimer",
    "备忘录": "com.apple.mobilenotes",
    "提醒事项": "com.apple.reminders",
    "快捷指令": "com.apple.shortcuts",
    "天气": "com.apple.weather",
    "日历": "com.apple.mobilecal",
    "地图": "com.apple.Maps",
    "电话": "com.apple.mobilephone",
    "通讯录": "com.apple.MobileAddressBook",
    "信息": "com.apple.MobileSMS",
    "Facetime": "com.apple.facetime",
    "FaceTime": "com.apple.facetime",
    "计算器": "com.apple.calculator",
    "家庭": "com.apple.Home",
    "健康": "com.apple.Health",
    "钱包": "com.apple.Passbook",
    "股市": "com.apple.stocks",
    "图书": "com.apple.iBooks",
    "新闻": "com.apple.news",
    "视频": "com.apple.tv",
    "文件": "com.apple.DocumentsApp",
    "邮件": "com.apple.mobilemail",
    "查找": "com.apple.findmy",
    "翻译": "com.apple.Translate",
    "音乐": "com.apple.Music",
    "播客": "com.apple.podcasts",
    "库乐队": "com.apple.mobilegarageband",
    "语音备忘录": "com.apple.VoiceMemos",
    "iMovie": "com.apple.iMovie",
    "Watch": "com.apple.Bridge",
    "Apple Store": "com.apple.store.Jolly",
    "TestFlight": "com.apple.TestFlight",
    "Keynote": "com.apple.Keynote",
    "Keynote 讲演": "com.apple.Keynote",
}
def get_app_info_from_itunes(bundle_id: str) -> dict | None:
    """
    Get app information from iTunes API using bundle ID.

    Args:
        bundle_id: The iOS bundle ID.

    Returns:
        The first entry of the lookup response's "results" list (a dict of
        app metadata), or None when the lookup returns a non-200 status, an
        empty result list, or fails for any reason.

    Note:
        Errors are reported with print() and turned into a None return
        rather than raised.
    """
    try:
        import requests

        # Pass the bundle ID through `params` so requests URL-encodes it;
        # characters such as "&", "#" or spaces in the argument can then no
        # longer alter or truncate the query string.
        response = requests.get(
            "https://itunes.apple.com/lookup",
            params={"bundleId": bundle_id},
            timeout=10,
        )

        if response.status_code == 200:
            results = response.json().get("results", [])
            if results:
                return results[0]

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error fetching app info: {e}")

    return None
@dataclass
class DeviceInfo:
    """
    Information about a connected iOS device.

    Instances are built by XCTestConnection.list_devices(); the optional
    fields stay None when ideviceinfo does not report the matching key.
    """

    device_id: str  # UDID
    status: str  # list_devices() always sets this to "connected"
    connection_type: ConnectionType  # USB or NETWORK
    model: str | None = None  # ideviceinfo "ProductType"
    ios_version: str | None = None  # ideviceinfo "ProductVersion"
    device_name: str | None = None  # ideviceinfo "DeviceName"
def __init__(self, wda_url: str = "http://localhost:8100") -> None:
    """
    Initialize iOS connection manager.

    Args:
        wda_url: WebDriverAgent base URL (default: http://localhost:8100).
            For network devices, use the device's address as the host,
            e.g. http://DEVICE_IP:8100.
    """
    # Drop trailing slashes so endpoints can be appended as
    # f"{self.wda_url}/status" without producing a double slash.
    self.wda_url = wda_url.rstrip("/")
def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
    """
    Look up the DeviceInfo record for one connected device.

    Args:
        device_id: Device UDID. If None, the first device reported by
            list_devices() is used.

    Returns:
        The matching DeviceInfo, or None when no device is connected or no
        connected device has the requested UDID.
    """
    available = self.list_devices()
    if not available:
        return None

    if device_id is not None:
        # First device whose UDID matches, or None when there is no match.
        return next((dev for dev in available if dev.device_id == device_id), None)

    return available[0]
def start_wda_session(self) -> tuple[bool, str]:
    """
    Ask WebDriverAgent to open a new session.

    Returns:
        (True, session_id) when WDA answers with HTTP 200 or 201; if the
        response carries no session ID, the placeholder "session_started"
        is returned in its place. (False, error_message) for any other
        status, when requests is not installed, or when the request fails.
    """
    try:
        import requests

        reply = requests.post(
            f"{self.wda_url}/session",
            json={"capabilities": {}},
            timeout=30,
            verify=False,
        )

        if reply.status_code not in (200, 201):
            return False, f"Failed to start session: {reply.text}"

        payload = reply.json()
        # The ID is read from the top level first, then from under "value".
        session_id = payload.get("sessionId")
        if not session_id:
            session_id = payload.get("value", {}).get("sessionId")
        return True, session_id or "session_started"

    except ImportError:
        return (
            False,
            "requests library not found. Install it: pip install requests",
        )
    except Exception as e:
        return False, f"Error starting WDA session: {e}"
def get_device_name(self, device_id: str | None = None) -> str | None:
    """
    Get the device name.

    Runs `ideviceinfo -k DeviceName` (with `-u <device_id>` when a UDID is
    given) and returns its trimmed standard output.

    Args:
        device_id: Device UDID. If None, ideviceinfo picks the device.

    Returns:
        Device name string, or None if the command cannot be run, exits
        with a non-zero status, or prints nothing.
    """
    cmd = ["ideviceinfo"]
    if device_id:
        cmd.extend(["-u", device_id])
    cmd.extend(["-k", "DeviceName"])

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
    except Exception as e:
        # Covers a missing binary (FileNotFoundError) and timeouts alike.
        print(f"Error getting device name: {e}")
        return None

    # A failed lookup may still write diagnostic text to stdout (NOTE(review):
    # some ideviceinfo builds appear to print "ERROR: ..." there - confirm);
    # never hand that back to the caller as if it were the device name.
    if result.returncode != 0:
        return None

    return result.stdout.strip() or None
def _get_wda_session_url(wda_url: str, session_id: str | None, endpoint: str) -> str:
    """
    Build the full WDA URL for an endpoint.

    Args:
        wda_url: Base WDA URL; trailing slashes are ignored.
        session_id: Optional session ID. When truthy, the endpoint is placed
            under "/session/<session_id>/".
        endpoint: The endpoint path, appended as given.

    Returns:
        Full URL for the endpoint.
    """
    root = wda_url.rstrip("/")
    # Without a session ID the endpoint hangs directly off the base URL.
    prefix = f"{root}/session/{session_id}" if session_id else root
    return f"{prefix}/{endpoint}"
def tap(
    x: int,
    y: int,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Tap at the specified coordinates using WebDriver W3C Actions API.

    Args:
        x: X coordinate; divided by SCALE_FACTOR before being sent to WDA.
        y: Y coordinate; divided by SCALE_FACTOR before being sent to WDA.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after tap.

    Note:
        Failures are reported with print() rather than raised, and the HTTP
        response is not inspected, so the caller cannot tell from the return
        value whether the tap was delivered.
    """
    try:
        import requests

        url = _get_wda_session_url(wda_url, session_id, "actions")

        # W3C WebDriver Actions API for tap/click
        actions = {
            "actions": [
                {
                    "type": "pointer",
                    "id": "finger1",
                    "parameters": {"pointerType": "touch"},
                    "actions": [
                        {"type": "pointerMove", "duration": 0, "x": x / SCALE_FACTOR, "y": y / SCALE_FACTOR},
                        {"type": "pointerDown", "button": 0},
                        # Durations in this payload are milliseconds (the other
                        # gestures in this module send 100 and duration * 1000),
                        # so a 0.1 s hold is 100, not 0.1.
                        {"type": "pause", "duration": 100},
                        {"type": "pointerUp", "button": 0},
                    ],
                }
            ]
        }

        requests.post(url, json=actions, timeout=15, verify=False)

        # Wait after the gesture before returning to the caller.
        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error tapping: {e}")
def swipe(
    start_x: int,
    start_y: int,
    end_x: int,
    end_y: int,
    duration: float | None = None,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Drag from one point to another via WDA's dragfromtoforduration endpoint.

    Args:
        start_x: Starting X coordinate.
        start_y: Starting Y coordinate.
        end_x: Ending X coordinate.
        end_y: Ending Y coordinate.
        duration: Duration of swipe in seconds. When None it is derived from
            the squared travel distance and clamped to 0.3-2.0 seconds.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after swipe.

    Note:
        All four coordinates are divided by SCALE_FACTOR before being sent.
        Errors are printed, not raised.
    """
    try:
        import requests

        if duration is None:
            dx = start_x - end_x
            dy = start_y - end_y
            # Squared distance scaled down to seconds, then clamped.
            raw_seconds = (dx ** 2 + dy ** 2) / 1000000
            duration = max(0.3, min(raw_seconds, 2.0))

        drag_url = _get_wda_session_url(wda_url, session_id, "wda/dragfromtoforduration")

        # WDA dragfromtoforduration API payload
        scaled = [value / SCALE_FACTOR for value in (start_x, start_y, end_x, end_y)]
        payload = dict(zip(("fromX", "fromY", "toX", "toY"), scaled))
        payload["duration"] = duration

        # Give the HTTP call ten seconds on top of the gesture itself.
        requests.post(drag_url, json=payload, timeout=int(duration + 10), verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error swiping: {e}")
def launch_app(
    app_name: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> bool:
    """
    Launch an app by its display name.

    Args:
        app_name: The app name (must be a key of APP_PACKAGES).
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after the launch request.

    Returns:
        True if WDA answered the launch request with HTTP 200 or 201.
        False if the app name is unknown, requests is not installed, or the
        request failed.
    """
    # Unknown names are rejected before any network traffic.
    bundle_id = APP_PACKAGES.get(app_name)
    if bundle_id is None:
        return False

    try:
        import requests

        launch_url = _get_wda_session_url(wda_url, session_id, "wda/apps/launch")
        reply = requests.post(
            launch_url, json={"bundleId": bundle_id}, timeout=10, verify=False
        )

        # The delay is applied whatever status WDA returned.
        time.sleep(delay)
        launched = reply.status_code in (200, 201)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
        return False
    except Exception as e:
        print(f"Error launching app: {e}")
        return False

    return launched
def press_button(
    button_name: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Press a physical button through WDA's pressButton endpoint.

    Args:
        button_name: Button name (e.g., "home", "volumeUp", "volumeDown").
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID. Accepted for signature parity
            with the other helpers; the request is sent without a session.
        delay: Delay in seconds after pressing.

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        endpoint = wda_url.rstrip("/") + "/wda/pressButton"
        body = {"name": button_name}
        requests.post(endpoint, json=body, timeout=10, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error pressing button: {e}")
+ """ + base = wda_url.rstrip("/") + if session_id: + return f"{base}/session/{session_id}/{endpoint}" + else: + # Try to use WDA endpoints without session when possible + return f"{base}/{endpoint}" + + +def type_text( + text: str, + wda_url: str = "http://localhost:8100", + session_id: str | None = None, + frequency: int = 60, +) -> None: + """ + Type text into the currently focused input field. + + Args: + text: The text to type. + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + frequency: Typing frequency (keys per minute). Default is 60. + + Note: + The input field must be focused before calling this function. + Use tap() to focus on the input field first. + """ + try: + import requests + + url = _get_wda_session_url(wda_url, session_id, "wda/keys") + + # Send text to WDA + response = requests.post( + url, json={"value": list(text), "frequency": frequency}, timeout=30, verify=False + ) + + if response.status_code not in (200, 201): + print(f"Warning: Text input may have failed. Status: {response.status_code}") + + except ImportError: + print("Error: requests library required. Install: pip install requests") + except Exception as e: + print(f"Error typing text: {e}") + + +def clear_text( + wda_url: str = "http://localhost:8100", + session_id: str | None = None, +) -> None: + """ + Clear text in the currently focused input field. + + Args: + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + + Note: + This sends a clear command to the active element. + The input field must be focused before calling this function. 
+ """ + try: + import requests + + # First, try to get the active element + url = _get_wda_session_url(wda_url, session_id, "element/active") + + response = requests.get(url, timeout=10, verify=False) + + if response.status_code == 200: + data = response.json() + element_id = data.get("value", {}).get("ELEMENT") or data.get("value", {}).get("element-6066-11e4-a52e-4f735466cecf") + + if element_id: + # Clear the element + clear_url = _get_wda_session_url(wda_url, session_id, f"element/{element_id}/clear") + requests.post(clear_url, timeout=10, verify=False) + return + + # Fallback: send backspace commands + _clear_with_backspace(wda_url, session_id) + + except ImportError: + print("Error: requests library required. Install: pip install requests") + except Exception as e: + print(f"Error clearing text: {e}") + + +def _clear_with_backspace( + wda_url: str = "http://localhost:8100", + session_id: str | None = None, + max_backspaces: int = 100, +) -> None: + """ + Clear text by sending backspace keys. + + Args: + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + max_backspaces: Maximum number of backspaces to send. + """ + try: + import requests + + url = _get_wda_session_url(wda_url, session_id, "wda/keys") + + # Send backspace character multiple times + backspace_char = "\u0008" # Backspace Unicode character + requests.post( + url, + json={"value": [backspace_char] * max_backspaces}, + timeout=10, + verify=False, + ) + + except Exception as e: + print(f"Error clearing with backspace: {e}") + + +def send_keys( + keys: list[str], + wda_url: str = "http://localhost:8100", + session_id: str | None = None, +) -> None: + """ + Send a sequence of keys. + + Args: + keys: List of keys to send. + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. 
def send_keys(
    keys: list[str],
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Send a sequence of keys.

    Args:
        keys: List of keys to send. An empty list is a no-op.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Example:
        >>> send_keys(["H", "e", "l", "l", "o"])
        >>> send_keys(["\\n"])  # Send enter key
    """
    if not keys:
        # Nothing to send; skip the round trip to the device.
        return

    try:
        import requests

        url = _get_wda_session_url(wda_url, session_id, "wda/keys")

        response = requests.post(url, json={"value": keys}, timeout=10, verify=False)

        # Report rejected requests the same way type_text() does instead of
        # dropping the status on the floor.
        if response.status_code not in (200, 201):
            print(f"Warning: Sending keys may have failed. Status: {response.status_code}")

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error sending keys: {e}")


def press_enter(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 0.5,
) -> None:
    """
    Press the Enter/Return key.

    Sends a single newline character through send_keys() and then waits.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after pressing enter.
    """
    send_keys(["\n"], wda_url, session_id)
    time.sleep(delay)


def hide_keyboard(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Hide the on-screen keyboard.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID. When given, the request is sent
            to the session-scoped endpoint; otherwise the bare endpoint is
            used.
    """
    try:
        import requests

        # Build the URL through the shared helper so session_id is honoured
        # like in every other keyboard function of this module.
        url = _get_wda_session_url(wda_url, session_id, "wda/keyboard/dismiss")

        response = requests.post(url, timeout=10, verify=False)

        if response.status_code not in (200, 201):
            print(
                f"Warning: Hiding keyboard may have failed. Status: {response.status_code}"
            )

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error hiding keyboard: {e}")
Install: pip install requests") + except Exception: + pass + + return False + + +def set_pasteboard( + text: str, + wda_url: str = "http://localhost:8100", +) -> None: + """ + Set the device pasteboard (clipboard) content. + + Args: + text: Text to set in pasteboard. + wda_url: WebDriverAgent URL. + + Note: + This can be useful for inputting large amounts of text. + After setting pasteboard, you can simulate paste gesture. + """ + try: + import requests + + url = f"{wda_url.rstrip('/')}/wda/setPasteboard" + + requests.post( + url, json={"content": text, "contentType": "plaintext"}, timeout=10, verify=False + ) + + except ImportError: + print("Error: requests library required. Install: pip install requests") + except Exception as e: + print(f"Error setting pasteboard: {e}") + + +def get_pasteboard( + wda_url: str = "http://localhost:8100", +) -> str | None: + """ + Get the device pasteboard (clipboard) content. + + Args: + wda_url: WebDriverAgent URL. + + Returns: + Pasteboard content or None if failed. + """ + try: + import requests + + url = f"{wda_url.rstrip('/')}/wda/getPasteboard" + + response = requests.post(url, timeout=10, verify=False) + + if response.status_code == 200: + data = response.json() + return data.get("value") + + except ImportError: + print("Error: requests library required. 
Install: pip install requests") + except Exception as e: + print(f"Error getting pasteboard: {e}") + + return None diff --git a/phone_agent/xctest/screenshot.py b/phone_agent/xctest/screenshot.py new file mode 100644 index 0000000..bbf4bdd --- /dev/null +++ b/phone_agent/xctest/screenshot.py @@ -0,0 +1,230 @@ +"""Screenshot utilities for capturing iOS device screen.""" + +import base64 +import os +import subprocess +import tempfile +import uuid +from dataclasses import dataclass +from io import BytesIO + +from PIL import Image + + +@dataclass +class Screenshot: + """Represents a captured screenshot.""" + + base64_data: str + width: int + height: int + is_sensitive: bool = False + + +def get_screenshot( + wda_url: str = "http://localhost:8100", + session_id: str | None = None, + device_id: str | None = None, + timeout: int = 10, +) -> Screenshot: + """ + Capture a screenshot from the connected iOS device. + + Args: + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + device_id: Optional device UDID (for idevicescreenshot fallback). + timeout: Timeout in seconds for screenshot operations. + + Returns: + Screenshot object containing base64 data and dimensions. + + Note: + Tries WebDriverAgent first, falls back to idevicescreenshot if available. + If both fail, returns a black fallback image. + """ + # Try WebDriverAgent first (preferred method) + screenshot = _get_screenshot_wda(wda_url, session_id, timeout) + if screenshot: + return screenshot + + # Fallback to idevicescreenshot + screenshot = _get_screenshot_idevice(device_id, timeout) + if screenshot: + return screenshot + + # Return fallback black image + return _create_fallback_screenshot(is_sensitive=False) + + +def _get_screenshot_wda( + wda_url: str, session_id: str | None, timeout: int +) -> Screenshot | None: + """ + Capture screenshot using WebDriverAgent. + + Args: + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + timeout: Timeout in seconds. 
+ + Returns: + Screenshot object or None if failed. + """ + try: + import requests + + url = f"{wda_url.rstrip('/')}/screenshot" + + response = requests.get(url, timeout=timeout, verify=False) + + if response.status_code == 200: + data = response.json() + base64_data = data.get("value", "") + + if base64_data: + # Decode to get dimensions + img_data = base64.b64decode(base64_data) + img = Image.open(BytesIO(img_data)) + width, height = img.size + + return Screenshot( + base64_data=base64_data, + width=width, + height=height, + is_sensitive=False, + ) + + except ImportError: + print("Note: requests library not installed. Install: pip install requests") + except Exception as e: + print(f"WDA screenshot failed: {e}") + + return None + + +def _get_screenshot_idevice( + device_id: str | None, timeout: int +) -> Screenshot | None: + """ + Capture screenshot using idevicescreenshot (libimobiledevice). + + Args: + device_id: Optional device UDID. + timeout: Timeout in seconds. + + Returns: + Screenshot object or None if failed. + """ + try: + temp_path = os.path.join( + tempfile.gettempdir(), f"ios_screenshot_{uuid.uuid4()}.png" + ) + + cmd = ["idevicescreenshot"] + if device_id: + cmd.extend(["-u", device_id]) + cmd.append(temp_path) + + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout + ) + + if result.returncode == 0 and os.path.exists(temp_path): + # Read and encode image + img = Image.open(temp_path) + width, height = img.size + + buffered = BytesIO() + img.save(buffered, format="PNG") + base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + + # Cleanup + os.remove(temp_path) + + return Screenshot( + base64_data=base64_data, width=width, height=height, is_sensitive=False + ) + + except FileNotFoundError: + print( + "Note: idevicescreenshot not found. 
Install: brew install libimobiledevice" + ) + except Exception as e: + print(f"idevicescreenshot failed: {e}") + + return None + + +def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot: + """ + Create a black fallback image when screenshot fails. + + Args: + is_sensitive: Whether the failure was due to sensitive content. + + Returns: + Screenshot object with black image. + """ + # Default iPhone screen size (iPhone 14 Pro) + default_width, default_height = 1179, 2556 + + black_img = Image.new("RGB", (default_width, default_height), color="black") + buffered = BytesIO() + black_img.save(buffered, format="PNG") + base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + + return Screenshot( + base64_data=base64_data, + width=default_width, + height=default_height, + is_sensitive=is_sensitive, + ) + + +def save_screenshot( + screenshot: Screenshot, + file_path: str, +) -> bool: + """ + Save a screenshot to a file. + + Args: + screenshot: Screenshot object. + file_path: Path to save the screenshot. + + Returns: + True if successful, False otherwise. + """ + try: + img_data = base64.b64decode(screenshot.base64_data) + img = Image.open(BytesIO(img_data)) + img.save(file_path) + return True + except Exception as e: + print(f"Error saving screenshot: {e}") + return False + + +def get_screenshot_png( + wda_url: str = "http://localhost:8100", + session_id: str | None = None, + device_id: str | None = None, +) -> bytes | None: + """ + Get screenshot as PNG bytes. + + Args: + wda_url: WebDriverAgent URL. + session_id: Optional WDA session ID. + device_id: Optional device UDID. + + Returns: + PNG bytes or None if failed. 
+ """ + screenshot = get_screenshot(wda_url, session_id, device_id) + + try: + return base64.b64decode(screenshot.base64_data) + except Exception: + return None diff --git a/requirements.txt b/requirements.txt index 16aa426..4381c20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,9 @@ Pillow>=12.0.0 openai>=2.9.0 +# For iOS Support +requests>=2.31.0 + # For Model Deployment ## After installing sglang or vLLM, please run pip install -U transformers again to upgrade to 5.0.0rc0.