Merge pull request #221 from floatingstarZ/hzy_1216_v4_ok

支持鸿蒙OSNEXT的HDC
This commit is contained in:
yongbin-buaa
2025-12-17 17:11:54 +08:00
committed by GitHub
12 changed files with 1613 additions and 98 deletions

194
main.py
View File

@@ -23,20 +23,24 @@ from urllib.parse import urlparse
from openai import OpenAI
from phone_agent import PhoneAgent
from phone_agent.adb import ADBConnection, list_devices
from phone_agent.agent import AgentConfig
from phone_agent.config.apps import list_supported_apps
from phone_agent.config.apps_harmonyos import list_supported_apps as list_harmonyos_apps
from phone_agent.device_factory import DeviceType, get_device_factory, set_device_type
from phone_agent.model import ModelConfig
def check_system_requirements() -> bool:
def check_system_requirements(device_type: DeviceType = DeviceType.ADB) -> bool:
"""
Check system requirements before running the agent.
Checks:
1. ADB tools installed
1. ADB/HDC tools installed
2. At least one device connected
3. ADB Keyboard installed on the device
3. ADB Keyboard installed on the device (for ADB only)
Args:
device_type: Type of device tool (ADB or HDC).
Returns:
True if all checks pass, False otherwise.
@@ -46,38 +50,47 @@ def check_system_requirements() -> bool:
all_passed = True
# Check 1: ADB installed
print("1. Checking ADB installation...", end=" ")
if shutil.which("adb") is None:
# Determine tool name and command
tool_name = "ADB" if device_type == DeviceType.ADB else "HDC"
tool_cmd = "adb" if device_type == DeviceType.ADB else "hdc"
# Check 1: Tool installed
print(f"1. Checking {tool_name} installation...", end=" ")
if shutil.which(tool_cmd) is None:
print("❌ FAILED")
print(" Error: ADB is not installed or not in PATH.")
print(" Solution: Install Android SDK Platform Tools:")
print(" - macOS: brew install android-platform-tools")
print(" - Linux: sudo apt install android-tools-adb")
print(
" - Windows: Download from https://developer.android.com/studio/releases/platform-tools"
)
print(f" Error: {tool_name} is not installed or not in PATH.")
print(f" Solution: Install {tool_name}:")
if device_type == DeviceType.ADB:
print(" - macOS: brew install android-platform-tools")
print(" - Linux: sudo apt install android-tools-adb")
print(
" - Windows: Download from https://developer.android.com/studio/releases/platform-tools"
)
else:
print(" - Download from HarmonyOS SDK or https://gitee.com/openharmony/docs")
print(" - Add to PATH environment variable")
all_passed = False
else:
# Double check by running adb version
# Double check by running version command
try:
version_cmd = [tool_cmd, "version"] if device_type == DeviceType.ADB else [tool_cmd, "-v"]
result = subprocess.run(
["adb", "version"], capture_output=True, text=True, timeout=10
version_cmd, capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
version_line = result.stdout.strip().split("\n")[0]
print(f"✅ OK ({version_line})")
else:
print("❌ FAILED")
print(" Error: ADB command failed to run.")
print(f" Error: {tool_name} command failed to run.")
all_passed = False
except FileNotFoundError:
print("❌ FAILED")
print(" Error: ADB command not found.")
print(f" Error: {tool_name} command not found.")
all_passed = False
except subprocess.TimeoutExpired:
print("❌ FAILED")
print(" Error: ADB command timed out.")
print(f" Error: {tool_name} command timed out.")
all_passed = False
# If ADB is not installed, skip remaining checks
@@ -89,27 +102,42 @@ def check_system_requirements() -> bool:
# Check 2: Device connected
print("2. Checking connected devices...", end=" ")
try:
result = subprocess.run(
["adb", "devices"], capture_output=True, text=True, timeout=10
)
lines = result.stdout.strip().split("\n")
# Filter out header and empty lines, look for 'device' status
devices = [line for line in lines[1:] if line.strip() and "\tdevice" in line]
if device_type == DeviceType.ADB:
result = subprocess.run(
["adb", "devices"], capture_output=True, text=True, timeout=10
)
lines = result.stdout.strip().split("\n")
# Filter out header and empty lines, look for 'device' status
devices = [line for line in lines[1:] if line.strip() and "\tdevice" in line]
else: # HDC
result = subprocess.run(
["hdc", "list", "targets"], capture_output=True, text=True, timeout=10
)
lines = result.stdout.strip().split("\n")
devices = [line for line in lines if line.strip()]
if not devices:
print("❌ FAILED")
print(" Error: No devices connected.")
print(" Solution:")
print(" 1. Enable USB debugging on your Android device")
print(" 2. Connect via USB and authorize the connection")
print(" 3. Or connect remotely: python main.py --connect <ip>:<port>")
if device_type == DeviceType.ADB:
print(" 1. Enable USB debugging on your Android device")
print(" 2. Connect via USB and authorize the connection")
print(" 3. Or connect remotely: python main.py --connect <ip>:<port>")
else:
print(" 1. Enable USB debugging on your HarmonyOS device")
print(" 2. Connect via USB and authorize the connection")
print(" 3. Or connect remotely: python main.py --device-type hdc --connect <ip>:<port>")
all_passed = False
else:
device_ids = [d.split("\t")[0] for d in devices]
if device_type == DeviceType.ADB:
device_ids = [d.split("\t")[0] for d in devices]
else:
device_ids = [d.strip() for d in devices]
print(f"✅ OK ({len(devices)} device(s): {', '.join(device_ids)})")
except subprocess.TimeoutExpired:
print("❌ FAILED")
print(" Error: ADB command timed out.")
print(f" Error: {tool_name} command timed out.")
all_passed = False
except Exception as e:
print("❌ FAILED")
@@ -122,40 +150,45 @@ def check_system_requirements() -> bool:
print("❌ System check failed. Please fix the issues above.")
return False
# Check 3: ADB Keyboard installed
print("3. Checking ADB Keyboard...", end=" ")
try:
result = subprocess.run(
["adb", "shell", "ime", "list", "-s"],
capture_output=True,
text=True,
timeout=10,
)
ime_list = result.stdout.strip()
# Check 3: ADB Keyboard installed (only for ADB)
if device_type == DeviceType.ADB:
print("3. Checking ADB Keyboard...", end=" ")
try:
result = subprocess.run(
["adb", "shell", "ime", "list", "-s"],
capture_output=True,
text=True,
timeout=10,
)
ime_list = result.stdout.strip()
if "com.android.adbkeyboard/.AdbIME" in ime_list:
print("✅ OK")
else:
if "com.android.adbkeyboard/.AdbIME" in ime_list:
print("✅ OK")
else:
print("❌ FAILED")
print(" Error: ADB Keyboard is not installed on the device.")
print(" Solution:")
print(" 1. Download ADB Keyboard APK from:")
print(
" https://github.com/senzhk/ADBKeyBoard/blob/master/ADBKeyboard.apk"
)
print(" 2. Install it on your device: adb install ADBKeyboard.apk")
print(
" 3. Enable it in Settings > System > Languages & Input > Virtual Keyboard"
)
all_passed = False
except subprocess.TimeoutExpired:
print("❌ FAILED")
print(" Error: ADB Keyboard is not installed on the device.")
print(" Solution:")
print(" 1. Download ADB Keyboard APK from:")
print(
" https://github.com/senzhk/ADBKeyBoard/blob/master/ADBKeyboard.apk"
)
print(" 2. Install it on your device: adb install ADBKeyboard.apk")
print(
" 3. Enable it in Settings > System > Languages & Input > Virtual Keyboard"
)
print(" Error: ADB command timed out.")
all_passed = False
except subprocess.TimeoutExpired:
print("❌ FAILED")
print(" Error: ADB command timed out.")
all_passed = False
except Exception as e:
print("❌ FAILED")
print(f" Error: {e}")
all_passed = False
except Exception as e:
print("❌ FAILED")
print(f" Error: {e}")
all_passed = False
else:
# For HDC, skip keyboard check as it uses different input method
print("3. Skipping keyboard check for HarmonyOS...", end=" ")
print("✅ OK (using native input)")
print("-" * 50)
@@ -368,6 +401,14 @@ Examples:
help="Language for system prompt (cn or en, default: cn)",
)
parser.add_argument(
"--device-type",
type=str,
choices=["adb", "hdc"],
default=os.getenv("PHONE_AGENT_DEVICE_TYPE", "adb"),
help="Device type: adb for Android, hdc for HarmonyOS (default: adb)",
)
parser.add_argument(
"task",
nargs="?",
@@ -385,11 +426,13 @@ def handle_device_commands(args) -> bool:
Returns:
True if a device command was handled (should exit), False otherwise.
"""
conn = ADBConnection()
device_factory = get_device_factory()
ConnectionClass = device_factory.get_connection_class()
conn = ConnectionClass()
# Handle --list-devices
if args.list_devices:
devices = list_devices()
devices = device_factory.list_devices()
if not devices:
print("No devices connected.")
else:
@@ -452,10 +495,25 @@ def main():
"""Main entry point."""
args = parse_args()
# Set device type globally based on args
device_type = DeviceType.ADB if args.device_type == "adb" else DeviceType.HDC
set_device_type(device_type)
# Enable HDC verbose mode if using HDC
if device_type == DeviceType.HDC:
from phone_agent.hdc import set_hdc_verbose
set_hdc_verbose(True)
# Handle --list-apps (no system check needed)
if args.list_apps:
print("Supported apps:")
for app in sorted(list_supported_apps()):
if device_type == DeviceType.HDC:
print("Supported HarmonyOS apps:")
apps = list_harmonyos_apps()
else:
print("Supported Android apps:")
apps = list_supported_apps()
for app in apps:
print(f" - {app}")
return
@@ -464,7 +522,7 @@ def main():
return
# Run system requirements check before proceeding
if not check_system_requirements():
if not check_system_requirements(device_type):
sys.exit(1)
# Check model API connectivity and model availability
@@ -500,9 +558,11 @@ def main():
print(f"Base URL: {model_config.base_url}")
print(f"Max Steps: {agent_config.max_steps}")
print(f"Language: {agent_config.lang}")
print(f"Device Type: {args.device_type.upper()}")
# Show device info
devices = list_devices()
device_factory = get_device_factory()
devices = device_factory.list_devices()
if agent_config.device_id:
print(f"Device: {agent_config.device_id}")
elif devices:

View File

@@ -2,24 +2,13 @@
import ast
import re
import subprocess
import time
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.adb import (
back,
clear_text,
detect_and_set_adb_keyboard,
double_tap,
home,
launch_app,
long_press,
restore_keyboard,
swipe,
tap,
type_text,
)
from phone_agent.config.timing import TIMING_CONFIG
from phone_agent.device_factory import get_device_factory
@dataclass
@@ -132,7 +121,8 @@ class ActionHandler:
if not app_name:
return ActionResult(False, False, "No app name specified")
success = launch_app(app_name, self.device_id)
device_factory = get_device_factory()
success = device_factory.launch_app(app_name, self.device_id)
if success:
return ActionResult(True, False)
return ActionResult(False, False, f"App not found: {app_name}")
@@ -154,26 +144,30 @@ class ActionHandler:
message="User cancelled sensitive operation",
)
tap(x, y, self.device_id)
device_factory = get_device_factory()
device_factory.tap(x, y, self.device_id)
return ActionResult(True, False)
def _handle_type(self, action: dict, width: int, height: int) -> ActionResult:
"""Handle text input action."""
text = action.get("text", "")
device_factory = get_device_factory()
# Switch to ADB keyboard
original_ime = detect_and_set_adb_keyboard(self.device_id)
original_ime = device_factory.detect_and_set_adb_keyboard(self.device_id)
time.sleep(TIMING_CONFIG.action.keyboard_switch_delay)
# Clear existing text and type new text
clear_text(self.device_id)
device_factory.clear_text(self.device_id)
time.sleep(TIMING_CONFIG.action.text_clear_delay)
type_text(text, self.device_id)
# Handle multiline text by splitting on newlines
device_factory.type_text(text, self.device_id)
time.sleep(TIMING_CONFIG.action.text_input_delay)
# Restore original keyboard
restore_keyboard(original_ime, self.device_id)
device_factory.restore_keyboard(original_ime, self.device_id)
time.sleep(TIMING_CONFIG.action.keyboard_restore_delay)
return ActionResult(True, False)
@@ -189,17 +183,20 @@ class ActionHandler:
start_x, start_y = self._convert_relative_to_absolute(start, width, height)
end_x, end_y = self._convert_relative_to_absolute(end, width, height)
swipe(start_x, start_y, end_x, end_y, device_id=self.device_id)
device_factory = get_device_factory()
device_factory.swipe(start_x, start_y, end_x, end_y, device_id=self.device_id)
return ActionResult(True, False)
def _handle_back(self, action: dict, width: int, height: int) -> ActionResult:
"""Handle back button action."""
back(self.device_id)
device_factory = get_device_factory()
device_factory.back(self.device_id)
return ActionResult(True, False)
def _handle_home(self, action: dict, width: int, height: int) -> ActionResult:
"""Handle home button action."""
home(self.device_id)
device_factory = get_device_factory()
device_factory.home(self.device_id)
return ActionResult(True, False)
def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult:
@@ -209,7 +206,8 @@ class ActionHandler:
return ActionResult(False, False, "No element coordinates")
x, y = self._convert_relative_to_absolute(element, width, height)
double_tap(x, y, self.device_id)
device_factory = get_device_factory()
device_factory.double_tap(x, y, self.device_id)
return ActionResult(True, False)
def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult:
@@ -219,7 +217,8 @@ class ActionHandler:
return ActionResult(False, False, "No element coordinates")
x, y = self._convert_relative_to_absolute(element, width, height)
long_press(x, y, device_id=self.device_id)
device_factory = get_device_factory()
device_factory.long_press(x, y, device_id=self.device_id)
return ActionResult(True, False)
def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult:
@@ -256,6 +255,68 @@ class ActionHandler:
# This action signals that user input is needed
return ActionResult(True, False, message="User interaction required")
def _send_keyevent(self, keycode: str) -> None:
"""Send a keyevent to the device."""
from phone_agent.device_factory import DeviceType, get_device_factory
from phone_agent.hdc.connection import _run_hdc_command
device_factory = get_device_factory()
# Handle HDC devices with HarmonyOS-specific keyEvent command
if device_factory.device_type == DeviceType.HDC:
hdc_prefix = ["hdc", "-t", self.device_id] if self.device_id else ["hdc"]
# Map common keycodes to HarmonyOS keyEvent codes
# KEYCODE_ENTER (66) -> 2054 (HarmonyOS Enter key code)
if keycode == "KEYCODE_ENTER" or keycode == "66":
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"],
capture_output=True,
text=True,
)
else:
# For other keys, try to use the numeric code directly
# If keycode is a string like "KEYCODE_ENTER", convert it
try:
# Try to extract numeric code from string or use as-is
if keycode.startswith("KEYCODE_"):
# For now, only handle ENTER, other keys may need mapping
if "ENTER" in keycode:
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"],
capture_output=True,
text=True,
)
else:
# Fallback to ADB-style command for unsupported keys
subprocess.run(
hdc_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
else:
# Assume it's a numeric code
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", str(keycode)],
capture_output=True,
text=True,
)
except Exception:
# Fallback to ADB-style command
subprocess.run(
hdc_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
else:
# ADB devices use standard input keyevent command
cmd_prefix = ["adb", "-s", self.device_id] if self.device_id else ["adb"]
subprocess.run(
cmd_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
@staticmethod
def _default_confirmation(message: str) -> bool:
"""Default confirmation callback using console input."""
@@ -281,6 +342,7 @@ def parse_action(response: str) -> dict[str, Any]:
Raises:
ValueError: If the response cannot be parsed.
"""
print(f"Parsing action: {response}")
try:
response = response.strip()
if response.startswith('do(action="Type"') or response.startswith(
@@ -292,6 +354,11 @@ def parse_action(response: str) -> dict[str, Any]:
elif response.startswith("do"):
# Use AST parsing instead of eval for safety
try:
# Escape special characters (newlines, tabs, etc.) for valid Python syntax
response = response.replace('\n', '\\n')
response = response.replace('\r', '\\r')
response = response.replace('\t', '\\t')
tree = ast.parse(response, mode="eval")
if not isinstance(tree.body, ast.Call):
raise ValueError("Expected a function call")

View File

@@ -109,7 +109,7 @@ class ADBConnection:
if address:
cmd.append(address)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5)
output = result.stdout + result.stderr
return True, output.strip() or "Disconnected"
@@ -241,7 +241,7 @@ class ADBConnection:
cmd.extend(["-s", device_id])
cmd.extend(["tcpip", str(port)])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", timeout=10)
output = result.stdout + result.stderr
@@ -270,7 +270,7 @@ class ADBConnection:
cmd.extend(["-s", device_id])
cmd.extend(["shell", "ip", "route"])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5)
# Parse IP from route output
for line in result.stdout.split("\n"):
@@ -286,6 +286,7 @@ class ADBConnection:
cmd[:-1] + ["shell", "ip", "addr", "show", "wlan0"],
capture_output=True,
text=True,
encoding="utf-8",
timeout=5,
)

View File

@@ -22,9 +22,11 @@ def get_current_app(device_id: str | None = None) -> str:
adb_prefix = _get_adb_prefix(device_id)
result = subprocess.run(
adb_prefix + ["shell", "dumpsys", "window"], capture_output=True, text=True
adb_prefix + ["shell", "dumpsys", "window"], capture_output=True, text=True, encoding="utf-8"
)
output = result.stdout
if not output:
raise ValueError("No output from dumpsys window")
# Parse window focus info
for line in output.split("\n"):

View File

@@ -7,8 +7,8 @@ from typing import Any, Callable
from phone_agent.actions import ActionHandler
from phone_agent.actions.handler import do, finish, parse_action
from phone_agent.adb import get_current_app, get_screenshot
from phone_agent.config import get_messages, get_system_prompt
from phone_agent.device_factory import get_device_factory
from phone_agent.model import ModelClient, ModelConfig
from phone_agent.model.client import MessageBuilder
@@ -140,8 +140,9 @@ class PhoneAgent:
self._step_count += 1
# Capture current screen state
screenshot = get_screenshot(self.agent_config.device_id)
current_app = get_current_app(self.agent_config.device_id)
device_factory = get_device_factory()
screenshot = device_factory.get_screenshot(self.agent_config.device_id)
current_app = device_factory.get_current_app(self.agent_config.device_id)
# Build messages
if is_first:

View File

@@ -0,0 +1,266 @@
"""HarmonyOS application package name mappings.
Maps user-friendly app names to HarmonyOS bundle names.
These bundle names are used with the 'hdc shell aa start -b <bundle>' command.
"""
# Custom ability names for apps that don't use the default "EntryAbility"
# Maps bundle_name -> ability_name
# Generated by: python test/find_abilities.py
APP_ABILITIES: dict[str, str] = {
# Third-party apps
"cn.wps.mobileoffice.hap": "DocumentAbility",
"com.ccb.mobilebank.hm": "CcbMainAbility",
"com.dewu.hos": "HomeAbility",
"com.larus.nova.hm": "MainAbility",
"com.luna.hm.music": "MainAbility",
"com.meitu.meitupic": "MainAbility",
"com.ss.hm.article.news": "MainAbility",
"com.ss.hm.ugc.aweme": "MainAbility",
"com.taobao.taobao4hmos": "Taobao_mainAbility",
"com.tencent.videohm": "AppAbility",
"com.ximalaya.ting.xmharmony": "MainBundleAbility",
"com.zhihu.hmos": "PhoneAbility",
# Huawei system apps
"com.huawei.hmos.browser": "MainAbility",
"com.huawei.hmos.calculator": "com.huawei.hmos.calculator.CalculatorAbility",
"com.huawei.hmos.calendar": "MainAbility",
"com.huawei.hmos.camera": "com.huawei.hmos.camera.MainAbility",
"com.huawei.hmos.clock": "com.huawei.hmos.clock.phone",
"com.huawei.hmos.clouddrive": "MainAbility",
"com.huawei.hmos.email": "ApplicationAbility",
"com.huawei.hmos.filemanager": "MainAbility",
"com.huawei.hmos.health": "Activity_card_entryAbility",
"com.huawei.hmos.notepad": "MainAbility",
"com.huawei.hmos.photos": "MainAbility",
"com.huawei.hmos.screenrecorder": "com.huawei.hmos.screenrecorder.ServiceExtAbility",
"com.huawei.hmos.screenshot": "com.huawei.hmos.screenshot.ServiceExtAbility",
"com.huawei.hmos.settings": "com.huawei.hmos.settings.MainAbility",
"com.huawei.hmos.soundrecorder": "MainAbility",
"com.huawei.hmos.vassistant": "AiCaptionServiceExtAbility",
"com.huawei.hmos.wallet": "MainAbility",
# Huawei services
"com.huawei.hmsapp.appgallery": "MainAbility",
"com.huawei.hmsapp.books": "MainAbility",
"com.huawei.hmsapp.himovie": "MainAbility",
"com.huawei.hmsapp.hisearch": "MainAbility",
"com.huawei.hmsapp.music": "MainAbility",
"com.huawei.hmsapp.thememanager": "MainAbility",
"com.huawei.hmsapp.totemweather": "com.huawei.hmsapp.totemweather.MainAbility",
# OHOS system apps
"com.ohos.callui": "com.ohos.callui.ServiceAbility",
"com.ohos.contacts": "com.ohos.contacts.MainAbility",
"com.ohos.mms": "com.ohos.mms.MainAbility",
}
APP_PACKAGES: dict[str, str] = {
# Social & Messaging
"微信": "com.tencent.wechat",
"QQ": "com.tencent.mqq",
"微博": "com.sina.weibo.stage",
# E-commerce
"淘宝": "com.taobao.taobao4hmos",
"京东": "com.jd.hm.mall",
"拼多多": "com.xunmeng.pinduoduo.hos",
"淘宝闪购": "com.taobao.taobao4hmos",
"京东秒送": "com.jd.hm.mall",
# Lifestyle & Social
"小红书": "com.xingin.xhs_hos",
"知乎": "com.zhihu.hmos",
# "豆瓣": "com.douban.frodo", # 未在 hdc 列表中找到
# Maps & Navigation
"高德地图": "com.amap.hmapp",
"百度地图": "com.baidu.hmmap",
# Food & Services
"美团": "com.sankuai.hmeituan",
"美团外卖": "com.meituan.takeaway",
"大众点评": "com.sankuai.dianping",
# "肯德基": "com.yek.android.kfc.activitys", # 未在 hdc 列表中找到
# Travel
# "携程": "ctrip.android.view", # 未在 hdc 列表中找到
"铁路12306": "com.chinarailway.ticketingHM",
"12306": "com.chinarailway.ticketingHM",
# "去哪儿": "com.Qunar", # 未在 hdc 列表中找到
# "去哪儿旅行": "com.Qunar", # 未在 hdc 列表中找到
"滴滴出行": "com.sdu.didi.hmos.psnger",
# Video & Entertainment
"bilibili": "yylx.danmaku.bili",
"抖音": "com.ss.hm.ugc.aweme",
"快手": "com.kuaishou.hmapp",
"腾讯视频": "com.tencent.videohm",
"爱奇艺": "com.qiyi.video.hmy",
"芒果TV": "com.mgtv.phone",
# "优酷视频": "com.youku.phone", # 未在 hdc 列表中找到
# "红果短剧": "com.phoenix.read", # 未在 hdc 列表中找到
# Music & Audio
# "网易云音乐": "com.netease.cloudmusic", # 未在 hdc 列表中找到
"QQ音乐": "com.tencent.hm.qqmusic",
"汽水音乐": "com.luna.hm.music",
"喜马拉雅": "com.ximalaya.ting.xmharmony",
# Reading
# "番茄小说": "com.dragon.read", # 未在 hdc 列表中找到
# "番茄免费小说": "com.dragon.read", # 未在 hdc 列表中找到
# "七猫免费小说": "com.kmxs.reader", # 未在 hdc 列表中找到
# Productivity
"飞书": "com.ss.feishu",
# "QQ邮箱": "com.tencent.androidqqmail", # 未在 hdc 列表中找到
# AI & Tools
"豆包": "com.larus.nova.hm",
# Health & Fitness
# "keep": "com.gotokeep.keep", # 未在 hdc 列表中找到
# "美柚": "com.lingan.seeyou", # 未在 hdc 列表中找到
# News & Information
# "腾讯新闻": "com.tencent.news", # 未在 hdc 列表中找到
"今日头条": "com.ss.hm.article.news",
# Real Estate
# "贝壳找房": "com.lianjia.beike", # 未在 hdc 列表中找到
# "安居客": "com.anjuke.android.app", # 未在 hdc 列表中找到
# Finance
# "同花顺": "com.hexin.plat.android", # 未在 hdc 列表中找到
# Games
# "星穹铁道": "com.miHoYo.hkrpg", # 未在 hdc 列表中找到
# "崩坏:星穹铁道": "com.miHoYo.hkrpg", # 未在 hdc 列表中找到
# "恋与深空": "com.papegames.lysk.cn", # 未在 hdc 列表中找到
# HarmonyOS 第三方应用
"百度": "com.baidu.baiduapp",
"阿里巴巴": "com.alibaba.wireless_hmos",
"WPS": "cn.wps.mobileoffice.hap",
"企业微信": "com.tencent.wework.hmos",
"同程": "com.tongcheng.hmos",
"同程旅行": "com.tongcheng.hmos",
"唯品会": "com.vip.hosapp",
"支付宝": "com.alipay.mobile.client",
"UC浏览器": "com.uc.mobile",
"闲鱼": "com.taobao.idlefish4ohos",
"转转": "com.zhuanzhuan.hmoszz",
"迅雷": "com.xunlei.thunder",
"搜狗输入法": "com.sogou.input",
"扫描全能王": "com.intsig.camscanner.hap",
"美图秀秀": "com.meitu.meitupic",
"58同城": "com.wuba.life",
"得物": "com.dewu.hos",
"海底捞": "com.haidilao.haros",
"中国移动": "com.droi.tong",
"中国联通": "com.sinovatech.unicom.ha",
"国家税务总局": "cn.gov.chinatax.gt4.hm",
"建设银行": "com.ccb.mobilebank.hm",
"快手极速版": "com.kuaishou.hmnebula",
# HarmonyOS 系统应用 - 工具类
"浏览器": "com.huawei.hmos.browser",
"计算器": "com.huawei.hmos.calculator",
"日历": "com.huawei.hmos.calendar",
"相机": "com.huawei.hmos.camera",
"时钟": "com.huawei.hmos.clock",
"云盘": "com.huawei.hmos.clouddrive",
"云空间": "com.huawei.hmos.clouddrive",
"邮件": "com.huawei.hmos.email",
"文件管理器": "com.huawei.hmos.filemanager",
"文件": "com.huawei.hmos.files",
"查找设备": "com.huawei.hmos.finddevice",
"查找手机": "com.huawei.hmos.finddevice",
"录音机": "com.huawei.hmos.soundrecorder",
"录音": "com.huawei.hmos.soundrecorder",
"录屏": "com.huawei.hmos.screenrecorder",
"截屏": "com.huawei.hmos.screenshot",
"笔记": "com.huawei.hmos.notepad",
"备忘录": "com.huawei.hmos.notepad",
# HarmonyOS 系统应用 - 媒体类
"相册": "com.huawei.hmos.photos",
"图库": "com.huawei.hmos.photos",
# "视频": "com.huawei.hmos.mediaplayer", # 未在 hdc 列表中找到,但有 com.huawei.hmsapp.himovie
# HarmonyOS 系统应用 - 通讯类
"联系人": "com.ohos.contacts",
"通讯录": "com.ohos.contacts",
"短信": "com.ohos.mms",
"信息": "com.ohos.mms",
"电话": "com.ohos.callui",
"拨号": "com.ohos.callui",
# HarmonyOS 系统应用 - 设置类
"设置": "com.huawei.hmos.settings",
"系统设置": "com.huawei.hmos.settings",
"AndroidSystemSettings": "com.huawei.hmos.settings",
"Android System Settings": "com.huawei.hmos.settings",
"Android System Settings": "com.huawei.hmos.settings",
"Android-System-Settings": "com.huawei.hmos.settings",
"Settings": "com.huawei.hmos.settings",
# HarmonyOS 系统应用 - 生活服务
"健康": "com.huawei.hmos.health",
"运动健康": "com.huawei.hmos.health",
"地图": "com.huawei.hmos.maps.app",
"华为地图": "com.huawei.hmos.maps.app",
"钱包": "com.huawei.hmos.wallet",
"华为钱包": "com.huawei.hmos.wallet",
"智慧生活": "com.huawei.hmos.ailife",
"智能助手": "com.huawei.hmos.vassistant",
"小艺": "com.huawei.hmos.vassistant",
# HarmonyOS 服务
"应用市场": "com.huawei.hmsapp.appgallery",
"华为应用市场": "com.huawei.hmsapp.appgallery",
"音乐": "com.huawei.hmsapp.music",
"华为音乐": "com.huawei.hmsapp.music",
"主题": "com.huawei.hmsapp.thememanager",
"主题管理": "com.huawei.hmsapp.thememanager",
"天气": "com.huawei.hmsapp.totemweather",
"华为天气": "com.huawei.hmsapp.totemweather",
"视频": "com.huawei.hmsapp.himovie",
"华为视频": "com.huawei.hmsapp.himovie",
"阅读": "com.huawei.hmsapp.books",
"华为阅读": "com.huawei.hmsapp.books",
"游戏中心": "com.huawei.hmsapp.gamecenter",
"华为游戏中心": "com.huawei.hmsapp.gamecenter",
"搜索": "com.huawei.hmsapp.hisearch",
"华为搜索": "com.huawei.hmsapp.hisearch",
"指南针": "com.huawei.hmsapp.compass",
"会员中心": "com.huawei.hmos.myhuawei",
"我的华为": "com.huawei.hmos.myhuawei",
"华为会员": "com.huawei.hmos.myhuawei",
}
def get_package_name(app_name: str) -> str | None:
"""
Get the package name for an app.
Args:
app_name: The display name of the app.
Returns:
The HarmonyOS bundle name, or None if not found.
"""
return APP_PACKAGES.get(app_name)
def get_app_name(package_name: str) -> str | None:
"""
Get the app name from a package name.
Args:
package_name: The HarmonyOS bundle name.
Returns:
The display name of the app, or None if not found.
"""
for name, package in APP_PACKAGES.items():
if package == package_name:
return name
return None
def list_supported_apps() -> list[str]:
"""
Get a list of all supported app names.
Returns:
List of app names.
"""
return list(APP_PACKAGES.keys())

View File

@@ -0,0 +1,138 @@
"""Device factory for selecting ADB or HDC based on device type."""
from enum import Enum
from typing import Any
class DeviceType(Enum):
"""Type of device connection tool."""
ADB = "adb"
HDC = "hdc"
class DeviceFactory:
"""
Factory class for getting device-specific implementations.
This allows the system to work with both Android (ADB) and HarmonyOS (HDC) devices.
"""
def __init__(self, device_type: DeviceType = DeviceType.ADB):
"""
Initialize the device factory.
Args:
device_type: The type of device to use (ADB or HDC).
"""
self.device_type = device_type
self._module = None
@property
def module(self):
"""Get the appropriate device module (adb or hdc)."""
if self._module is None:
if self.device_type == DeviceType.ADB:
from phone_agent import adb
self._module = adb
elif self.device_type == DeviceType.HDC:
from phone_agent import hdc
self._module = hdc
else:
raise ValueError(f"Unknown device type: {self.device_type}")
return self._module
def get_screenshot(self, device_id: str | None = None, timeout: int = 10):
"""Get screenshot from device."""
return self.module.get_screenshot(device_id, timeout)
def get_current_app(self, device_id: str | None = None) -> str:
"""Get current app name."""
return self.module.get_current_app(device_id)
def tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None):
"""Tap at coordinates."""
return self.module.tap(x, y, device_id, delay)
def double_tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None):
"""Double tap at coordinates."""
return self.module.double_tap(x, y, device_id, delay)
def long_press(self, x: int, y: int, duration_ms: int = 3000, device_id: str | None = None, delay: float | None = None):
"""Long press at coordinates."""
return self.module.long_press(x, y, duration_ms, device_id, delay)
def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int, duration_ms: int | None = None, device_id: str | None = None, delay: float | None = None):
"""Swipe from start to end."""
return self.module.swipe(start_x, start_y, end_x, end_y, duration_ms, device_id, delay)
def back(self, device_id: str | None = None, delay: float | None = None):
"""Press back button."""
return self.module.back(device_id, delay)
def home(self, device_id: str | None = None, delay: float | None = None):
"""Press home button."""
return self.module.home(device_id, delay)
def launch_app(self, app_name: str, device_id: str | None = None, delay: float | None = None) -> bool:
"""Launch an app."""
return self.module.launch_app(app_name, device_id, delay)
def type_text(self, text: str, device_id: str | None = None):
"""Type text."""
return self.module.type_text(text, device_id)
def clear_text(self, device_id: str | None = None):
"""Clear text."""
return self.module.clear_text(device_id)
def detect_and_set_adb_keyboard(self, device_id: str | None = None) -> str:
"""Detect and set keyboard."""
return self.module.detect_and_set_adb_keyboard(device_id)
def restore_keyboard(self, ime: str, device_id: str | None = None):
"""Restore keyboard."""
return self.module.restore_keyboard(ime, device_id)
def list_devices(self):
"""List connected devices."""
return self.module.list_devices()
def get_connection_class(self):
"""Get the connection class (ADBConnection or HDCConnection)."""
if self.device_type == DeviceType.ADB:
from phone_agent.adb import ADBConnection
return ADBConnection
elif self.device_type == DeviceType.HDC:
from phone_agent.hdc import HDCConnection
return HDCConnection
else:
raise ValueError(f"Unknown device type: {self.device_type}")
# Global device factory instance
_device_factory: DeviceFactory | None = None
def set_device_type(device_type: DeviceType):
"""
Set the global device type.
Args:
device_type: The device type to use (ADB or HDC).
"""
global _device_factory
_device_factory = DeviceFactory(device_type)
def get_device_factory() -> DeviceFactory:
"""
Get the global device factory instance.
Returns:
The device factory instance.
"""
global _device_factory
if _device_factory is None:
_device_factory = DeviceFactory(DeviceType.ADB) # Default to ADB
return _device_factory

View File

@@ -0,0 +1,53 @@
"""HDC utilities for HarmonyOS device interaction."""
from phone_agent.hdc.connection import (
HDCConnection,
ConnectionType,
DeviceInfo,
list_devices,
quick_connect,
set_hdc_verbose,
)
from phone_agent.hdc.device import (
back,
double_tap,
get_current_app,
home,
launch_app,
long_press,
swipe,
tap,
)
from phone_agent.hdc.input import (
clear_text,
detect_and_set_adb_keyboard,
restore_keyboard,
type_text,
)
from phone_agent.hdc.screenshot import get_screenshot
__all__ = [
# Screenshot
"get_screenshot",
# Input
"type_text",
"clear_text",
"detect_and_set_adb_keyboard",
"restore_keyboard",
# Device control
"get_current_app",
"tap",
"swipe",
"back",
"home",
"double_tap",
"long_press",
"launch_app",
# Connection management
"HDCConnection",
"DeviceInfo",
"ConnectionType",
"quick_connect",
"list_devices",
"set_hdc_verbose",
]

View File

@@ -0,0 +1,381 @@
"""HDC connection management for HarmonyOS devices."""
import os
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from phone_agent.config.timing import TIMING_CONFIG
# Global flag to control HDC command output
_HDC_VERBOSE = os.getenv("HDC_VERBOSE", "false").lower() in ("true", "1", "yes")
def _run_hdc_command(cmd: list, **kwargs) -> subprocess.CompletedProcess:
"""
Run HDC command with optional verbose output.
Args:
cmd: Command list to execute.
**kwargs: Additional arguments for subprocess.run.
Returns:
CompletedProcess result.
"""
if _HDC_VERBOSE:
print(f"[HDC] Running command: {' '.join(cmd)}")
result = subprocess.run(cmd, **kwargs)
if _HDC_VERBOSE and result.returncode != 0:
print(f"[HDC] Command failed with return code {result.returncode}")
if hasattr(result, 'stderr') and result.stderr:
print(f"[HDC] Error: {result.stderr}")
return result
def set_hdc_verbose(verbose: bool):
"""Set HDC verbose mode globally."""
global _HDC_VERBOSE
_HDC_VERBOSE = verbose
class ConnectionType(Enum):
"""Type of HDC connection."""
USB = "usb"
WIFI = "wifi"
REMOTE = "remote"
@dataclass
class DeviceInfo:
"""Information about a connected device."""
device_id: str
status: str
connection_type: ConnectionType
model: str | None = None
harmony_version: str | None = None
class HDCConnection:
"""
Manages HDC connections to HarmonyOS devices.
Supports USB, WiFi, and remote TCP/IP connections.
Example:
>>> conn = HDCConnection()
>>> # Connect to remote device
>>> conn.connect("192.168.1.100:5555")
>>> # List devices
>>> devices = conn.list_devices()
>>> # Disconnect
>>> conn.disconnect("192.168.1.100:5555")
"""
def __init__(self, hdc_path: str = "hdc"):
"""
Initialize HDC connection manager.
Args:
hdc_path: Path to HDC executable.
"""
self.hdc_path = hdc_path
def connect(self, address: str, timeout: int = 10) -> tuple[bool, str]:
"""
Connect to a remote device via TCP/IP.
Args:
address: Device address in format "host:port" (e.g., "192.168.1.100:5555").
timeout: Connection timeout in seconds.
Returns:
Tuple of (success, message).
Note:
The remote device must have TCP/IP debugging enabled.
"""
# Validate address format
if ":" not in address:
address = f"{address}:5555" # Default HDC port
try:
result = _run_hdc_command(
[self.hdc_path, "tconn", address],
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout + result.stderr
if "Connect OK" in output or "connected" in output.lower():
return True, f"Connected to {address}"
elif "already connected" in output.lower():
return True, f"Already connected to {address}"
else:
return False, output.strip()
except subprocess.TimeoutExpired:
return False, f"Connection timeout after {timeout}s"
except Exception as e:
return False, f"Connection error: {e}"
def disconnect(self, address: str | None = None) -> tuple[bool, str]:
"""
Disconnect from a remote device.
Args:
address: Device address to disconnect. If None, disconnects all.
Returns:
Tuple of (success, message).
"""
try:
if address:
cmd = [self.hdc_path, "tdisconn", address]
else:
# HDC doesn't have a "disconnect all" command, so we need to list and disconnect each
devices = self.list_devices()
for device in devices:
if ":" in device.device_id: # Remote device
_run_hdc_command(
[self.hdc_path, "tdisconn", device.device_id],
capture_output=True,
text=True,
timeout=5
)
return True, "Disconnected all remote devices"
result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5)
output = result.stdout + result.stderr
return True, output.strip() or "Disconnected"
except Exception as e:
return False, f"Disconnect error: {e}"
def list_devices(self) -> list[DeviceInfo]:
"""
List all connected devices.
Returns:
List of DeviceInfo objects.
"""
try:
result = _run_hdc_command(
[self.hdc_path, "list", "targets"],
capture_output=True,
text=True,
timeout=5,
)
devices = []
for line in result.stdout.strip().split("\n"):
if not line.strip():
continue
# HDC output format: device_id (status)
# Example: "192.168.1.100:5555" or "FMR0223C13000649"
device_id = line.strip()
# Determine connection type
if ":" in device_id:
conn_type = ConnectionType.REMOTE
else:
conn_type = ConnectionType.USB
# HDC doesn't provide detailed status in list command
# We assume "Connected" status for devices that appear
devices.append(
DeviceInfo(
device_id=device_id,
status="device",
connection_type=conn_type,
model=None,
)
)
return devices
except Exception as e:
print(f"Error listing devices: {e}")
return []
def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
"""
Get detailed information about a device.
Args:
device_id: Device ID. If None, uses first available device.
Returns:
DeviceInfo or None if not found.
"""
devices = self.list_devices()
if not devices:
return None
if device_id is None:
return devices[0]
for device in devices:
if device.device_id == device_id:
return device
return None
def is_connected(self, device_id: str | None = None) -> bool:
"""
Check if a device is connected.
Args:
device_id: Device ID to check. If None, checks if any device is connected.
Returns:
True if connected, False otherwise.
"""
devices = self.list_devices()
if not devices:
return False
if device_id is None:
return len(devices) > 0
return any(d.device_id == device_id for d in devices)
def enable_tcpip(
self, port: int = 5555, device_id: str | None = None
) -> tuple[bool, str]:
"""
Enable TCP/IP debugging on a USB-connected device.
This allows subsequent wireless connections to the device.
Args:
port: TCP port for HDC (default: 5555).
device_id: Device ID. If None, uses first available device.
Returns:
Tuple of (success, message).
Note:
The device must be connected via USB first.
After this, you can disconnect USB and connect via WiFi.
"""
try:
cmd = [self.hdc_path]
if device_id:
cmd.extend(["-t", device_id])
cmd.extend(["tmode", "port", str(port)])
result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=10)
output = result.stdout + result.stderr
if result.returncode == 0 or "success" in output.lower():
time.sleep(TIMING_CONFIG.connection.adb_restart_delay)
return True, f"TCP/IP mode enabled on port {port}"
else:
return False, output.strip()
except Exception as e:
return False, f"Error enabling TCP/IP: {e}"
def get_device_ip(self, device_id: str | None = None) -> str | None:
"""
Get the IP address of a connected device.
Args:
device_id: Device ID. If None, uses first available device.
Returns:
IP address string or None if not found.
"""
try:
cmd = [self.hdc_path]
if device_id:
cmd.extend(["-t", device_id])
cmd.extend(["shell", "ifconfig"])
result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5)
# Parse IP from ifconfig output
for line in result.stdout.split("\n"):
if "inet addr:" in line or "inet " in line:
parts = line.strip().split()
for i, part in enumerate(parts):
if "addr:" in part:
ip = part.split(":")[1]
# Filter out localhost
if not ip.startswith("127."):
return ip
elif part == "inet" and i + 1 < len(parts):
ip = parts[i + 1].split("/")[0]
if not ip.startswith("127."):
return ip
return None
except Exception as e:
print(f"Error getting device IP: {e}")
return None
def restart_server(self) -> tuple[bool, str]:
"""
Restart the HDC server.
Returns:
Tuple of (success, message).
"""
try:
# Kill server
_run_hdc_command(
[self.hdc_path, "kill"], capture_output=True, timeout=5
)
time.sleep(TIMING_CONFIG.connection.server_restart_delay)
# Start server (HDC auto-starts when running commands)
_run_hdc_command(
[self.hdc_path, "start", "-r"], capture_output=True, timeout=5
)
return True, "HDC server restarted"
except Exception as e:
return False, f"Error restarting server: {e}"
def quick_connect(address: str) -> tuple[bool, str]:
"""
Quick helper to connect to a remote device.
Args:
address: Device address (e.g., "192.168.1.100" or "192.168.1.100:5555").
Returns:
Tuple of (success, message).
"""
conn = HDCConnection()
return conn.connect(address)
def list_devices() -> list[DeviceInfo]:
"""
Quick helper to list connected devices.
Returns:
List of DeviceInfo objects.
"""
conn = HDCConnection()
return conn.list_devices()

272
phone_agent/hdc/device.py Normal file
View File

@@ -0,0 +1,272 @@
"""Device control utilities for HarmonyOS automation."""
import os
import subprocess
import time
from typing import List, Optional, Tuple
from phone_agent.config.apps_harmonyos import APP_ABILITIES, APP_PACKAGES
from phone_agent.config.timing import TIMING_CONFIG
from phone_agent.hdc.connection import _run_hdc_command
def get_current_app(device_id: str | None = None) -> str:
"""
Get the currently focused app name.
Args:
device_id: Optional HDC device ID for multi-device setups.
Returns:
The app name if recognized, otherwise "System Home".
"""
hdc_prefix = _get_hdc_prefix(device_id)
result = _run_hdc_command(
hdc_prefix + ["shell", "hidumper", "-s", "WindowManagerService", "-a", "-a"],
capture_output=True,
text=True,
encoding="utf-8"
)
output = result.stdout
if not output:
raise ValueError("No output from hidumper")
# Parse window focus info
for line in output.split("\n"):
if "focused" in line.lower() or "current" in line.lower():
for app_name, package in APP_PACKAGES.items():
if package in line:
return app_name
return "System Home"
def tap(
x: int, y: int, device_id: str | None = None, delay: float | None = None
) -> None:
"""
Tap at the specified coordinates.
Args:
x: X coordinate.
y: Y coordinate.
device_id: Optional HDC device ID.
delay: Delay in seconds after tap. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_tap_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput click
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "click", str(x), str(y)],
capture_output=True
)
time.sleep(delay)
def double_tap(
x: int, y: int, device_id: str | None = None, delay: float | None = None
) -> None:
"""
Double tap at the specified coordinates.
Args:
x: X coordinate.
y: Y coordinate.
device_id: Optional HDC device ID.
delay: Delay in seconds after double tap. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_double_tap_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput doubleClick
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "doubleClick", str(x), str(y)],
capture_output=True
)
time.sleep(delay)
def long_press(
x: int,
y: int,
duration_ms: int = 3000,
device_id: str | None = None,
delay: float | None = None,
) -> None:
"""
Long press at the specified coordinates.
Args:
x: X coordinate.
y: Y coordinate.
duration_ms: Duration of press in milliseconds (note: HarmonyOS longClick may not support duration).
device_id: Optional HDC device ID.
delay: Delay in seconds after long press. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_long_press_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput longClick
# Note: longClick may have a fixed duration, duration_ms parameter might not be supported
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "longClick", str(x), str(y)],
capture_output=True,
)
time.sleep(delay)
def swipe(
start_x: int,
start_y: int,
end_x: int,
end_y: int,
duration_ms: int | None = None,
device_id: str | None = None,
delay: float | None = None,
) -> None:
"""
Swipe from start to end coordinates.
Args:
start_x: Starting X coordinate.
start_y: Starting Y coordinate.
end_x: Ending X coordinate.
end_y: Ending Y coordinate.
duration_ms: Duration of swipe in milliseconds (auto-calculated if None).
device_id: Optional HDC device ID.
delay: Delay in seconds after swipe. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_swipe_delay
hdc_prefix = _get_hdc_prefix(device_id)
if duration_ms is None:
# Calculate duration based on distance
dist_sq = (start_x - end_x) ** 2 + (start_y - end_y) ** 2
duration_ms = int(dist_sq / 1000)
duration_ms = max(500, min(duration_ms, 1000)) # Clamp between 500-1000ms
# HarmonyOS uses uitest uiInput swipe
# Format: swipe startX startY endX endY duration
_run_hdc_command(
hdc_prefix
+ [
"shell",
"uitest",
"uiInput",
"swipe",
str(start_x),
str(start_y),
str(end_x),
str(end_y),
str(duration_ms),
],
capture_output=True,
)
time.sleep(delay)
def back(device_id: str | None = None, delay: float | None = None) -> None:
"""
Press the back button.
Args:
device_id: Optional HDC device ID.
delay: Delay in seconds after pressing back. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_back_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput keyEvent Back
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "Back"],
capture_output=True
)
time.sleep(delay)
def home(device_id: str | None = None, delay: float | None = None) -> None:
"""
Press the home button.
Args:
device_id: Optional HDC device ID.
delay: Delay in seconds after pressing home. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_home_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput keyEvent Home
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "Home"],
capture_output=True
)
time.sleep(delay)
def launch_app(
app_name: str, device_id: str | None = None, delay: float | None = None
) -> bool:
"""
Launch an app by name.
Args:
app_name: The app name (must be in APP_PACKAGES).
device_id: Optional HDC device ID.
delay: Delay in seconds after launching. If None, uses configured default.
Returns:
True if app was launched, False if app not found.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_launch_delay
if app_name not in APP_PACKAGES:
print(f"[HDC] App '{app_name}' not found in HarmonyOS app list")
print(f"[HDC] Available apps: {', '.join(sorted(APP_PACKAGES.keys())[:10])}...")
return False
hdc_prefix = _get_hdc_prefix(device_id)
bundle = APP_PACKAGES[app_name]
# Get the ability name for this bundle
# Default to "EntryAbility" if not specified in APP_ABILITIES
ability = APP_ABILITIES.get(bundle, "EntryAbility")
# HarmonyOS uses 'aa start' command to launch apps
# Format: aa start -b {bundle} -a {ability}
_run_hdc_command(
hdc_prefix
+ [
"shell",
"aa",
"start",
"-b",
bundle,
"-a",
ability,
],
capture_output=True,
)
time.sleep(delay)
return True
def _get_hdc_prefix(device_id: str | None) -> list:
"""Get HDC command prefix with optional device specifier."""
if device_id:
return ["hdc", "-t", device_id]
return ["hdc"]

149
phone_agent/hdc/input.py Normal file
View File

@@ -0,0 +1,149 @@
"""Input utilities for HarmonyOS device text input."""
import base64
import subprocess
from typing import Optional
from phone_agent.hdc.connection import _run_hdc_command
def type_text(text: str, device_id: str | None = None) -> None:
"""
Type text into the currently focused input field.
Args:
text: The text to type. Supports multi-line text with newline characters.
device_id: Optional HDC device ID for multi-device setups.
Note:
HarmonyOS uses: hdc shell uitest uiInput text "文本内容"
This command works without coordinates when input field is focused.
For multi-line text, the function splits by newlines and sends ENTER keyEvents.
ENTER key code in HarmonyOS: 2054
Recommendation: Click on the input field first to focus it, then use this function.
"""
hdc_prefix = _get_hdc_prefix(device_id)
# Handle multi-line text by splitting on newlines
if '\n' in text:
lines = text.split('\n')
for i, line in enumerate(lines):
if line: # Only process non-empty lines
# Escape special characters for shell
escaped_line = line.replace('"', '\\"').replace("$", "\\$")
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "text", escaped_line],
capture_output=True,
text=True,
)
# Send ENTER key event after each line except the last one
if i < len(lines) - 1:
try:
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"],
capture_output=True,
text=True,
)
except Exception as e:
print(f"[HDC] ENTER keyEvent failed: {e}")
else:
# Single line text - original logic
# Escape special characters for shell (keep quotes for proper text handling)
# The text will be wrapped in quotes in the command
escaped_text = text.replace('"', '\\"').replace("$", "\\$")
# HarmonyOS uitest uiInput text command
# Format: hdc shell uitest uiInput text "文本内容"
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "text", escaped_text],
capture_output=True,
text=True,
)
def clear_text(device_id: str | None = None) -> None:
"""
Clear text in the currently focused input field.
Args:
device_id: Optional HDC device ID for multi-device setups.
Note:
This method uses repeated delete key events to clear text.
For HarmonyOS, you might also use select all + delete for better efficiency.
"""
hdc_prefix = _get_hdc_prefix(device_id)
# Ctrl+A to select all (key code 2072 for Ctrl, 2017 for A)
# Then delete
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2072", "2017"],
capture_output=True,
text=True,
)
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2055"], # Delete key
capture_output=True,
text=True,
)
def detect_and_set_adb_keyboard(device_id: str | None = None) -> str:
"""
Detect current keyboard and switch to ADB Keyboard if available.
Args:
device_id: Optional HDC device ID for multi-device setups.
Returns:
The original keyboard IME identifier for later restoration.
Note:
This is a placeholder. HarmonyOS may not support ADB Keyboard.
If there's a similar tool for HarmonyOS, integrate it here.
"""
hdc_prefix = _get_hdc_prefix(device_id)
# Get current IME (if HarmonyOS supports this)
try:
result = _run_hdc_command(
hdc_prefix + ["shell", "settings", "get", "secure", "default_input_method"],
capture_output=True,
text=True,
)
current_ime = (result.stdout + result.stderr).strip()
# If ADB Keyboard equivalent exists for HarmonyOS, switch to it
# For now, we'll just return the current IME
return current_ime
except Exception:
return ""
def restore_keyboard(ime: str, device_id: str | None = None) -> None:
"""
Restore the original keyboard IME.
Args:
ime: The IME identifier to restore.
device_id: Optional HDC device ID for multi-device setups.
"""
if not ime:
return
hdc_prefix = _get_hdc_prefix(device_id)
try:
_run_hdc_command(
hdc_prefix + ["shell", "ime", "set", ime], capture_output=True, text=True
)
except Exception:
pass
def _get_hdc_prefix(device_id: str | None) -> list:
"""Get HDC command prefix with optional device specifier."""
if device_id:
return ["hdc", "-t", device_id]
return ["hdc"]

View File

@@ -0,0 +1,125 @@
"""Screenshot utilities for capturing HarmonyOS device screen."""
import base64
import os
import subprocess
import tempfile
import uuid
from dataclasses import dataclass
from io import BytesIO
from typing import Tuple
from PIL import Image
from phone_agent.hdc.connection import _run_hdc_command
@dataclass
class Screenshot:
"""Represents a captured screenshot."""
base64_data: str
width: int
height: int
is_sensitive: bool = False
def get_screenshot(device_id: str | None = None, timeout: int = 10) -> Screenshot:
"""
Capture a screenshot from the connected HarmonyOS device.
Args:
device_id: Optional HDC device ID for multi-device setups.
timeout: Timeout in seconds for screenshot operations.
Returns:
Screenshot object containing base64 data and dimensions.
Note:
If the screenshot fails (e.g., on sensitive screens like payment pages),
a black fallback image is returned with is_sensitive=True.
"""
temp_path = os.path.join(tempfile.gettempdir(), f"screenshot_{uuid.uuid4()}.png")
hdc_prefix = _get_hdc_prefix(device_id)
try:
# Execute screenshot command
# HarmonyOS HDC only supports JPEG format
remote_path = "/data/local/tmp/tmp_screenshot.jpeg"
# Try method 1: hdc shell screenshot (newer HarmonyOS versions)
result = _run_hdc_command(
hdc_prefix + ["shell", "screenshot", remote_path],
capture_output=True,
text=True,
timeout=timeout,
)
# Check for screenshot failure (sensitive screen)
output = result.stdout + result.stderr
if "fail" in output.lower() or "error" in output.lower() or "not found" in output.lower():
# Try method 2: snapshot_display (older versions or different devices)
result = _run_hdc_command(
hdc_prefix + ["shell", "snapshot_display", "-f", remote_path],
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout + result.stderr
if "fail" in output.lower() or "error" in output.lower():
return _create_fallback_screenshot(is_sensitive=True)
# Pull screenshot to local temp path
# Note: remote file is JPEG, but PIL can open it regardless of local extension
_run_hdc_command(
hdc_prefix + ["file", "recv", remote_path, temp_path],
capture_output=True,
text=True,
timeout=5,
)
if not os.path.exists(temp_path):
return _create_fallback_screenshot(is_sensitive=False)
# Read JPEG image and convert to PNG for model inference
# PIL automatically detects the image format from file content
img = Image.open(temp_path)
width, height = img.size
buffered = BytesIO()
img.save(buffered, format="PNG")
base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Cleanup
os.remove(temp_path)
return Screenshot(
base64_data=base64_data, width=width, height=height, is_sensitive=False
)
except Exception as e:
print(f"Screenshot error: {e}")
return _create_fallback_screenshot(is_sensitive=False)
def _get_hdc_prefix(device_id: str | None) -> list:
"""Get HDC command prefix with optional device specifier."""
if device_id:
return ["hdc", "-t", device_id]
return ["hdc"]
def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot:
"""Create a black fallback image when screenshot fails."""
default_width, default_height = 1080, 2400
black_img = Image.new("RGB", (default_width, default_height), color="black")
buffered = BytesIO()
black_img.save(buffered, format="PNG")
base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
return Screenshot(
base64_data=base64_data,
width=default_width,
height=default_height,
is_sensitive=is_sensitive,
)