diff --git a/main.py b/main.py index 39942fb..624d2bf 100644 --- a/main.py +++ b/main.py @@ -23,20 +23,24 @@ from urllib.parse import urlparse from openai import OpenAI from phone_agent import PhoneAgent -from phone_agent.adb import ADBConnection, list_devices from phone_agent.agent import AgentConfig from phone_agent.config.apps import list_supported_apps +from phone_agent.config.apps_harmonyos import list_supported_apps as list_harmonyos_apps +from phone_agent.device_factory import DeviceType, get_device_factory, set_device_type from phone_agent.model import ModelConfig -def check_system_requirements() -> bool: +def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool: """ Check system requirements before running the agent. Checks: - 1. ADB tools installed + 1. ADB/HDC tools installed 2. At least one device connected - 3. ADB Keyboard installed on the device + 3. ADB Keyboard installed on the device (for ADB only) + + Args: + device_type: Type of device tool (ADB or HDC). Returns: True if all checks pass, False otherwise. @@ -46,38 +50,47 @@ def check_system_requirements() -> bool: all_passed = True - # Check 1: ADB installed - print("1. Checking ADB installation...", end=" ") - if shutil.which("adb") is None: + # Determine tool name and command + tool_name = "ADB" if device_type == DeviceType.ADB else "HDC" + tool_cmd = "adb" if device_type == DeviceType.ADB else "hdc" + + # Check 1: Tool installed + print(f"1. Checking {tool_name} installation...", end=" ") + if shutil.which(tool_cmd) is None: print("❌ FAILED") - print(" Error: ADB is not installed or not in PATH.") - print(" Solution: Install Android SDK Platform Tools:") - print(" - macOS: brew install android-platform-tools") - print(" - Linux: sudo apt install android-tools-adb") - print( - " - Windows: Download from https://developer.android.com/studio/releases/platform-tools" - ) + print(f" Error: {tool_name} is not installed or not in PATH.") + print(f" Solution: Install {tool_name}:") + if device_type == DeviceType.ADB: + print(" - macOS: brew install android-platform-tools") + print(" - Linux: sudo apt install android-tools-adb") + print( + " - Windows: Download from https://developer.android.com/studio/releases/platform-tools" + ) + else: + print(" - Download from HarmonyOS SDK or https://gitee.com/openharmony/docs") + print(" - Add to PATH environment variable") all_passed = False else: - # Double check by running adb version + # Double check by running version command try: + version_cmd = [tool_cmd, "version"] if device_type == DeviceType.ADB else [tool_cmd, "-v"] result = subprocess.run( - ["adb", "version"], capture_output=True, text=True, timeout=10 + version_cmd, capture_output=True, text=True, timeout=10 ) if result.returncode == 0: version_line = result.stdout.strip().split("\n")[0] print(f"✅ OK ({version_line})") else: print("❌ FAILED") - print(" Error: ADB command failed to run.") + print(f" Error: {tool_name} command failed to run.") all_passed = False except FileNotFoundError: print("❌ FAILED") - print(" Error: ADB command not found.") + print(f" Error: {tool_name} command not found.") all_passed = False except subprocess.TimeoutExpired: print("❌ FAILED") - print(" Error: ADB command timed out.") + print(f" Error: {tool_name} command timed out.") all_passed = False # If ADB is not installed, skip remaining checks @@ -89,27 +102,42 @@ def check_system_requirements() -> bool: # Check 2: Device connected print("2. Checking connected devices...", end=" ") try: - result = subprocess.run( - ["adb", "devices"], capture_output=True, text=True, timeout=10 - ) - lines = result.stdout.strip().split("\n") - # Filter out header and empty lines, look for 'device' status - devices = [line for line in lines[1:] if line.strip() and "\tdevice" in line] + if device_type == DeviceType.ADB: + result = subprocess.run( + ["adb", "devices"], capture_output=True, text=True, timeout=10 + ) + lines = result.stdout.strip().split("\n") + # Filter out header and empty lines, look for 'device' status + devices = [line for line in lines[1:] if line.strip() and "\tdevice" in line] + else: # HDC + result = subprocess.run( + ["hdc", "list", "targets"], capture_output=True, text=True, timeout=10 + ) + lines = result.stdout.strip().split("\n") + devices = [line for line in lines if line.strip()] if not devices: print("❌ FAILED") print(" Error: No devices connected.") print(" Solution:") - print(" 1. Enable USB debugging on your Android device") - print(" 2. Connect via USB and authorize the connection") - print(" 3. Or connect remotely: python main.py --connect :") + if device_type == DeviceType.ADB: + print(" 1. Enable USB debugging on your Android device") + print(" 2. Connect via USB and authorize the connection") + print(" 3. Or connect remotely: python main.py --connect :") + else: + print(" 1. Enable USB debugging on your HarmonyOS device") + print(" 2. Connect via USB and authorize the connection") + print(" 3. Or connect remotely: python main.py --device-type hdc --connect :") all_passed = False else: - device_ids = [d.split("\t")[0] for d in devices] + if device_type == DeviceType.ADB: + device_ids = [d.split("\t")[0] for d in devices] + else: + device_ids = [d.strip() for d in devices] print(f"✅ OK ({len(devices)} device(s): {', '.join(device_ids)})") except subprocess.TimeoutExpired: print("❌ FAILED") - print(" Error: ADB command timed out.") + print(f" Error: {tool_name} command timed out.") all_passed = False except Exception as e: print("❌ FAILED") @@ -122,40 +150,45 @@ def check_system_requirements() -> bool: print("❌ System check failed. Please fix the issues above.") return False - # Check 3: ADB Keyboard installed - print("3. Checking ADB Keyboard...", end=" ") - try: - result = subprocess.run( - ["adb", "shell", "ime", "list", "-s"], - capture_output=True, - text=True, - timeout=10, - ) - ime_list = result.stdout.strip() + # Check 3: ADB Keyboard installed (only for ADB) + if device_type == DeviceType.ADB: + print("3. Checking ADB Keyboard...", end=" ") + try: + result = subprocess.run( + ["adb", "shell", "ime", "list", "-s"], + capture_output=True, + text=True, + timeout=10, + ) + ime_list = result.stdout.strip() - if "com.android.adbkeyboard/.AdbIME" in ime_list: - print("✅ OK") - else: + if "com.android.adbkeyboard/.AdbIME" in ime_list: + print("✅ OK") + else: + print("❌ FAILED") + print(" Error: ADB Keyboard is not installed on the device.") + print(" Solution:") + print(" 1. Download ADB Keyboard APK from:") + print( + " https://github.com/senzhk/ADBKeyBoard/blob/master/ADBKeyboard.apk" + ) + print(" 2. Install it on your device: adb install ADBKeyboard.apk") + print( + " 3. Enable it in Settings > System > Languages & Input > Virtual Keyboard" + ) + all_passed = False + except subprocess.TimeoutExpired: print("❌ FAILED") - print(" Error: ADB Keyboard is not installed on the device.") - print(" Solution:") - print(" 1. Download ADB Keyboard APK from:") - print( - " https://github.com/senzhk/ADBKeyBoard/blob/master/ADBKeyboard.apk" - ) - print(" 2. Install it on your device: adb install ADBKeyboard.apk") - print( - " 3. Enable it in Settings > System > Languages & Input > Virtual Keyboard" - ) + print(" Error: ADB command timed out.") all_passed = False - except subprocess.TimeoutExpired: - print("❌ FAILED") - print(" Error: ADB command timed out.") - all_passed = False - except Exception as e: - print("❌ FAILED") - print(f" Error: {e}") - all_passed = False + except Exception as e: + print("❌ FAILED") + print(f" Error: {e}") + all_passed = False + else: + # For HDC, skip keyboard check as it uses different input method + print("3. Skipping keyboard check for HarmonyOS...", end=" ") + print("✅ OK (using native input)") print("-" * 50) @@ -368,6 +401,14 @@ Examples: help="Language for system prompt (cn or en, default: cn)", ) + parser.add_argument( + "--device-type", + type=str, + choices=["adb", "hdc"], + default=os.getenv("PHONE_AGENT_DEVICE_TYPE", "adb"), + help="Device type: adb for Android, hdc for HarmonyOS (default: adb)", + ) + parser.add_argument( "task", nargs="?", @@ -385,11 +426,13 @@ def handle_device_commands(args) -> bool: Returns: True if a device command was handled (should exit), False otherwise. """ - conn = ADBConnection() + device_factory = get_device_factory() + ConnectionClass = device_factory.get_connection_class() + conn = ConnectionClass() # Handle --list-devices if args.list_devices: - devices = list_devices() + devices = device_factory.list_devices() if not devices: print("No devices connected.") else: @@ -452,10 +495,25 @@ def main(): """Main entry point.""" args = parse_args() + # Set device type globally based on args + device_type = DeviceType.ADB if args.device_type == "adb" else DeviceType.HDC + set_device_type(device_type) + + # Enable HDC verbose mode if using HDC + if device_type == DeviceType.HDC: + from phone_agent.hdc import set_hdc_verbose + set_hdc_verbose(True) + # Handle --list-apps (no system check needed) if args.list_apps: - print("Supported apps:") - for app in sorted(list_supported_apps()): + if device_type == DeviceType.HDC: + print("Supported HarmonyOS apps:") + apps = list_harmonyos_apps() + else: + print("Supported Android apps:") + apps = list_supported_apps() + + for app in apps: print(f" - {app}") return @@ -464,7 +522,7 @@ def main(): return # Run system requirements check before proceeding - if not check_system_requirements(): + if not check_system_requirements(device_type): sys.exit(1) # Check model API connectivity and model availability @@ -500,9 +558,11 @@ def main(): print(f"Base URL: {model_config.base_url}") print(f"Max Steps: {agent_config.max_steps}") print(f"Language: {agent_config.lang}") + print(f"Device Type: {args.device_type.upper()}") # Show device info - devices = list_devices() + device_factory = get_device_factory() + devices = device_factory.list_devices() if agent_config.device_id: print(f"Device: {agent_config.device_id}") elif devices: diff --git a/phone_agent/actions/handler.py b/phone_agent/actions/handler.py index 32ab4ff..0bef1c3 100644 --- a/phone_agent/actions/handler.py +++ b/phone_agent/actions/handler.py @@ -2,24 +2,13 @@ import ast import re +import subprocess import time from dataclasses import dataclass from typing import Any, Callable -from phone_agent.adb import ( - back, - clear_text, - detect_and_set_adb_keyboard, - double_tap, - home, - launch_app, - long_press, - restore_keyboard, - swipe, - tap, - type_text, -) from phone_agent.config.timing import TIMING_CONFIG +from phone_agent.device_factory import get_device_factory @dataclass @@ -132,7 +121,8 @@ class ActionHandler: if not app_name: return ActionResult(False, False, "No app name specified") - success = launch_app(app_name, self.device_id) + device_factory = get_device_factory() + success = device_factory.launch_app(app_name, self.device_id) if success: return ActionResult(True, False) return ActionResult(False, False, f"App not found: {app_name}") @@ -154,26 +144,30 @@ class ActionHandler: message="User cancelled sensitive operation", ) - tap(x, y, self.device_id) + device_factory = get_device_factory() + device_factory.tap(x, y, self.device_id) return ActionResult(True, False) def _handle_type(self, action: dict, width: int, height: int) -> ActionResult: """Handle text input action.""" text = action.get("text", "") + device_factory = get_device_factory() + # Switch to ADB keyboard - original_ime = detect_and_set_adb_keyboard(self.device_id) + original_ime = device_factory.detect_and_set_adb_keyboard(self.device_id) time.sleep(TIMING_CONFIG.action.keyboard_switch_delay) # Clear existing text and type new text - clear_text(self.device_id) + device_factory.clear_text(self.device_id) time.sleep(TIMING_CONFIG.action.text_clear_delay) - type_text(text, self.device_id) + # Handle multiline text by splitting on newlines + device_factory.type_text(text, self.device_id) time.sleep(TIMING_CONFIG.action.text_input_delay) # Restore original keyboard - restore_keyboard(original_ime, self.device_id) + device_factory.restore_keyboard(original_ime, self.device_id) time.sleep(TIMING_CONFIG.action.keyboard_restore_delay) return ActionResult(True, False) @@ -189,17 +183,20 @@ class ActionHandler: start_x, start_y = self._convert_relative_to_absolute(start, width, height) end_x, end_y = self._convert_relative_to_absolute(end, width, height) - swipe(start_x, start_y, end_x, end_y, device_id=self.device_id) + device_factory = get_device_factory() + device_factory.swipe(start_x, start_y, end_x, end_y, device_id=self.device_id) return ActionResult(True, False) def _handle_back(self, action: dict, width: int, height: int) -> ActionResult: """Handle back button action.""" - back(self.device_id) + device_factory = get_device_factory() + device_factory.back(self.device_id) return ActionResult(True, False) def _handle_home(self, action: dict, width: int, height: int) -> ActionResult: """Handle home button action.""" - home(self.device_id) + device_factory = get_device_factory() + device_factory.home(self.device_id) return ActionResult(True, False) def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult: @@ -209,7 +206,8 @@ class ActionHandler: return ActionResult(False, False, "No element coordinates") x, y = self._convert_relative_to_absolute(element, width, height) - double_tap(x, y, self.device_id) + device_factory = get_device_factory() + device_factory.double_tap(x, y, self.device_id) return ActionResult(True, False) def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult: @@ -219,7 +217,8 @@ class ActionHandler: return ActionResult(False, False, "No element coordinates") x, y = self._convert_relative_to_absolute(element, width, height) - long_press(x, y, device_id=self.device_id) + device_factory = get_device_factory() + device_factory.long_press(x, y, device_id=self.device_id) return ActionResult(True, False) def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult: @@ -256,6 +255,68 @@ class ActionHandler: # This action signals that user input is needed return ActionResult(True, False, message="User interaction required") + def _send_keyevent(self, keycode: str) -> None: + """Send a keyevent to the device.""" + from phone_agent.device_factory import DeviceType, get_device_factory + from phone_agent.hdc.connection import _run_hdc_command + + device_factory = get_device_factory() + + # Handle HDC devices with HarmonyOS-specific keyEvent command + if device_factory.device_type == DeviceType.HDC: + hdc_prefix = ["hdc", "-t", self.device_id] if self.device_id else ["hdc"] + + # Map common keycodes to HarmonyOS keyEvent codes + # KEYCODE_ENTER (66) -> 2054 (HarmonyOS Enter key code) + if keycode == "KEYCODE_ENTER" or keycode == "66": + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"], + capture_output=True, + text=True, + ) + else: + # For other keys, try to use the numeric code directly + # If keycode is a string like "KEYCODE_ENTER", convert it + try: + # Try to extract numeric code from string or use as-is + if keycode.startswith("KEYCODE_"): + # For now, only handle ENTER, other keys may need mapping + if "ENTER" in keycode: + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"], + capture_output=True, + text=True, + ) + else: + # Fallback to ADB-style command for unsupported keys + subprocess.run( + hdc_prefix + ["shell", "input", "keyevent", keycode], + capture_output=True, + text=True, + ) + else: + # Assume it's a numeric code + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", str(keycode)], + capture_output=True, + text=True, + ) + except Exception: + # Fallback to ADB-style command + subprocess.run( + hdc_prefix + ["shell", "input", "keyevent", keycode], + capture_output=True, + text=True, + ) + else: + # ADB devices use standard input keyevent command + cmd_prefix = ["adb", "-s", self.device_id] if self.device_id else ["adb"] + subprocess.run( + cmd_prefix + ["shell", "input", "keyevent", keycode], + capture_output=True, + text=True, + ) + @staticmethod def _default_confirmation(message: str) -> bool: """Default confirmation callback using console input.""" @@ -281,6 +342,7 @@ def parse_action(response: str) -> dict[str, Any]: Raises: ValueError: If the response cannot be parsed. """ + print(f"Parsing action: {response}") try: response = response.strip() if response.startswith('do(action="Type"') or response.startswith( @@ -292,6 +354,11 @@ def parse_action(response: str) -> dict[str, Any]: elif response.startswith("do"): # Use AST parsing instead of eval for safety try: + # Escape special characters (newlines, tabs, etc.) for valid Python syntax + response = response.replace('\n', '\\n') + response = response.replace('\r', '\\r') + response = response.replace('\t', '\\t') + tree = ast.parse(response, mode="eval") if not isinstance(tree.body, ast.Call): raise ValueError("Expected a function call") diff --git a/phone_agent/adb/connection.py b/phone_agent/adb/connection.py index 480b5a7..b723ca9 100644 --- a/phone_agent/adb/connection.py +++ b/phone_agent/adb/connection.py @@ -109,7 +109,7 @@ class ADBConnection: if address: cmd.append(address) - result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) + result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5) output = result.stdout + result.stderr return True, output.strip() or "Disconnected" @@ -241,7 +241,7 @@ class ADBConnection: cmd.extend(["-s", device_id]) cmd.extend(["tcpip", str(port)]) - result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", timeout=10) output = result.stdout + result.stderr @@ -270,7 +270,7 @@ class ADBConnection: cmd.extend(["-s", device_id]) cmd.extend(["shell", "ip", "route"]) - result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) + result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5) # Parse IP from route output for line in result.stdout.split("\n"): @@ -286,6 +286,7 @@ class ADBConnection: cmd[:-1] + ["shell", "ip", "addr", "show", "wlan0"], capture_output=True, text=True, + encoding="utf-8", timeout=5, ) diff --git a/phone_agent/adb/device.py b/phone_agent/adb/device.py index 1a081ad..995336a 100644 --- a/phone_agent/adb/device.py +++ b/phone_agent/adb/device.py @@ -22,9 +22,11 @@ def get_current_app(device_id: str | None = None) -> str: adb_prefix = _get_adb_prefix(device_id) result = subprocess.run( - adb_prefix + ["shell", "dumpsys", "window"], capture_output=True, text=True + adb_prefix + ["shell", "dumpsys", "window"], capture_output=True, text=True, encoding="utf-8" ) output = result.stdout + if not output: + raise ValueError("No output from dumpsys window") # Parse window focus info for line in output.split("\n"): diff --git a/phone_agent/agent.py b/phone_agent/agent.py index b170316..3642791 100644 --- a/phone_agent/agent.py +++ b/phone_agent/agent.py @@ -7,8 +7,8 @@ from typing import Any, Callable from phone_agent.actions import ActionHandler from phone_agent.actions.handler import do, finish, parse_action -from phone_agent.adb import get_current_app, get_screenshot from phone_agent.config import get_messages, get_system_prompt +from phone_agent.device_factory import get_device_factory from phone_agent.model import ModelClient, ModelConfig from phone_agent.model.client import MessageBuilder @@ -140,8 +140,9 @@ class PhoneAgent: self._step_count += 1 # Capture current screen state - screenshot = get_screenshot(self.agent_config.device_id) - current_app = get_current_app(self.agent_config.device_id) + device_factory = get_device_factory() + screenshot = device_factory.get_screenshot(self.agent_config.device_id) + current_app = device_factory.get_current_app(self.agent_config.device_id) # Build messages if is_first: diff --git a/phone_agent/config/apps_harmonyos.py b/phone_agent/config/apps_harmonyos.py new file mode 100644 index 0000000..5ffb52c --- /dev/null +++ b/phone_agent/config/apps_harmonyos.py @@ -0,0 +1,266 @@ +"""HarmonyOS application package name mappings. + +Maps user-friendly app names to HarmonyOS bundle names. +These bundle names are used with the 'hdc shell aa start -b ' command. +""" + +# Custom ability names for apps that don't use the default "EntryAbility" +# Maps bundle_name -> ability_name +# Generated by: python test/find_abilities.py +APP_ABILITIES: dict[str, str] = { + # Third-party apps + "cn.wps.mobileoffice.hap": "DocumentAbility", + "com.ccb.mobilebank.hm": "CcbMainAbility", + "com.dewu.hos": "HomeAbility", + "com.larus.nova.hm": "MainAbility", + "com.luna.hm.music": "MainAbility", + "com.meitu.meitupic": "MainAbility", + "com.ss.hm.article.news": "MainAbility", + "com.ss.hm.ugc.aweme": "MainAbility", + "com.taobao.taobao4hmos": "Taobao_mainAbility", + "com.tencent.videohm": "AppAbility", + "com.ximalaya.ting.xmharmony": "MainBundleAbility", + "com.zhihu.hmos": "PhoneAbility", + + # Huawei system apps + "com.huawei.hmos.browser": "MainAbility", + "com.huawei.hmos.calculator": "com.huawei.hmos.calculator.CalculatorAbility", + "com.huawei.hmos.calendar": "MainAbility", + "com.huawei.hmos.camera": "com.huawei.hmos.camera.MainAbility", + "com.huawei.hmos.clock": "com.huawei.hmos.clock.phone", + "com.huawei.hmos.clouddrive": "MainAbility", + "com.huawei.hmos.email": "ApplicationAbility", + "com.huawei.hmos.filemanager": "MainAbility", + "com.huawei.hmos.health": "Activity_card_entryAbility", + "com.huawei.hmos.notepad": "MainAbility", + "com.huawei.hmos.photos": "MainAbility", + "com.huawei.hmos.screenrecorder": "com.huawei.hmos.screenrecorder.ServiceExtAbility", + "com.huawei.hmos.screenshot": "com.huawei.hmos.screenshot.ServiceExtAbility", + "com.huawei.hmos.settings": "com.huawei.hmos.settings.MainAbility", + "com.huawei.hmos.soundrecorder": "MainAbility", + "com.huawei.hmos.vassistant": "AiCaptionServiceExtAbility", + "com.huawei.hmos.wallet": "MainAbility", + + # Huawei services + "com.huawei.hmsapp.appgallery": "MainAbility", + "com.huawei.hmsapp.books": "MainAbility", + "com.huawei.hmsapp.himovie": "MainAbility", + "com.huawei.hmsapp.hisearch": "MainAbility", + "com.huawei.hmsapp.music": "MainAbility", + "com.huawei.hmsapp.thememanager": "MainAbility", + "com.huawei.hmsapp.totemweather": "com.huawei.hmsapp.totemweather.MainAbility", + + # OHOS system apps + "com.ohos.callui": "com.ohos.callui.ServiceAbility", + "com.ohos.contacts": "com.ohos.contacts.MainAbility", + "com.ohos.mms": "com.ohos.mms.MainAbility", +} + +APP_PACKAGES: dict[str, str] = { + # Social & Messaging + "微信": "com.tencent.wechat", + "QQ": "com.tencent.mqq", + "微博": "com.sina.weibo.stage", + # E-commerce + "淘宝": "com.taobao.taobao4hmos", + "京东": "com.jd.hm.mall", + "拼多多": "com.xunmeng.pinduoduo.hos", + "淘宝闪购": "com.taobao.taobao4hmos", + "京东秒送": "com.jd.hm.mall", + # Lifestyle & Social + "小红书": "com.xingin.xhs_hos", + "知乎": "com.zhihu.hmos", + # "豆瓣": "com.douban.frodo", # 未在 hdc 列表中找到 + # Maps & Navigation + "高德地图": "com.amap.hmapp", + "百度地图": "com.baidu.hmmap", + # Food & Services + "美团": "com.sankuai.hmeituan", + "美团外卖": "com.meituan.takeaway", + "大众点评": "com.sankuai.dianping", + # "肯德基": "com.yek.android.kfc.activitys", # 未在 hdc 列表中找到 + # Travel + # "携程": "ctrip.android.view", # 未在 hdc 列表中找到 + "铁路12306": "com.chinarailway.ticketingHM", + "12306": "com.chinarailway.ticketingHM", + # "去哪儿": "com.Qunar", # 未在 hdc 列表中找到 + # "去哪儿旅行": "com.Qunar", # 未在 hdc 列表中找到 + "滴滴出行": "com.sdu.didi.hmos.psnger", + # Video & Entertainment + "bilibili": "yylx.danmaku.bili", + "抖音": "com.ss.hm.ugc.aweme", + "快手": "com.kuaishou.hmapp", + "腾讯视频": "com.tencent.videohm", + "爱奇艺": "com.qiyi.video.hmy", + "芒果TV": "com.mgtv.phone", + # "优酷视频": "com.youku.phone", # 未在 hdc 列表中找到 + # "红果短剧": "com.phoenix.read", # 未在 hdc 列表中找到 + # Music & Audio + # "网易云音乐": "com.netease.cloudmusic", # 未在 hdc 列表中找到 + "QQ音乐": "com.tencent.hm.qqmusic", + "汽水音乐": "com.luna.hm.music", + "喜马拉雅": "com.ximalaya.ting.xmharmony", + # Reading + # "番茄小说": "com.dragon.read", # 未在 hdc 列表中找到 + # "番茄免费小说": "com.dragon.read", # 未在 hdc 列表中找到 + # "七猫免费小说": "com.kmxs.reader", # 未在 hdc 列表中找到 + # Productivity + "飞书": "com.ss.feishu", + # "QQ邮箱": "com.tencent.androidqqmail", # 未在 hdc 列表中找到 + # AI & Tools + "豆包": "com.larus.nova.hm", + # Health & Fitness + # "keep": "com.gotokeep.keep", # 未在 hdc 列表中找到 + # "美柚": "com.lingan.seeyou", # 未在 hdc 列表中找到 + # News & Information + # "腾讯新闻": "com.tencent.news", # 未在 hdc 列表中找到 + "今日头条": "com.ss.hm.article.news", + # Real Estate + # "贝壳找房": "com.lianjia.beike", # 未在 hdc 列表中找到 + # "安居客": "com.anjuke.android.app", # 未在 hdc 列表中找到 + # Finance + # "同花顺": "com.hexin.plat.android", # 未在 hdc 列表中找到 + # Games + # "星穹铁道": "com.miHoYo.hkrpg", # 未在 hdc 列表中找到 + # "崩坏:星穹铁道": "com.miHoYo.hkrpg", # 未在 hdc 列表中找到 + # "恋与深空": "com.papegames.lysk.cn", # 未在 hdc 列表中找到 + + # HarmonyOS 第三方应用 + "百度": "com.baidu.baiduapp", + "阿里巴巴": "com.alibaba.wireless_hmos", + "WPS": "cn.wps.mobileoffice.hap", + "企业微信": "com.tencent.wework.hmos", + "同程": "com.tongcheng.hmos", + "同程旅行": "com.tongcheng.hmos", + "唯品会": "com.vip.hosapp", + "支付宝": "com.alipay.mobile.client", + "UC浏览器": "com.uc.mobile", + "闲鱼": "com.taobao.idlefish4ohos", + "转转": "com.zhuanzhuan.hmoszz", + "迅雷": "com.xunlei.thunder", + "搜狗输入法": "com.sogou.input", + "扫描全能王": "com.intsig.camscanner.hap", + "美图秀秀": "com.meitu.meitupic", + "58同城": "com.wuba.life", + "得物": "com.dewu.hos", + "海底捞": "com.haidilao.haros", + "中国移动": "com.droi.tong", + "中国联通": "com.sinovatech.unicom.ha", + "国家税务总局": "cn.gov.chinatax.gt4.hm", + "建设银行": "com.ccb.mobilebank.hm", + "快手极速版": "com.kuaishou.hmnebula", + + # HarmonyOS 系统应用 - 工具类 + "浏览器": "com.huawei.hmos.browser", + "计算器": "com.huawei.hmos.calculator", + "日历": "com.huawei.hmos.calendar", + "相机": "com.huawei.hmos.camera", + "时钟": "com.huawei.hmos.clock", + "云盘": "com.huawei.hmos.clouddrive", + "云空间": "com.huawei.hmos.clouddrive", + "邮件": "com.huawei.hmos.email", + "文件管理器": "com.huawei.hmos.filemanager", + "文件": "com.huawei.hmos.files", + "查找设备": "com.huawei.hmos.finddevice", + "查找手机": "com.huawei.hmos.finddevice", + "录音机": "com.huawei.hmos.soundrecorder", + "录音": "com.huawei.hmos.soundrecorder", + "录屏": "com.huawei.hmos.screenrecorder", + "截屏": "com.huawei.hmos.screenshot", + "笔记": "com.huawei.hmos.notepad", + "备忘录": "com.huawei.hmos.notepad", + + # HarmonyOS 系统应用 - 媒体类 + "相册": "com.huawei.hmos.photos", + "图库": "com.huawei.hmos.photos", + # "视频": "com.huawei.hmos.mediaplayer", # 未在 hdc 列表中找到,但有 com.huawei.hmsapp.himovie + + # HarmonyOS 系统应用 - 通讯类 + "联系人": "com.ohos.contacts", + "通讯录": "com.ohos.contacts", + "短信": "com.ohos.mms", + "信息": "com.ohos.mms", + "电话": "com.ohos.callui", + "拨号": "com.ohos.callui", + + # HarmonyOS 系统应用 - 设置类 + "设置": "com.huawei.hmos.settings", + "系统设置": "com.huawei.hmos.settings", + "AndroidSystemSettings": "com.huawei.hmos.settings", + "Android System Settings": "com.huawei.hmos.settings", + "Android System Settings": "com.huawei.hmos.settings", + "Android-System-Settings": "com.huawei.hmos.settings", + "Settings": "com.huawei.hmos.settings", + + # HarmonyOS 系统应用 - 生活服务 + "健康": "com.huawei.hmos.health", + "运动健康": "com.huawei.hmos.health", + "地图": "com.huawei.hmos.maps.app", + "华为地图": "com.huawei.hmos.maps.app", + "钱包": "com.huawei.hmos.wallet", + "华为钱包": "com.huawei.hmos.wallet", + "智慧生活": "com.huawei.hmos.ailife", + "智能助手": "com.huawei.hmos.vassistant", + "小艺": "com.huawei.hmos.vassistant", + + # HarmonyOS 服务 + "应用市场": "com.huawei.hmsapp.appgallery", + "华为应用市场": "com.huawei.hmsapp.appgallery", + "音乐": "com.huawei.hmsapp.music", + "华为音乐": "com.huawei.hmsapp.music", + "主题": "com.huawei.hmsapp.thememanager", + "主题管理": "com.huawei.hmsapp.thememanager", + "天气": "com.huawei.hmsapp.totemweather", + "华为天气": "com.huawei.hmsapp.totemweather", + "视频": "com.huawei.hmsapp.himovie", + "华为视频": "com.huawei.hmsapp.himovie", + "阅读": "com.huawei.hmsapp.books", + "华为阅读": "com.huawei.hmsapp.books", + "游戏中心": "com.huawei.hmsapp.gamecenter", + "华为游戏中心": "com.huawei.hmsapp.gamecenter", + "搜索": "com.huawei.hmsapp.hisearch", + "华为搜索": "com.huawei.hmsapp.hisearch", + "指南针": "com.huawei.hmsapp.compass", + "会员中心": "com.huawei.hmos.myhuawei", + "我的华为": "com.huawei.hmos.myhuawei", + "华为会员": "com.huawei.hmos.myhuawei", +} + + +def get_package_name(app_name: str) -> str | None: + """ + Get the package name for an app. + + Args: + app_name: The display name of the app. + + Returns: + The HarmonyOS bundle name, or None if not found. + """ + return APP_PACKAGES.get(app_name) + + +def get_app_name(package_name: str) -> str | None: + """ + Get the app name from a package name. + + Args: + package_name: The HarmonyOS bundle name. + + Returns: + The display name of the app, or None if not found. + """ + for name, package in APP_PACKAGES.items(): + if package == package_name: + return name + return None + + +def list_supported_apps() -> list[str]: + """ + Get a list of all supported app names. + + Returns: + List of app names. + """ + return list(APP_PACKAGES.keys()) diff --git a/phone_agent/device_factory.py b/phone_agent/device_factory.py new file mode 100644 index 0000000..f7d3c46 --- /dev/null +++ b/phone_agent/device_factory.py @@ -0,0 +1,138 @@ +"""Device factory for selecting ADB or HDC based on device type.""" + +from enum import Enum +from typing import Any + + +class DeviceType(Enum): + """Type of device connection tool.""" + + ADB = "adb" + HDC = "hdc" + + +class DeviceFactory: + """ + Factory class for getting device-specific implementations. + + This allows the system to work with both Android (ADB) and HarmonyOS (HDC) devices. + """ + + def __init__(self, device_type: DeviceType = DeviceType.ADB): + """ + Initialize the device factory. + + Args: + device_type: The type of device to use (ADB or HDC). + """ + self.device_type = device_type + self._module = None + + @property + def module(self): + """Get the appropriate device module (adb or hdc).""" + if self._module is None: + if self.device_type == DeviceType.ADB: + from phone_agent import adb + self._module = adb + elif self.device_type == DeviceType.HDC: + from phone_agent import hdc + self._module = hdc + else: + raise ValueError(f"Unknown device type: {self.device_type}") + return self._module + + def get_screenshot(self, device_id: str | None = None, timeout: int = 10): + """Get screenshot from device.""" + return self.module.get_screenshot(device_id, timeout) + + def get_current_app(self, device_id: str | None = None) -> str: + """Get current app name.""" + return self.module.get_current_app(device_id) + + def tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None): + """Tap at coordinates.""" + return self.module.tap(x, y, device_id, delay) + + def double_tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None): + """Double tap at coordinates.""" + return self.module.double_tap(x, y, device_id, delay) + + def long_press(self, x: int, y: int, duration_ms: int = 3000, device_id: str | None = None, delay: float | None = None): + """Long press at coordinates.""" + return self.module.long_press(x, y, duration_ms, device_id, delay) + + def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int, duration_ms: int | None = None, device_id: str | None = None, delay: float | None = None): + """Swipe from start to end.""" + return self.module.swipe(start_x, start_y, end_x, end_y, duration_ms, device_id, delay) + + def back(self, device_id: str | None = None, delay: float | None = None): + """Press back button.""" + return self.module.back(device_id, delay) + + def home(self, device_id: str | None = None, delay: float | None = None): + """Press home button.""" + return self.module.home(device_id, delay) + + def launch_app(self, app_name: str, device_id: str | None = None, delay: float | None = None) -> bool: + """Launch an app.""" + return self.module.launch_app(app_name, device_id, delay) + + def type_text(self, text: str, device_id: str | None = None): + """Type text.""" + return self.module.type_text(text, device_id) + + def clear_text(self, device_id: str | None = None): + """Clear text.""" + return self.module.clear_text(device_id) + + def detect_and_set_adb_keyboard(self, device_id: str | None = None) -> str: + """Detect and set keyboard.""" + return self.module.detect_and_set_adb_keyboard(device_id) + + def restore_keyboard(self, ime: str, device_id: str | None = None): + """Restore keyboard.""" + return self.module.restore_keyboard(ime, device_id) + + def list_devices(self): + """List connected devices.""" + return self.module.list_devices() + + def get_connection_class(self): + """Get the connection class (ADBConnection or HDCConnection).""" + if self.device_type == DeviceType.ADB: + from phone_agent.adb import ADBConnection + return ADBConnection + elif self.device_type == DeviceType.HDC: + from phone_agent.hdc import HDCConnection + return HDCConnection + else: + raise ValueError(f"Unknown device type: {self.device_type}") + + +# Global device factory instance +_device_factory: DeviceFactory | None = None + + +def set_device_type(device_type: DeviceType): + """ + Set the global device type. + + Args: + device_type: The device type to use (ADB or HDC). + """ + global _device_factory + _device_factory = DeviceFactory(device_type) + + +def get_device_factory() -> DeviceFactory: + """ + Get the global device factory instance. + + Returns: + The device factory instance. + """ + global _device_factory + if _device_factory is None: + _device_factory = DeviceFactory(DeviceType.ADB) # Default to ADB + return _device_factory diff --git a/phone_agent/hdc/__init__.py b/phone_agent/hdc/__init__.py new file mode 100644 index 0000000..9b06993 --- /dev/null +++ b/phone_agent/hdc/__init__.py @@ -0,0 +1,53 @@ +"""HDC utilities for HarmonyOS device interaction.""" + +from phone_agent.hdc.connection import ( + HDCConnection, + ConnectionType, + DeviceInfo, + list_devices, + quick_connect, + set_hdc_verbose, +) +from phone_agent.hdc.device import ( + back, + double_tap, + get_current_app, + home, + launch_app, + long_press, + swipe, + tap, +) +from phone_agent.hdc.input import ( + clear_text, + detect_and_set_adb_keyboard, + restore_keyboard, + type_text, +) +from phone_agent.hdc.screenshot import get_screenshot + +__all__ = [ + # Screenshot + "get_screenshot", + # Input + "type_text", + "clear_text", + "detect_and_set_adb_keyboard", + "restore_keyboard", + # Device control + "get_current_app", + "tap", + "swipe", + "back", + "home", + "double_tap", + "long_press", + "launch_app", + # Connection management + "HDCConnection", + "DeviceInfo", + "ConnectionType", + "quick_connect", + "list_devices", + "set_hdc_verbose", +] diff --git a/phone_agent/hdc/connection.py b/phone_agent/hdc/connection.py new file mode 100644 index 0000000..15809f8 --- /dev/null +++ b/phone_agent/hdc/connection.py @@ -0,0 +1,381 @@ +"""HDC connection management for HarmonyOS devices.""" + +import os +import subprocess +import time +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from phone_agent.config.timing import TIMING_CONFIG + + +# Global flag to control HDC command output +_HDC_VERBOSE = os.getenv("HDC_VERBOSE", "false").lower() in ("true", "1", "yes") + + +def _run_hdc_command(cmd: list, **kwargs) -> subprocess.CompletedProcess: + """ + Run HDC command with optional verbose output. + + Args: + cmd: Command list to execute. + **kwargs: Additional arguments for subprocess.run. + + Returns: + CompletedProcess result. + """ + if _HDC_VERBOSE: + print(f"[HDC] Running command: {' '.join(cmd)}") + + result = subprocess.run(cmd, **kwargs) + + if _HDC_VERBOSE and result.returncode != 0: + print(f"[HDC] Command failed with return code {result.returncode}") + if hasattr(result, 'stderr') and result.stderr: + print(f"[HDC] Error: {result.stderr}") + + return result + + +def set_hdc_verbose(verbose: bool): + """Set HDC verbose mode globally.""" + global _HDC_VERBOSE + _HDC_VERBOSE = verbose + + +class ConnectionType(Enum): + """Type of HDC connection.""" + + USB = "usb" + WIFI = "wifi" + REMOTE = "remote" + + +@dataclass +class DeviceInfo: + """Information about a connected device.""" + + device_id: str + status: str + connection_type: ConnectionType + model: str | None = None + harmony_version: str | None = None + + +class HDCConnection: + """ + Manages HDC connections to HarmonyOS devices. + + Supports USB, WiFi, and remote TCP/IP connections. + + Example: + >>> conn = HDCConnection() + >>> # Connect to remote device + >>> conn.connect("192.168.1.100:5555") + >>> # List devices + >>> devices = conn.list_devices() + >>> # Disconnect + >>> conn.disconnect("192.168.1.100:5555") + """ + + def __init__(self, hdc_path: str = "hdc"): + """ + Initialize HDC connection manager. + + Args: + hdc_path: Path to HDC executable. + """ + self.hdc_path = hdc_path + + def connect(self, address: str, timeout: int = 10) -> tuple[bool, str]: + """ + Connect to a remote device via TCP/IP. + + Args: + address: Device address in format "host:port" (e.g., "192.168.1.100:5555"). + timeout: Connection timeout in seconds. + + Returns: + Tuple of (success, message). + + Note: + The remote device must have TCP/IP debugging enabled. + """ + # Validate address format + if ":" not in address: + address = f"{address}:5555" # Default HDC port + + try: + result = _run_hdc_command( + [self.hdc_path, "tconn", address], + capture_output=True, + text=True, + timeout=timeout, + ) + + output = result.stdout + result.stderr + + if "Connect OK" in output or "connected" in output.lower(): + return True, f"Connected to {address}" + elif "already connected" in output.lower(): + return True, f"Already connected to {address}" + else: + return False, output.strip() + + except subprocess.TimeoutExpired: + return False, f"Connection timeout after {timeout}s" + except Exception as e: + return False, f"Connection error: {e}" + + def disconnect(self, address: str | None = None) -> tuple[bool, str]: + """ + Disconnect from a remote device. + + Args: + address: Device address to disconnect. If None, disconnects all. + + Returns: + Tuple of (success, message). + """ + try: + if address: + cmd = [self.hdc_path, "tdisconn", address] + else: + # HDC doesn't have a "disconnect all" command, so we need to list and disconnect each + devices = self.list_devices() + for device in devices: + if ":" in device.device_id: # Remote device + _run_hdc_command( + [self.hdc_path, "tdisconn", device.device_id], + capture_output=True, + text=True, + timeout=5 + ) + return True, "Disconnected all remote devices" + + result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5) + + output = result.stdout + result.stderr + return True, output.strip() or "Disconnected" + + except Exception as e: + return False, f"Disconnect error: {e}" + + def list_devices(self) -> list[DeviceInfo]: + """ + List all connected devices. + + Returns: + List of DeviceInfo objects. + """ + try: + result = _run_hdc_command( + [self.hdc_path, "list", "targets"], + capture_output=True, + text=True, + timeout=5, + ) + + devices = [] + for line in result.stdout.strip().split("\n"): + if not line.strip(): + continue + + # HDC output format: device_id (status) + # Example: "192.168.1.100:5555" or "FMR0223C13000649" + device_id = line.strip() + + # Determine connection type + if ":" in device_id: + conn_type = ConnectionType.REMOTE + else: + conn_type = ConnectionType.USB + + # HDC doesn't provide detailed status in list command + # We assume "Connected" status for devices that appear + devices.append( + DeviceInfo( + device_id=device_id, + status="device", + connection_type=conn_type, + model=None, + ) + ) + + return devices + + except Exception as e: + print(f"Error listing devices: {e}") + return [] + + def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None: + """ + Get detailed information about a device. + + Args: + device_id: Device ID. If None, uses first available device. + + Returns: + DeviceInfo or None if not found. + """ + devices = self.list_devices() + + if not devices: + return None + + if device_id is None: + return devices[0] + + for device in devices: + if device.device_id == device_id: + return device + + return None + + def is_connected(self, device_id: str | None = None) -> bool: + """ + Check if a device is connected. + + Args: + device_id: Device ID to check. If None, checks if any device is connected. + + Returns: + True if connected, False otherwise. + """ + devices = self.list_devices() + + if not devices: + return False + + if device_id is None: + return len(devices) > 0 + + return any(d.device_id == device_id for d in devices) + + def enable_tcpip( + self, port: int = 5555, device_id: str | None = None + ) -> tuple[bool, str]: + """ + Enable TCP/IP debugging on a USB-connected device. + + This allows subsequent wireless connections to the device. + + Args: + port: TCP port for HDC (default: 5555). + device_id: Device ID. If None, uses first available device. + + Returns: + Tuple of (success, message). + + Note: + The device must be connected via USB first. + After this, you can disconnect USB and connect via WiFi. + """ + try: + cmd = [self.hdc_path] + if device_id: + cmd.extend(["-t", device_id]) + cmd.extend(["tmode", "port", str(port)]) + + result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=10) + + output = result.stdout + result.stderr + + if result.returncode == 0 or "success" in output.lower(): + time.sleep(TIMING_CONFIG.connection.adb_restart_delay) + return True, f"TCP/IP mode enabled on port {port}" + else: + return False, output.strip() + + except Exception as e: + return False, f"Error enabling TCP/IP: {e}" + + def get_device_ip(self, device_id: str | None = None) -> str | None: + """ + Get the IP address of a connected device. + + Args: + device_id: Device ID. If None, uses first available device. + + Returns: + IP address string or None if not found. + """ + try: + cmd = [self.hdc_path] + if device_id: + cmd.extend(["-t", device_id]) + cmd.extend(["shell", "ifconfig"]) + + result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5) + + # Parse IP from ifconfig output + for line in result.stdout.split("\n"): + if "inet addr:" in line or "inet " in line: + parts = line.strip().split() + for i, part in enumerate(parts): + if "addr:" in part: + ip = part.split(":")[1] + # Filter out localhost + if not ip.startswith("127."): + return ip + elif part == "inet" and i + 1 < len(parts): + ip = parts[i + 1].split("/")[0] + if not ip.startswith("127."): + return ip + + return None + + except Exception as e: + print(f"Error getting device IP: {e}") + return None + + def restart_server(self) -> tuple[bool, str]: + """ + Restart the HDC server. + + Returns: + Tuple of (success, message). + """ + try: + # Kill server + _run_hdc_command( + [self.hdc_path, "kill"], capture_output=True, timeout=5 + ) + + time.sleep(TIMING_CONFIG.connection.server_restart_delay) + + # Start server (HDC auto-starts when running commands) + _run_hdc_command( + [self.hdc_path, "start", "-r"], capture_output=True, timeout=5 + ) + + return True, "HDC server restarted" + + except Exception as e: + return False, f"Error restarting server: {e}" + + +def quick_connect(address: str) -> tuple[bool, str]: + """ + Quick helper to connect to a remote device. + + Args: + address: Device address (e.g., "192.168.1.100" or "192.168.1.100:5555"). + + Returns: + Tuple of (success, message). + """ + conn = HDCConnection() + return conn.connect(address) + + +def list_devices() -> list[DeviceInfo]: + """ + Quick helper to list connected devices. + + Returns: + List of DeviceInfo objects. + """ + conn = HDCConnection() + return conn.list_devices() diff --git a/phone_agent/hdc/device.py b/phone_agent/hdc/device.py new file mode 100644 index 0000000..63f23c3 --- /dev/null +++ b/phone_agent/hdc/device.py @@ -0,0 +1,272 @@ +"""Device control utilities for HarmonyOS automation.""" + +import os +import subprocess +import time +from typing import List, Optional, Tuple + +from phone_agent.config.apps_harmonyos import APP_ABILITIES, APP_PACKAGES +from phone_agent.config.timing import TIMING_CONFIG +from phone_agent.hdc.connection import _run_hdc_command + + +def get_current_app(device_id: str | None = None) -> str: + """ + Get the currently focused app name. + + Args: + device_id: Optional HDC device ID for multi-device setups. + + Returns: + The app name if recognized, otherwise "System Home". + """ + hdc_prefix = _get_hdc_prefix(device_id) + + result = _run_hdc_command( + hdc_prefix + ["shell", "hidumper", "-s", "WindowManagerService", "-a", "-a"], + capture_output=True, + text=True, + encoding="utf-8" + ) + output = result.stdout + if not output: + raise ValueError("No output from hidumper") + + # Parse window focus info + for line in output.split("\n"): + if "focused" in line.lower() or "current" in line.lower(): + for app_name, package in APP_PACKAGES.items(): + if package in line: + return app_name + + return "System Home" + + +def tap( + x: int, y: int, device_id: str | None = None, delay: float | None = None +) -> None: + """ + Tap at the specified coordinates. + + Args: + x: X coordinate. + y: Y coordinate. + device_id: Optional HDC device ID. + delay: Delay in seconds after tap. If None, uses configured default. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_tap_delay + + hdc_prefix = _get_hdc_prefix(device_id) + + # HarmonyOS uses uitest uiInput click + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "click", str(x), str(y)], + capture_output=True + ) + time.sleep(delay) + + +def double_tap( + x: int, y: int, device_id: str | None = None, delay: float | None = None +) -> None: + """ + Double tap at the specified coordinates. + + Args: + x: X coordinate. + y: Y coordinate. + device_id: Optional HDC device ID. + delay: Delay in seconds after double tap. If None, uses configured default. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_double_tap_delay + + hdc_prefix = _get_hdc_prefix(device_id) + + # HarmonyOS uses uitest uiInput doubleClick + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "doubleClick", str(x), str(y)], + capture_output=True + ) + time.sleep(delay) + + +def long_press( + x: int, + y: int, + duration_ms: int = 3000, + device_id: str | None = None, + delay: float | None = None, +) -> None: + """ + Long press at the specified coordinates. + + Args: + x: X coordinate. + y: Y coordinate. + duration_ms: Duration of press in milliseconds (note: HarmonyOS longClick may not support duration). + device_id: Optional HDC device ID. + delay: Delay in seconds after long press. If None, uses configured default. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_long_press_delay + + hdc_prefix = _get_hdc_prefix(device_id) + + # HarmonyOS uses uitest uiInput longClick + # Note: longClick may have a fixed duration, duration_ms parameter might not be supported + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "longClick", str(x), str(y)], + capture_output=True, + ) + time.sleep(delay) + + +def swipe( + start_x: int, + start_y: int, + end_x: int, + end_y: int, + duration_ms: int | None = None, + device_id: str | None = None, + delay: float | None = None, +) -> None: + """ + Swipe from start to end coordinates. + + Args: + start_x: Starting X coordinate. + start_y: Starting Y coordinate. + end_x: Ending X coordinate. + end_y: Ending Y coordinate. + duration_ms: Duration of swipe in milliseconds (auto-calculated if None). + device_id: Optional HDC device ID. + delay: Delay in seconds after swipe. If None, uses configured default. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_swipe_delay + + hdc_prefix = _get_hdc_prefix(device_id) + + if duration_ms is None: + # Calculate duration based on distance + dist_sq = (start_x - end_x) ** 2 + (start_y - end_y) ** 2 + duration_ms = int(dist_sq / 1000) + duration_ms = max(500, min(duration_ms, 1000)) # Clamp between 500-1000ms + + # HarmonyOS uses uitest uiInput swipe + # Format: swipe startX startY endX endY duration + _run_hdc_command( + hdc_prefix + + [ + "shell", + "uitest", + "uiInput", + "swipe", + str(start_x), + str(start_y), + str(end_x), + str(end_y), + str(duration_ms), + ], + capture_output=True, + ) + time.sleep(delay) + + +def back(device_id: str | None = None, delay: float | None = None) -> None: + """ + Press the back button. + + Args: + device_id: Optional HDC device ID. + delay: Delay in seconds after pressing back. If None, uses configured default. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_back_delay + + hdc_prefix = _get_hdc_prefix(device_id) + + # HarmonyOS uses uitest uiInput keyEvent Back + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "Back"], + capture_output=True + ) + time.sleep(delay) + + +def home(device_id: str | None = None, delay: float | None = None) -> None: + """ + Press the home button. + + Args: + device_id: Optional HDC device ID. + delay: Delay in seconds after pressing home. If None, uses configured default. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_home_delay + + hdc_prefix = _get_hdc_prefix(device_id) + + # HarmonyOS uses uitest uiInput keyEvent Home + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "Home"], + capture_output=True + ) + time.sleep(delay) + + +def launch_app( + app_name: str, device_id: str | None = None, delay: float | None = None +) -> bool: + """ + Launch an app by name. + + Args: + app_name: The app name (must be in APP_PACKAGES). + device_id: Optional HDC device ID. + delay: Delay in seconds after launching. If None, uses configured default. + + Returns: + True if app was launched, False if app not found. + """ + if delay is None: + delay = TIMING_CONFIG.device.default_launch_delay + + if app_name not in APP_PACKAGES: + print(f"[HDC] App '{app_name}' not found in HarmonyOS app list") + print(f"[HDC] Available apps: {', '.join(sorted(APP_PACKAGES.keys())[:10])}...") + return False + + hdc_prefix = _get_hdc_prefix(device_id) + bundle = APP_PACKAGES[app_name] + + # Get the ability name for this bundle + # Default to "EntryAbility" if not specified in APP_ABILITIES + ability = APP_ABILITIES.get(bundle, "EntryAbility") + + # HarmonyOS uses 'aa start' command to launch apps + # Format: aa start -b {bundle} -a {ability} + _run_hdc_command( + hdc_prefix + + [ + "shell", + "aa", + "start", + "-b", + bundle, + "-a", + ability, + ], + capture_output=True, + ) + time.sleep(delay) + return True + + +def _get_hdc_prefix(device_id: str | None) -> list: + """Get HDC command prefix with optional device specifier.""" + if device_id: + return ["hdc", "-t", device_id] + return ["hdc"] diff --git a/phone_agent/hdc/input.py b/phone_agent/hdc/input.py new file mode 100644 index 0000000..920cf7d --- /dev/null +++ b/phone_agent/hdc/input.py @@ -0,0 +1,149 @@ +"""Input utilities for HarmonyOS device text input.""" + +import base64 +import subprocess +from typing import Optional + +from phone_agent.hdc.connection import _run_hdc_command + + +def type_text(text: str, device_id: str | None = None) -> None: + """ + Type text into the currently focused input field. + + Args: + text: The text to type. Supports multi-line text with newline characters. + device_id: Optional HDC device ID for multi-device setups. + + Note: + HarmonyOS uses: hdc shell uitest uiInput text "文本内容" + This command works without coordinates when input field is focused. + For multi-line text, the function splits by newlines and sends ENTER keyEvents. + ENTER key code in HarmonyOS: 2054 + Recommendation: Click on the input field first to focus it, then use this function. + """ + hdc_prefix = _get_hdc_prefix(device_id) + + # Handle multi-line text by splitting on newlines + if '\n' in text: + lines = text.split('\n') + for i, line in enumerate(lines): + if line: # Only process non-empty lines + # Escape special characters for shell + escaped_line = line.replace('"', '\\"').replace("$", "\\$") + + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "text", escaped_line], + capture_output=True, + text=True, + ) + + # Send ENTER key event after each line except the last one + if i < len(lines) - 1: + try: + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"], + capture_output=True, + text=True, + ) + except Exception as e: + print(f"[HDC] ENTER keyEvent failed: {e}") + else: + # Single line text - original logic + # Escape special characters for shell (keep quotes for proper text handling) + # The text will be wrapped in quotes in the command + escaped_text = text.replace('"', '\\"').replace("$", "\\$") + + # HarmonyOS uitest uiInput text command + # Format: hdc shell uitest uiInput text "文本内容" + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "text", escaped_text], + capture_output=True, + text=True, + ) + + +def clear_text(device_id: str | None = None) -> None: + """ + Clear text in the currently focused input field. + + Args: + device_id: Optional HDC device ID for multi-device setups. + + Note: + This method uses repeated delete key events to clear text. + For HarmonyOS, you might also use select all + delete for better efficiency. + """ + hdc_prefix = _get_hdc_prefix(device_id) + # Ctrl+A to select all (key code 2072 for Ctrl, 2017 for A) + # Then delete + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2072", "2017"], + capture_output=True, + text=True, + ) + _run_hdc_command( + hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2055"], # Delete key + capture_output=True, + text=True, + ) + + +def detect_and_set_adb_keyboard(device_id: str | None = None) -> str: + """ + Detect current keyboard and switch to ADB Keyboard if available. + + Args: + device_id: Optional HDC device ID for multi-device setups. + + Returns: + The original keyboard IME identifier for later restoration. + + Note: + This is a placeholder. HarmonyOS may not support ADB Keyboard. + If there's a similar tool for HarmonyOS, integrate it here. + """ + hdc_prefix = _get_hdc_prefix(device_id) + + # Get current IME (if HarmonyOS supports this) + try: + result = _run_hdc_command( + hdc_prefix + ["shell", "settings", "get", "secure", "default_input_method"], + capture_output=True, + text=True, + ) + current_ime = (result.stdout + result.stderr).strip() + + # If ADB Keyboard equivalent exists for HarmonyOS, switch to it + # For now, we'll just return the current IME + return current_ime + except Exception: + return "" + + +def restore_keyboard(ime: str, device_id: str | None = None) -> None: + """ + Restore the original keyboard IME. + + Args: + ime: The IME identifier to restore. + device_id: Optional HDC device ID for multi-device setups. + """ + if not ime: + return + + hdc_prefix = _get_hdc_prefix(device_id) + + try: + _run_hdc_command( + hdc_prefix + ["shell", "ime", "set", ime], capture_output=True, text=True + ) + except Exception: + pass + + +def _get_hdc_prefix(device_id: str | None) -> list: + """Get HDC command prefix with optional device specifier.""" + if device_id: + return ["hdc", "-t", device_id] + return ["hdc"] diff --git a/phone_agent/hdc/screenshot.py b/phone_agent/hdc/screenshot.py new file mode 100644 index 0000000..332d198 --- /dev/null +++ b/phone_agent/hdc/screenshot.py @@ -0,0 +1,125 @@ +"""Screenshot utilities for capturing HarmonyOS device screen.""" + +import base64 +import os +import subprocess +import tempfile +import uuid +from dataclasses import dataclass +from io import BytesIO +from typing import Tuple + +from PIL import Image +from phone_agent.hdc.connection import _run_hdc_command + + +@dataclass +class Screenshot: + """Represents a captured screenshot.""" + + base64_data: str + width: int + height: int + is_sensitive: bool = False + + +def get_screenshot(device_id: str | None = None, timeout: int = 10) -> Screenshot: + """ + Capture a screenshot from the connected HarmonyOS device. + + Args: + device_id: Optional HDC device ID for multi-device setups. + timeout: Timeout in seconds for screenshot operations. + + Returns: + Screenshot object containing base64 data and dimensions. + + Note: + If the screenshot fails (e.g., on sensitive screens like payment pages), + a black fallback image is returned with is_sensitive=True. + """ + temp_path = os.path.join(tempfile.gettempdir(), f"screenshot_{uuid.uuid4()}.png") + hdc_prefix = _get_hdc_prefix(device_id) + + try: + # Execute screenshot command + # HarmonyOS HDC only supports JPEG format + remote_path = "/data/local/tmp/tmp_screenshot.jpeg" + + # Try method 1: hdc shell screenshot (newer HarmonyOS versions) + result = _run_hdc_command( + hdc_prefix + ["shell", "screenshot", remote_path], + capture_output=True, + text=True, + timeout=timeout, + ) + + # Check for screenshot failure (sensitive screen) + output = result.stdout + result.stderr + if "fail" in output.lower() or "error" in output.lower() or "not found" in output.lower(): + # Try method 2: snapshot_display (older versions or different devices) + result = _run_hdc_command( + hdc_prefix + ["shell", "snapshot_display", "-f", remote_path], + capture_output=True, + text=True, + timeout=timeout, + ) + output = result.stdout + result.stderr + if "fail" in output.lower() or "error" in output.lower(): + return _create_fallback_screenshot(is_sensitive=True) + + # Pull screenshot to local temp path + # Note: remote file is JPEG, but PIL can open it regardless of local extension + _run_hdc_command( + hdc_prefix + ["file", "recv", remote_path, temp_path], + capture_output=True, + text=True, + timeout=5, + ) + + if not os.path.exists(temp_path): + return _create_fallback_screenshot(is_sensitive=False) + + # Read JPEG image and convert to PNG for model inference + # PIL automatically detects the image format from file content + img = Image.open(temp_path) + width, height = img.size + + buffered = BytesIO() + img.save(buffered, format="PNG") + base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + + # Cleanup + os.remove(temp_path) + + return Screenshot( + base64_data=base64_data, width=width, height=height, is_sensitive=False + ) + + except Exception as e: + print(f"Screenshot error: {e}") + return _create_fallback_screenshot(is_sensitive=False) + + +def _get_hdc_prefix(device_id: str | None) -> list: + """Get HDC command prefix with optional device specifier.""" + if device_id: + return ["hdc", "-t", device_id] + return ["hdc"] + + +def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot: + """Create a black fallback image when screenshot fails.""" + default_width, default_height = 1080, 2400 + + black_img = Image.new("RGB", (default_width, default_height), color="black") + buffered = BytesIO() + black_img.save(buffered, format="PNG") + base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8") + + return Screenshot( + base64_data=base64_data, + width=default_width, + height=default_height, + is_sensitive=is_sensitive, + )