From 780b756e213c5a806020900f120a3ad7d4bd64e6 Mon Sep 17 00:00:00 2001 From: liuyongbin Date: Fri, 19 Dec 2025 18:56:43 +0800 Subject: [PATCH] fix format --- main.py | 335 +++++++++++++++++++++++++++++----- phone_agent/device_factory.py | 41 ++++- 2 files changed, 326 insertions(+), 50 deletions(-) diff --git a/main.py b/main.py index 624d2bf..8cdc34b 100755 --- a/main.py +++ b/main.py @@ -24,23 +24,31 @@ from openai import OpenAI from phone_agent import PhoneAgent from phone_agent.agent import AgentConfig +from phone_agent.agent_ios import IOSAgentConfig, IOSPhoneAgent from phone_agent.config.apps import list_supported_apps from phone_agent.config.apps_harmonyos import list_supported_apps as list_harmonyos_apps +from phone_agent.config.apps_ios import list_supported_apps as list_ios_apps from phone_agent.device_factory import DeviceType, get_device_factory, set_device_type from phone_agent.model import ModelConfig +from phone_agent.xctest import XCTestConnection +from phone_agent.xctest import list_devices as list_ios_devices -def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: +def check_system_requirements( + device_type: DeviceType = DeviceType.ADB, wda_url: str = "http://localhost:8100" +) -> bool: """ Check system requirements before running the agent. Checks: - 1. ADB/HDC tools installed + 1. ADB/HDC/iOS tools installed 2. At least one device connected 3. ADB Keyboard installed on the device (for ADB only) + 4. WebDriverAgent running (for iOS only) Args: - device_type: Type of device tool (ADB or HDC). + device_type: Type of device tool (ADB, HDC, or IOS). + wda_url: WebDriverAgent URL (for iOS only). Returns: True if all checks pass, False otherwise. @@ -51,8 +59,12 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: all_passed = True # Determine tool name and command - tool_name = "ADB" if device_type == DeviceType.ADB else "HDC" - tool_cmd = "adb" if device_type == DeviceType.ADB else "hdc" + if device_type == DeviceType.IOS: + tool_name = "libimobiledevice" + tool_cmd = "idevice_id" + else: + tool_name = "ADB" if device_type == DeviceType.ADB else "HDC" + tool_cmd = "adb" if device_type == DeviceType.ADB else "hdc" # Check 1: Tool installed print(f"1. Checking {tool_name} installation...", end=" ") @@ -66,20 +78,31 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: print( " - Windows: Download from https://developer.android.com/studio/releases/platform-tools" ) - else: - print(" - Download from HarmonyOS SDK or https://gitee.com/openharmony/docs") + elif device_type == DeviceType.HDC: + print( + " - Download from HarmonyOS SDK or https://gitee.com/openharmony/docs" + ) print(" - Add to PATH environment variable") + else: # IOS + print(" - macOS: brew install libimobiledevice") + print(" - Linux: sudo apt-get install libimobiledevice-utils") all_passed = False else: # Double check by running version command try: - version_cmd = [tool_cmd, "version"] if device_type == DeviceType.ADB else [tool_cmd, "-v"] + if device_type == DeviceType.ADB: + version_cmd = [tool_cmd, "version"] + elif device_type == DeviceType.HDC: + version_cmd = [tool_cmd, "-v"] + else: # IOS + version_cmd = [tool_cmd, "-ln"] + result = subprocess.run( version_cmd, capture_output=True, text=True, timeout=10 ) if result.returncode == 0: version_line = result.stdout.strip().split("\n")[0] - print(f"✅ OK ({version_line})") + print(f"✅ OK ({version_line if version_line else 'installed'})") else: print("❌ FAILED") print(f" Error: {tool_name} command failed to run.") @@ -108,13 +131,18 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: ) lines = result.stdout.strip().split("\n") # Filter out header and empty lines, look for 'device' status - devices = [line for line in lines[1:] if line.strip() and "\tdevice" in line] - else: # HDC + devices = [ + line for line in lines[1:] if line.strip() and "\tdevice" in line + ] + elif device_type == DeviceType.HDC: result = subprocess.run( ["hdc", "list", "targets"], capture_output=True, text=True, timeout=10 ) lines = result.stdout.strip().split("\n") devices = [line for line in lines if line.strip()] + else: # IOS + ios_devices = list_ios_devices() + devices = [d.device_id for d in ios_devices] if not devices: print("❌ FAILED") @@ -123,18 +151,31 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: if device_type == DeviceType.ADB: print(" 1. Enable USB debugging on your Android device") print(" 2. Connect via USB and authorize the connection") - print(" 3. Or connect remotely: python main.py --connect :") - else: + print( + " 3. Or connect remotely: python main.py --connect :" + ) + elif device_type == DeviceType.HDC: print(" 1. Enable USB debugging on your HarmonyOS device") print(" 2. Connect via USB and authorize the connection") - print(" 3. Or connect remotely: python main.py --device-type hdc --connect :") + print( + " 3. Or connect remotely: python main.py --device-type hdc --connect :" + ) + else: # IOS + print(" 1. Connect your iOS device via USB") + print(" 2. Unlock device and tap 'Trust This Computer'") + print(" 3. Verify: idevice_id -l") + print(" 4. Or connect via WiFi using device IP") all_passed = False else: if device_type == DeviceType.ADB: device_ids = [d.split("\t")[0] for d in devices] - else: + elif device_type == DeviceType.HDC: device_ids = [d.strip() for d in devices] - print(f"✅ OK ({len(devices)} device(s): {', '.join(device_ids)})") + else: # IOS + device_ids = devices + print( + f"✅ OK ({len(devices)} device(s): {', '.join(device_ids[:2])}{'...' if len(device_ids) > 2 else ''})" + ) except subprocess.TimeoutExpired: print("❌ FAILED") print(f" Error: {tool_name} command timed out.") @@ -150,7 +191,7 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: print("❌ System check failed. Please fix the issues above.") return False - # Check 3: ADB Keyboard installed (only for ADB) + # Check 3: ADB Keyboard installed (only for ADB) or WebDriverAgent (for iOS) if device_type == DeviceType.ADB: print("3. Checking ADB Keyboard...", end=" ") try: @@ -185,10 +226,38 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: print("❌ FAILED") print(f" Error: {e}") all_passed = False - else: + elif device_type == DeviceType.HDC: # For HDC, skip keyboard check as it uses different input method print("3. Skipping keyboard check for HarmonyOS...", end=" ") print("✅ OK (using native input)") + else: # IOS + # Check WebDriverAgent + print(f"3. Checking WebDriverAgent ({wda_url})...", end=" ") + try: + conn = XCTestConnection(wda_url=wda_url) + + if conn.is_wda_ready(): + print("✅ OK") + # Get WDA status for additional info + status = conn.get_wda_status() + if status: + session_id = status.get("sessionId", "N/A") + print(f" Session ID: {session_id}") + else: + print("❌ FAILED") + print(" Error: WebDriverAgent is not running or not accessible.") + print(" Solution:") + print(" 1. Run WebDriverAgent on your iOS device via Xcode") + print(" 2. For USB: Set up port forwarding: iproxy 8100 8100") + print( + " 3. For WiFi: Use device IP, e.g., --wda-url http://192.168.1.100:8100" + ) + print(" 4. Verify in browser: open http://localhost:8100/status") + all_passed = False + except Exception as e: + print("❌ FAILED") + print(f" Error: {e}") + all_passed = False print("-" * 50) @@ -290,7 +359,7 @@ def parse_args() -> argparse.Namespace: formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Run with default settings + # Run with default settings (Android) python main.py # Specify model endpoint @@ -313,6 +382,22 @@ Examples: # List supported apps python main.py --list-apps + + # iOS specific examples + # Run with iOS device + python main.py --device-type ios "Open Safari and search for iPhone tips" + + # Use WiFi connection for iOS + python main.py --device-type ios --wda-url http://192.168.1.100:8100 + + # List connected iOS devices + python main.py --device-type ios --list-devices + + # Check WebDriverAgent status + python main.py --device-type ios --wda-status + + # Pair with iOS device + python main.py --device-type ios --pair """, ) @@ -384,6 +469,26 @@ Examples: help="Enable TCP/IP debugging on USB device (default port: 5555)", ) + # iOS specific options + parser.add_argument( + "--wda-url", + type=str, + default=os.getenv("PHONE_AGENT_WDA_URL", "http://localhost:8100"), + help="WebDriverAgent URL for iOS (default: http://localhost:8100)", + ) + + parser.add_argument( + "--pair", + action="store_true", + help="Pair with iOS device (required for some operations)", + ) + + parser.add_argument( + "--wda-status", + action="store_true", + help="Show WebDriverAgent status and exit (iOS only)", + ) + # Other options parser.add_argument( "--quiet", "-q", action="store_true", help="Suppress verbose output" @@ -404,9 +509,9 @@ Examples: parser.add_argument( "--device-type", type=str, - choices=["adb", "hdc"], + choices=["adb", "hdc", "ios"], default=os.getenv("PHONE_AGENT_DEVICE_TYPE", "adb"), - help="Device type: adb for Android, hdc for HarmonyOS (default: adb)", + help="Device type: adb for Android, hdc for HarmonyOS, ios for iPhone (default: adb)", ) parser.add_argument( @@ -419,6 +524,81 @@ Examples: return parser.parse_args() +def handle_ios_device_commands(args) -> bool: + """ + Handle iOS device-related commands. + + Returns: + True if a device command was handled (should exit), False otherwise. + """ + conn = XCTestConnection(wda_url=args.wda_url) + + # Handle --list-devices + if args.list_devices: + devices = list_ios_devices() + if not devices: + print("No iOS devices connected.") + print("\nTroubleshooting:") + print(" 1. Connect device via USB") + print(" 2. Unlock device and trust this computer") + print(" 3. Run: idevice_id -l") + else: + print("Connected iOS devices:") + print("-" * 70) + for device in devices: + conn_type = device.connection_type.value + model_info = f"{device.model}" if device.model else "Unknown" + ios_info = f"iOS {device.ios_version}" if device.ios_version else "" + name_info = device.device_name or "Unnamed" + + print(f" ✓ {name_info}") + print(f" UUID: {device.device_id}") + print(f" Model: {model_info}") + print(f" OS: {ios_info}") + print(f" Connection: {conn_type}") + print("-" * 70) + return True + + # Handle --pair + if args.pair: + print("Pairing with iOS device...") + success, message = conn.pair_device(args.device_id) + print(f"{'✓' if success else '✗'} {message}") + return True + + # Handle --wda-status + if args.wda_status: + print(f"Checking WebDriverAgent status at {args.wda_url}...") + print("-" * 50) + + if conn.is_wda_ready(): + print("✓ WebDriverAgent is running") + + status = conn.get_wda_status() + if status: + print(f"\nStatus details:") + value = status.get("value", {}) + print(f" Session ID: {status.get('sessionId', 'N/A')}") + print(f" Build: {value.get('build', {}).get('time', 'N/A')}") + + current_app = value.get("currentApp", {}) + if current_app: + print(f"\nCurrent App:") + print(f" Bundle ID: {current_app.get('bundleId', 'N/A')}") + print(f" Process ID: {current_app.get('pid', 'N/A')}") + else: + print("✗ WebDriverAgent is not running") + print("\nPlease start WebDriverAgent on your iOS device:") + print(" 1. Open WebDriverAgent.xcodeproj in Xcode") + print(" 2. Select your device") + print(" 3. Run WebDriverAgentRunner (Product > Test or Cmd+U)") + print(f" 4. For USB: Run port forwarding: iproxy 8100 8100") + + return True + + return False + + def handle_device_commands(args) -> bool: """ Handle device-related commands. @@ -426,6 +606,16 @@ def handle_device_commands(args) -> bool: Returns: True if a device command was handled (should exit), False otherwise. """ + device_type = ( + DeviceType.ADB + if args.device_type == "adb" + else (DeviceType.HDC if args.device_type == "hdc" else DeviceType.IOS) + ) + + # Handle iOS-specific commands + if device_type == DeviceType.IOS: + return handle_ios_device_commands(args) + device_factory = get_device_factory() ConnectionClass = device_factory.get_connection_class() conn = ConnectionClass() @@ -496,12 +686,21 @@ def main(): args = parse_args() # Set device type globally based on args - device_type = DeviceType.ADB if args.device_type == "adb" else DeviceType.HDC - set_device_type(device_type) + if args.device_type == "adb": + device_type = DeviceType.ADB + elif args.device_type == "hdc": + device_type = DeviceType.HDC + else: # ios + device_type = DeviceType.IOS + + # Set device type globally for non-iOS devices + if device_type != DeviceType.IOS: + set_device_type(device_type) # Enable HDC verbose mode if using HDC if device_type == DeviceType.HDC: from phone_agent.hdc import set_hdc_verbose + set_hdc_verbose(True) # Handle --list-apps (no system check needed) @@ -509,12 +708,23 @@ def main(): if device_type == DeviceType.HDC: print("Supported HarmonyOS apps:") apps = list_harmonyos_apps() + elif device_type == DeviceType.IOS: + print("Supported iOS apps:") + print("\nNote: For iOS apps, Bundle IDs are configured in:") + print(" phone_agent/config/apps_ios.py") + print("\nCurrently configured apps:") + apps = list_ios_apps() else: print("Supported Android apps:") apps = list_supported_apps() - for app in apps: + for app in sorted(apps): print(f" - {app}") + + if device_type == DeviceType.IOS: + print( + "\nTo add iOS apps, find the Bundle ID and add to APP_PACKAGES_IOS dictionary." + ) return # Handle device commands (these may need partial system checks) @@ -522,14 +732,19 @@ def main(): return # Run system requirements check before proceeding - if not check_system_requirements(device_type): + if not check_system_requirements( + device_type, + wda_url=args.wda_url + if device_type == DeviceType.IOS + else "http://localhost:8100", + ): sys.exit(1) # Check model API connectivity and model availability if not check_model_api(args.base_url, args.model, args.apikey): sys.exit(1) - # Create configurations + # Create configurations and agent based on device type model_config = ModelConfig( base_url=args.base_url, model_name=args.model, @@ -537,22 +752,40 @@ def main(): lang=args.lang, ) - agent_config = AgentConfig( - max_steps=args.max_steps, - device_id=args.device_id, - verbose=not args.quiet, - lang=args.lang, - ) + if device_type == DeviceType.IOS: + # Create iOS agent + agent_config = IOSAgentConfig( + max_steps=args.max_steps, + wda_url=args.wda_url, + device_id=args.device_id, + verbose=not args.quiet, + lang=args.lang, + ) - # Create agent - agent = PhoneAgent( - model_config=model_config, - agent_config=agent_config, - ) + agent = IOSPhoneAgent( + model_config=model_config, + agent_config=agent_config, + ) + else: + # Create Android/HarmonyOS agent + agent_config = AgentConfig( + max_steps=args.max_steps, + device_id=args.device_id, + verbose=not args.quiet, + lang=args.lang, + ) + + agent = PhoneAgent( + model_config=model_config, + agent_config=agent_config, + ) # Print header print("=" * 50) - print("Phone Agent - AI-powered phone automation") + if device_type == DeviceType.IOS: + print("Phone Agent iOS - AI-powered iOS automation") + else: + print("Phone Agent - AI-powered phone automation") print("=" * 50) print(f"Model: {model_config.model_name}") print(f"Base URL: {model_config.base_url}") @@ -560,13 +793,27 @@ def main(): print(f"Language: {agent_config.lang}") print(f"Device Type: {args.device_type.upper()}") + # Show iOS-specific config + if device_type == DeviceType.IOS: + print(f"WDA URL: {args.wda_url}") + # Show device info - device_factory = get_device_factory() - devices = device_factory.list_devices() - if agent_config.device_id: - print(f"Device: {agent_config.device_id}") - elif devices: - print(f"Device: {devices[0].device_id} (auto-detected)") + if device_type == DeviceType.IOS: + devices = list_ios_devices() + if agent_config.device_id: + print(f"Device: {agent_config.device_id}") + elif devices: + device = devices[0] + print(f"Device: {device.device_name or device.device_id[:16]}") + if device.model and device.ios_version: + print(f" {device.model}, iOS {device.ios_version}") + else: + device_factory = get_device_factory() + devices = device_factory.list_devices() + if agent_config.device_id: + print(f"Device: {agent_config.device_id}") + elif devices: + print(f"Device: {devices[0].device_id} (auto-detected)") print("=" * 50) diff --git a/phone_agent/device_factory.py b/phone_agent/device_factory.py index f7d3c46..915ff52 100644 --- a/phone_agent/device_factory.py +++ b/phone_agent/device_factory.py @@ -9,6 +9,7 @@ class DeviceType(Enum): ADB = "adb" HDC = "hdc" + IOS = "ios" class DeviceFactory: @@ -34,9 +35,11 @@ class DeviceFactory: if self._module is None: if self.device_type == DeviceType.ADB: from phone_agent import adb + self._module = adb elif self.device_type == DeviceType.HDC: from phone_agent import hdc + self._module = hdc else: raise ValueError(f"Unknown device type: {self.device_type}") @@ -50,21 +53,43 @@ class DeviceFactory: """Get current app name.""" return self.module.get_current_app(device_id) - def tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None): + def tap( + self, x: int, y: int, device_id: str | None = None, delay: float | None = None + ): """Tap at coordinates.""" return self.module.tap(x, y, device_id, delay) - def double_tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None): + def double_tap( + self, x: int, y: int, device_id: str | None = None, delay: float | None = None + ): """Double tap at coordinates.""" return self.module.double_tap(x, y, device_id, delay) - def long_press(self, x: int, y: int, duration_ms: int = 3000, device_id: str | None = None, delay: float | None = None): + def long_press( + self, + x: int, + y: int, + duration_ms: int = 3000, + device_id: str | None = None, + delay: float | None = None, + ): """Long press at coordinates.""" return self.module.long_press(x, y, duration_ms, device_id, delay) - def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int, duration_ms: int | None = None, device_id: str | None = None, delay: float | None = None): + def swipe( + self, + start_x: int, + start_y: int, + end_x: int, + end_y: int, + duration_ms: int | None = None, + device_id: str | None = None, + delay: float | None = None, + ): """Swipe from start to end.""" - return self.module.swipe(start_x, start_y, end_x, end_y, duration_ms, device_id, delay) + return self.module.swipe( + start_x, start_y, end_x, end_y, duration_ms, device_id, delay + ) def back(self, device_id: str | None = None, delay: float | None = None): """Press back button.""" @@ -74,7 +99,9 @@ class DeviceFactory: """Press home button.""" return self.module.home(device_id, delay) - def launch_app(self, app_name: str, device_id: str | None = None, delay: float | None = None) -> bool: + def launch_app( + self, app_name: str, device_id: str | None = None, delay: float | None = None + ) -> bool: """Launch an app.""" return self.module.launch_app(app_name, device_id, delay) @@ -102,9 +129,11 @@ class DeviceFactory: """Get the connection class (ADBConnection or HDCConnection).""" if self.device_type == DeviceType.ADB: from phone_agent.adb import ADBConnection + return ADBConnection elif self.device_type == DeviceType.HDC: from phone_agent.hdc import HDCConnection + return HDCConnection else: raise ValueError(f"Unknown device type: {self.device_type}")