fix format

This commit is contained in:
liuyongbin
2025-12-19 18:56:43 +08:00
parent 7729568ae0
commit 780b756e21
2 changed files with 326 additions and 50 deletions

335
main.py
View File

@@ -24,23 +24,31 @@ from openai import OpenAI
from phone_agent import PhoneAgent
from phone_agent.agent import AgentConfig
from phone_agent.agent_ios import IOSAgentConfig, IOSPhoneAgent
from phone_agent.config.apps import list_supported_apps
from phone_agent.config.apps_harmonyos import list_supported_apps as list_harmonyos_apps
from phone_agent.config.apps_ios import list_supported_apps as list_ios_apps
from phone_agent.device_factory import DeviceType, get_device_factory, set_device_type
from phone_agent.model import ModelConfig
from phone_agent.xctest import XCTestConnection
from phone_agent.xctest import list_devices as list_ios_devices
def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
def check_system_requirements(
device_type: DeviceType = DeviceType.ADB, wda_url: str = "http://localhost:8100"
) -> bool:
"""
Check system requirements before running the agent.
Checks:
1. ADB/HDC tools installed
1. ADB/HDC/iOS tools installed
2. At least one device connected
3. ADB Keyboard installed on the device (for ADB only)
4. WebDriverAgent running (for iOS only)
Args:
device_type: Type of device tool (ADB or HDC).
device_type: Type of device tool (ADB, HDC, or IOS).
wda_url: WebDriverAgent URL (for iOS only).
Returns:
True if all checks pass, False otherwise.
@@ -51,8 +59,12 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
all_passed = True
# Determine tool name and command
tool_name = "ADB" if device_type == DeviceType.ADB else "HDC"
tool_cmd = "adb" if device_type == DeviceType.ADB else "hdc"
if device_type == DeviceType.IOS:
tool_name = "libimobiledevice"
tool_cmd = "idevice_id"
else:
tool_name = "ADB" if device_type == DeviceType.ADB else "HDC"
tool_cmd = "adb" if device_type == DeviceType.ADB else "hdc"
# Check 1: Tool installed
print(f"1. Checking {tool_name} installation...", end=" ")
@@ -66,20 +78,31 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
print(
" - Windows: Download from https://developer.android.com/studio/releases/platform-tools"
)
else:
print(" - Download from HarmonyOS SDK or https://gitee.com/openharmony/docs")
elif device_type == DeviceType.HDC:
print(
" - Download from HarmonyOS SDK or https://gitee.com/openharmony/docs"
)
print(" - Add to PATH environment variable")
else: # IOS
print(" - macOS: brew install libimobiledevice")
print(" - Linux: sudo apt-get install libimobiledevice-utils")
all_passed = False
else:
# Double check by running version command
try:
version_cmd = [tool_cmd, "version"] if device_type == DeviceType.ADB else [tool_cmd, "-v"]
if device_type == DeviceType.ADB:
version_cmd = [tool_cmd, "version"]
elif device_type == DeviceType.HDC:
version_cmd = [tool_cmd, "-v"]
else: # IOS
version_cmd = [tool_cmd, "-ln"]
result = subprocess.run(
version_cmd, capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
version_line = result.stdout.strip().split("\n")[0]
print(f"✅ OK ({version_line})")
print(f"✅ OK ({version_line if version_line else 'installed'})")
else:
print("❌ FAILED")
print(f" Error: {tool_name} command failed to run.")
@@ -108,13 +131,18 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
)
lines = result.stdout.strip().split("\n")
# Filter out header and empty lines, look for 'device' status
devices = [line for line in lines[1:] if line.strip() and "\tdevice" in line]
else: # HDC
devices = [
line for line in lines[1:] if line.strip() and "\tdevice" in line
]
elif device_type == DeviceType.HDC:
result = subprocess.run(
["hdc", "list", "targets"], capture_output=True, text=True, timeout=10
)
lines = result.stdout.strip().split("\n")
devices = [line for line in lines if line.strip()]
else: # IOS
ios_devices = list_ios_devices()
devices = [d.device_id for d in ios_devices]
if not devices:
print("❌ FAILED")
@@ -123,18 +151,31 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
if device_type == DeviceType.ADB:
print(" 1. Enable USB debugging on your Android device")
print(" 2. Connect via USB and authorize the connection")
print(" 3. Or connect remotely: python main.py --connect <ip>:<port>")
else:
print(
" 3. Or connect remotely: python main.py --connect <ip>:<port>"
)
elif device_type == DeviceType.HDC:
print(" 1. Enable USB debugging on your HarmonyOS device")
print(" 2. Connect via USB and authorize the connection")
print(" 3. Or connect remotely: python main.py --device-type hdc --connect <ip>:<port>")
print(
" 3. Or connect remotely: python main.py --device-type hdc --connect <ip>:<port>"
)
else: # IOS
print(" 1. Connect your iOS device via USB")
print(" 2. Unlock device and tap 'Trust This Computer'")
print(" 3. Verify: idevice_id -l")
print(" 4. Or connect via WiFi using device IP")
all_passed = False
else:
if device_type == DeviceType.ADB:
device_ids = [d.split("\t")[0] for d in devices]
else:
elif device_type == DeviceType.HDC:
device_ids = [d.strip() for d in devices]
print(f"✅ OK ({len(devices)} device(s): {', '.join(device_ids)})")
else: # IOS
device_ids = devices
print(
f"✅ OK ({len(devices)} device(s): {', '.join(device_ids[:2])}{'...' if len(device_ids) > 2 else ''})"
)
except subprocess.TimeoutExpired:
print("❌ FAILED")
print(f" Error: {tool_name} command timed out.")
@@ -150,7 +191,7 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
print("❌ System check failed. Please fix the issues above.")
return False
# Check 3: ADB Keyboard installed (only for ADB)
# Check 3: ADB Keyboard installed (only for ADB) or WebDriverAgent (for iOS)
if device_type == DeviceType.ADB:
print("3. Checking ADB Keyboard...", end=" ")
try:
@@ -185,10 +226,38 @@ def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
print("❌ FAILED")
print(f" Error: {e}")
all_passed = False
else:
elif device_type == DeviceType.HDC:
# For HDC, skip keyboard check as it uses different input method
print("3. Skipping keyboard check for HarmonyOS...", end=" ")
print("✅ OK (using native input)")
else: # IOS
# Check WebDriverAgent
print(f"3. Checking WebDriverAgent ({wda_url})...", end=" ")
try:
conn = XCTestConnection(wda_url=wda_url)
if conn.is_wda_ready():
print("✅ OK")
# Get WDA status for additional info
status = conn.get_wda_status()
if status:
session_id = status.get("sessionId", "N/A")
print(f" Session ID: {session_id}")
else:
print("❌ FAILED")
print(" Error: WebDriverAgent is not running or not accessible.")
print(" Solution:")
print(" 1. Run WebDriverAgent on your iOS device via Xcode")
print(" 2. For USB: Set up port forwarding: iproxy 8100 8100")
print(
" 3. For WiFi: Use device IP, e.g., --wda-url http://192.168.1.100:8100"
)
print(" 4. Verify in browser: open http://localhost:8100/status")
all_passed = False
except Exception as e:
print("❌ FAILED")
print(f" Error: {e}")
all_passed = False
print("-" * 50)
@@ -290,7 +359,7 @@ def parse_args() -> argparse.Namespace:
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Run with default settings
# Run with default settings (Android)
python main.py
# Specify model endpoint
@@ -313,6 +382,22 @@ Examples:
# List supported apps
python main.py --list-apps
# iOS specific examples
# Run with iOS device
python main.py --device-type ios "Open Safari and search for iPhone tips"
# Use WiFi connection for iOS
python main.py --device-type ios --wda-url http://192.168.1.100:8100
# List connected iOS devices
python main.py --device-type ios --list-devices
# Check WebDriverAgent status
python main.py --device-type ios --wda-status
# Pair with iOS device
python main.py --device-type ios --pair
""",
)
@@ -384,6 +469,26 @@ Examples:
help="Enable TCP/IP debugging on USB device (default port: 5555)",
)
# iOS specific options
parser.add_argument(
"--wda-url",
type=str,
default=os.getenv("PHONE_AGENT_WDA_URL", "http://localhost:8100"),
help="WebDriverAgent URL for iOS (default: http://localhost:8100)",
)
parser.add_argument(
"--pair",
action="store_true",
help="Pair with iOS device (required for some operations)",
)
parser.add_argument(
"--wda-status",
action="store_true",
help="Show WebDriverAgent status and exit (iOS only)",
)
# Other options
parser.add_argument(
"--quiet", "-q", action="store_true", help="Suppress verbose output"
@@ -404,9 +509,9 @@ Examples:
parser.add_argument(
"--device-type",
type=str,
choices=["adb", "hdc"],
choices=["adb", "hdc", "ios"],
default=os.getenv("PHONE_AGENT_DEVICE_TYPE", "adb"),
help="Device type: adb for Android, hdc for HarmonyOS (default: adb)",
help="Device type: adb for Android, hdc for HarmonyOS, ios for iPhone (default: adb)",
)
parser.add_argument(
@@ -419,6 +524,81 @@ Examples:
return parser.parse_args()
def handle_ios_device_commands(args) -> bool:
    """
    Handle iOS device-related commands.

    The flags are checked in a fixed order (``--list-devices``, ``--pair``,
    ``--wda-status``) and only the first one that is set is executed.

    Args:
        args: Parsed command-line arguments. This function reads
            ``wda_url``, ``list_devices``, ``pair``, ``device_id`` and
            ``wda_status``.

    Returns:
        True if a device command was handled (should exit), False otherwise.
    """
    conn = XCTestConnection(wda_url=args.wda_url)

    # Handle --list-devices
    if args.list_devices:
        devices = list_ios_devices()
        if not devices:
            print("No iOS devices connected.")
            print("\nTroubleshooting:")
            print("  1. Connect device via USB")
            print("  2. Unlock device and trust this computer")
            print("  3. Run: idevice_id -l")
            return True

        print("Connected iOS devices:")
        print("-" * 70)
        for device in devices:
            conn_type = device.connection_type.value
            model_info = f"{device.model}" if device.model else "Unknown"
            # Fall back to "Unknown" like the model line does, instead of
            # printing an empty "OS:" field when the version is not known.
            ios_info = (
                f"iOS {device.ios_version}" if device.ios_version else "Unknown"
            )
            name_info = device.device_name or "Unnamed"
            print(f"{name_info}")
            print(f"  UUID: {device.device_id}")
            print(f"  Model: {model_info}")
            print(f"  OS: {ios_info}")
            print(f"  Connection: {conn_type}")
            print("-" * 70)
        return True

    # Handle --pair
    if args.pair:
        print("Pairing with iOS device...")
        success, message = conn.pair_device(args.device_id)
        # Use the same markers as the --wda-status output so the result of
        # the pairing attempt is visible at a glance.
        status_mark = "✓" if success else "✗"
        print(f"{status_mark} {message}")
        return True

    # Handle --wda-status
    if args.wda_status:
        print(f"Checking WebDriverAgent status at {args.wda_url}...")
        print("-" * 50)
        if conn.is_wda_ready():
            print("✓ WebDriverAgent is running")
            status = conn.get_wda_status()
            if status:
                print("\nStatus details:")
                value = status.get("value", {})
                print(f"  Session ID: {status.get('sessionId', 'N/A')}")
                print(f"  Build: {value.get('build', {}).get('time', 'N/A')}")
                current_app = value.get("currentApp", {})
                if current_app:
                    print("\nCurrent App:")
                    print(f"  Bundle ID: {current_app.get('bundleId', 'N/A')}")
                    print(f"  Process ID: {current_app.get('pid', 'N/A')}")
        else:
            print("✗ WebDriverAgent is not running")
            print("\nPlease start WebDriverAgent on your iOS device:")
            print("  1. Open WebDriverAgent.xcodeproj in Xcode")
            print("  2. Select your device")
            print("  3. Run WebDriverAgentRunner (Product > Test or Cmd+U)")
            print("  4. For USB: Run port forwarding: iproxy 8100 8100")
        return True

    return False
def handle_device_commands(args) -> bool:
"""
Handle device-related commands.
@@ -426,6 +606,16 @@ def handle_device_commands(args) -> bool:
Returns:
True if a device command was handled (should exit), False otherwise.
"""
device_type = (
DeviceType.ADB
if args.device_type == "adb"
else (DeviceType.HDC if args.device_type == "hdc" else DeviceType.IOS)
)
# Handle iOS-specific commands
if device_type == DeviceType.IOS:
return handle_ios_device_commands(args)
device_factory = get_device_factory()
ConnectionClass = device_factory.get_connection_class()
conn = ConnectionClass()
@@ -496,12 +686,21 @@ def main():
args = parse_args()
# Set device type globally based on args
device_type = DeviceType.ADB if args.device_type == "adb" else DeviceType.HDC
set_device_type(device_type)
if args.device_type == "adb":
device_type = DeviceType.ADB
elif args.device_type == "hdc":
device_type = DeviceType.HDC
else: # ios
device_type = DeviceType.IOS
# Set device type globally for non-iOS devices
if device_type != DeviceType.IOS:
set_device_type(device_type)
# Enable HDC verbose mode if using HDC
if device_type == DeviceType.HDC:
from phone_agent.hdc import set_hdc_verbose
set_hdc_verbose(True)
# Handle --list-apps (no system check needed)
@@ -509,12 +708,23 @@ def main():
if device_type == DeviceType.HDC:
print("Supported HarmonyOS apps:")
apps = list_harmonyos_apps()
elif device_type == DeviceType.IOS:
print("Supported iOS apps:")
print("\nNote: For iOS apps, Bundle IDs are configured in:")
print(" phone_agent/config/apps_ios.py")
print("\nCurrently configured apps:")
apps = list_ios_apps()
else:
print("Supported Android apps:")
apps = list_supported_apps()
for app in apps:
for app in sorted(apps):
print(f" - {app}")
if device_type == DeviceType.IOS:
print(
"\nTo add iOS apps, find the Bundle ID and add to APP_PACKAGES_IOS dictionary."
)
return
# Handle device commands (these may need partial system checks)
@@ -522,14 +732,19 @@ def main():
return
# Run system requirements check before proceeding
if not check_system_requirements(device_type):
if not check_system_requirements(
device_type,
wda_url=args.wda_url
if device_type == DeviceType.IOS
else "http://localhost:8100",
):
sys.exit(1)
# Check model API connectivity and model availability
if not check_model_api(args.base_url, args.model, args.apikey):
sys.exit(1)
# Create configurations
# Create configurations and agent based on device type
model_config = ModelConfig(
base_url=args.base_url,
model_name=args.model,
@@ -537,22 +752,40 @@ def main():
lang=args.lang,
)
agent_config = AgentConfig(
max_steps=args.max_steps,
device_id=args.device_id,
verbose=not args.quiet,
lang=args.lang,
)
if device_type == DeviceType.IOS:
# Create iOS agent
agent_config = IOSAgentConfig(
max_steps=args.max_steps,
wda_url=args.wda_url,
device_id=args.device_id,
verbose=not args.quiet,
lang=args.lang,
)
# Create agent
agent = PhoneAgent(
model_config=model_config,
agent_config=agent_config,
)
agent = IOSPhoneAgent(
model_config=model_config,
agent_config=agent_config,
)
else:
# Create Android/HarmonyOS agent
agent_config = AgentConfig(
max_steps=args.max_steps,
device_id=args.device_id,
verbose=not args.quiet,
lang=args.lang,
)
agent = PhoneAgent(
model_config=model_config,
agent_config=agent_config,
)
# Print header
print("=" * 50)
print("Phone Agent - AI-powered phone automation")
if device_type == DeviceType.IOS:
print("Phone Agent iOS - AI-powered iOS automation")
else:
print("Phone Agent - AI-powered phone automation")
print("=" * 50)
print(f"Model: {model_config.model_name}")
print(f"Base URL: {model_config.base_url}")
@@ -560,13 +793,27 @@ def main():
print(f"Language: {agent_config.lang}")
print(f"Device Type: {args.device_type.upper()}")
# Show iOS-specific config
if device_type == DeviceType.IOS:
print(f"WDA URL: {args.wda_url}")
# Show device info
device_factory = get_device_factory()
devices = device_factory.list_devices()
if agent_config.device_id:
print(f"Device: {agent_config.device_id}")
elif devices:
print(f"Device: {devices[0].device_id} (auto-detected)")
if device_type == DeviceType.IOS:
devices = list_ios_devices()
if agent_config.device_id:
print(f"Device: {agent_config.device_id}")
elif devices:
device = devices[0]
print(f"Device: {device.device_name or device.device_id[:16]}")
if device.model and device.ios_version:
print(f" {device.model}, iOS {device.ios_version}")
else:
device_factory = get_device_factory()
devices = device_factory.list_devices()
if agent_config.device_id:
print(f"Device: {agent_config.device_id}")
elif devices:
print(f"Device: {devices[0].device_id} (auto-detected)")
print("=" * 50)

View File

@@ -9,6 +9,7 @@ class DeviceType(Enum):
ADB = "adb"
HDC = "hdc"
IOS = "ios"
class DeviceFactory:
@@ -34,9 +35,11 @@ class DeviceFactory:
if self._module is None:
if self.device_type == DeviceType.ADB:
from phone_agent import adb
self._module = adb
elif self.device_type == DeviceType.HDC:
from phone_agent import hdc
self._module = hdc
else:
raise ValueError(f"Unknown device type: {self.device_type}")
@@ -50,21 +53,43 @@ class DeviceFactory:
"""Get current app name."""
return self.module.get_current_app(device_id)
def tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None):
def tap(
self, x: int, y: int, device_id: str | None = None, delay: float | None = None
):
"""Tap at coordinates."""
return self.module.tap(x, y, device_id, delay)
def double_tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None):
def double_tap(
self, x: int, y: int, device_id: str | None = None, delay: float | None = None
):
"""Double tap at coordinates."""
return self.module.double_tap(x, y, device_id, delay)
def long_press(self, x: int, y: int, duration_ms: int = 3000, device_id: str | None = None, delay: float | None = None):
def long_press(
self,
x: int,
y: int,
duration_ms: int = 3000,
device_id: str | None = None,
delay: float | None = None,
):
"""Long press at coordinates."""
return self.module.long_press(x, y, duration_ms, device_id, delay)
def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int, duration_ms: int | None = None, device_id: str | None = None, delay: float | None = None):
def swipe(
self,
start_x: int,
start_y: int,
end_x: int,
end_y: int,
duration_ms: int | None = None,
device_id: str | None = None,
delay: float | None = None,
):
"""Swipe from start to end."""
return self.module.swipe(start_x, start_y, end_x, end_y, duration_ms, device_id, delay)
return self.module.swipe(
start_x, start_y, end_x, end_y, duration_ms, device_id, delay
)
def back(self, device_id: str | None = None, delay: float | None = None):
"""Press back button."""
@@ -74,7 +99,9 @@ class DeviceFactory:
"""Press home button."""
return self.module.home(device_id, delay)
def launch_app(self, app_name: str, device_id: str | None = None, delay: float | None = None) -> bool:
def launch_app(
self, app_name: str, device_id: str | None = None, delay: float | None = None
) -> bool:
"""Launch an app."""
return self.module.launch_app(app_name, device_id, delay)
@@ -102,9 +129,11 @@ class DeviceFactory:
"""Get the connection class (ADBConnection or HDCConnection)."""
if self.device_type == DeviceType.ADB:
from phone_agent.adb import ADBConnection
return ADBConnection
elif self.device_type == DeviceType.HDC:
from phone_agent.hdc import HDCConnection
return HDCConnection
else:
raise ValueError(f"Unknown device type: {self.device_type}")