支持鸿蒙OSNEXT_HDC

This commit is contained in:
floatingstarZ
2025-12-16 14:28:49 +08:00
parent 4d427bcd31
commit 95f5921887
12 changed files with 2314 additions and 102 deletions

View File

@@ -2,24 +2,13 @@
import ast
import re
import subprocess
import time
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.adb import (
back,
clear_text,
detect_and_set_adb_keyboard,
double_tap,
home,
launch_app,
long_press,
restore_keyboard,
swipe,
tap,
type_text,
)
from phone_agent.config.timing import TIMING_CONFIG
from phone_agent.device_factory import get_device_factory
@dataclass
@@ -132,7 +121,8 @@ class ActionHandler:
if not app_name:
return ActionResult(False, False, "No app name specified")
success = launch_app(app_name, self.device_id)
device_factory = get_device_factory()
success = device_factory.launch_app(app_name, self.device_id)
if success:
return ActionResult(True, False)
return ActionResult(False, False, f"App not found: {app_name}")
@@ -154,19 +144,22 @@ class ActionHandler:
message="User cancelled sensitive operation",
)
tap(x, y, self.device_id)
device_factory = get_device_factory()
device_factory.tap(x, y, self.device_id)
return ActionResult(True, False)
def _handle_type(self, action: dict, width: int, height: int) -> ActionResult:
"""Handle text input action."""
text = action.get("text", "")
device_factory = get_device_factory()
# Switch to ADB keyboard
original_ime = detect_and_set_adb_keyboard(self.device_id)
original_ime = device_factory.detect_and_set_adb_keyboard(self.device_id)
time.sleep(TIMING_CONFIG.action.keyboard_switch_delay)
# Clear existing text and type new text
clear_text(self.device_id)
device_factory.clear_text(self.device_id)
time.sleep(TIMING_CONFIG.action.text_clear_delay)
# Handle multiline text by splitting on newlines
@@ -174,7 +167,7 @@ class ActionHandler:
lines = text.split('\n')
for i, line in enumerate(lines):
if line: # Only type non-empty lines
type_text(line, self.device_id)
device_factory.type_text(line, self.device_id)
time.sleep(0.01)
# Send ENTER key between lines (not after the last line)
@@ -182,11 +175,11 @@ class ActionHandler:
self._send_keyevent("KEYCODE_ENTER")
time.sleep(0.01)
else:
type_text(text, self.device_id)
device_factory.type_text(text, self.device_id)
time.sleep(TIMING_CONFIG.action.text_input_delay)
# Restore original keyboard
restore_keyboard(original_ime, self.device_id)
device_factory.restore_keyboard(original_ime, self.device_id)
time.sleep(TIMING_CONFIG.action.keyboard_restore_delay)
return ActionResult(True, False)
@@ -202,17 +195,20 @@ class ActionHandler:
start_x, start_y = self._convert_relative_to_absolute(start, width, height)
end_x, end_y = self._convert_relative_to_absolute(end, width, height)
swipe(start_x, start_y, end_x, end_y, device_id=self.device_id)
device_factory = get_device_factory()
device_factory.swipe(start_x, start_y, end_x, end_y, device_id=self.device_id)
return ActionResult(True, False)
def _handle_back(self, action: dict, width: int, height: int) -> ActionResult:
"""Handle back button action."""
back(self.device_id)
device_factory = get_device_factory()
device_factory.back(self.device_id)
return ActionResult(True, False)
def _handle_home(self, action: dict, width: int, height: int) -> ActionResult:
"""Handle home button action."""
home(self.device_id)
device_factory = get_device_factory()
device_factory.home(self.device_id)
return ActionResult(True, False)
def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult:
@@ -222,7 +218,8 @@ class ActionHandler:
return ActionResult(False, False, "No element coordinates")
x, y = self._convert_relative_to_absolute(element, width, height)
double_tap(x, y, self.device_id)
device_factory = get_device_factory()
device_factory.double_tap(x, y, self.device_id)
return ActionResult(True, False)
def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult:
@@ -232,7 +229,8 @@ class ActionHandler:
return ActionResult(False, False, "No element coordinates")
x, y = self._convert_relative_to_absolute(element, width, height)
long_press(x, y, device_id=self.device_id)
device_factory = get_device_factory()
device_factory.long_press(x, y, device_id=self.device_id)
return ActionResult(True, False)
def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult:
@@ -271,13 +269,65 @@ class ActionHandler:
def _send_keyevent(self, keycode: str) -> None:
"""Send a keyevent to the device."""
import subprocess
adb_prefix = ["adb", "-s", self.device_id] if self.device_id else ["adb"]
subprocess.run(
adb_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
from phone_agent.device_factory import DeviceType, get_device_factory
from phone_agent.hdc.connection import _run_hdc_command
device_factory = get_device_factory()
# Handle HDC devices with HarmonyOS-specific keyEvent command
if device_factory.device_type == DeviceType.HDC:
hdc_prefix = ["hdc", "-t", self.device_id] if self.device_id else ["hdc"]
# Map common keycodes to HarmonyOS keyEvent codes
# KEYCODE_ENTER (66) -> 2054 (HarmonyOS Enter key code)
if keycode == "KEYCODE_ENTER" or keycode == "66":
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"],
capture_output=True,
text=True,
)
else:
# For other keys, try to use the numeric code directly
# If keycode is a string like "KEYCODE_ENTER", convert it
try:
# Try to extract numeric code from string or use as-is
if keycode.startswith("KEYCODE_"):
# For now, only handle ENTER, other keys may need mapping
if "ENTER" in keycode:
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2054"],
capture_output=True,
text=True,
)
else:
# Fallback to ADB-style command for unsupported keys
subprocess.run(
hdc_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
else:
# Assume it's a numeric code
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", str(keycode)],
capture_output=True,
text=True,
)
except Exception:
# Fallback to ADB-style command
subprocess.run(
hdc_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
else:
# ADB devices use standard input keyevent command
cmd_prefix = ["adb", "-s", self.device_id] if self.device_id else ["adb"]
subprocess.run(
cmd_prefix + ["shell", "input", "keyevent", keycode],
capture_output=True,
text=True,
)
@staticmethod
def _default_confirmation(message: str) -> bool:

View File

@@ -7,8 +7,8 @@ from typing import Any, Callable
from phone_agent.actions import ActionHandler
from phone_agent.actions.handler import do, finish, parse_action
from phone_agent.adb import get_current_app, get_screenshot
from phone_agent.config import get_messages, get_system_prompt
from phone_agent.device_factory import get_device_factory
from phone_agent.model import ModelClient, ModelConfig
from phone_agent.model.client import MessageBuilder
@@ -140,8 +140,9 @@ class PhoneAgent:
self._step_count += 1
# Capture current screen state
screenshot = get_screenshot(self.agent_config.device_id)
current_app = get_current_app(self.agent_config.device_id)
device_factory = get_device_factory()
screenshot = device_factory.get_screenshot(self.agent_config.device_id)
current_app = device_factory.get_current_app(self.agent_config.device_id)
# Build messages
if is_first:

View File

@@ -0,0 +1,209 @@
"""HarmonyOS application package name mappings.
Maps user-friendly app names to HarmonyOS bundle names.
These bundle names are used with the 'hdc shell aa start -b <bundle>' command.
"""
APP_PACKAGES: dict[str, str] = {
# Social & Messaging
"微信": "com.tencent.wechat",
"QQ": "com.tencent.mqq",
"微博": "com.sina.weibo.stage",
# E-commerce
"淘宝": "com.taobao.taobao4hmos",
"京东": "com.jd.hm.mall",
"拼多多": "com.xunmeng.pinduoduo.hos",
"淘宝闪购": "com.taobao.taobao4hmos",
"京东秒送": "com.jd.hm.mall",
# Lifestyle & Social
"小红书": "com.xingin.xhs_hos",
# "豆瓣": "com.douban.frodo", # 未在 hdc 列表中找到
# "知乎": "com.zhihu.android", # 未在 hdc 列表中找到
# Maps & Navigation
"高德地图": "com.amap.hmapp",
"百度地图": "com.baidu.baiduapp",
# Food & Services
"美团": "com.sankuai.hmeituan",
"大众点评": "com.sankuai.dianping",
"饿了么": "me.ele.eleme",
# "肯德基": "com.yek.android.kfc.activitys", # 未在 hdc 列表中找到
# Travel
# "携程": "ctrip.android.view", # 未在 hdc 列表中找到
"铁路12306": "com.tmri.app.harmony12123",
"12306": "com.tmri.app.harmony12123",
# "去哪儿": "com.Qunar", # 未在 hdc 列表中找到
# "去哪儿旅行": "com.Qunar", # 未在 hdc 列表中找到
"滴滴出行": "com.sdu.didi.hmos.psnger",
# Video & Entertainment
"bilibili": "yylx.danmaku.bili",
"抖音": "com.ss.hm.ugc.aweme",
"快手": "com.kuaishou.hmapp",
"腾讯视频": "com.tencent.videohm",
"爱奇艺": "com.qiyi.video.hmy",
# "优酷视频": "com.youku.phone", # 未在 hdc 列表中找到
# "芒果TV": "com.hunantv.imgo.activity", # 未在 hdc 列表中找到
# "红果短剧": "com.phoenix.read", # 未在 hdc 列表中找到
# Music & Audio
# "网易云音乐": "com.netease.cloudmusic", # 未在 hdc 列表中找到
"QQ音乐": "com.tencent.hm.qqmusic",
# "汽水音乐": "com.luna.music", # 未在 hdc 列表中找到,但有 com.luna.hm.music
"喜马拉雅": "com.ximalaya.ting.xmharmony",
# Reading
# "番茄小说": "com.dragon.read", # 未在 hdc 列表中找到
# "番茄免费小说": "com.dragon.read", # 未在 hdc 列表中找到
# "七猫免费小说": "com.kmxs.reader", # 未在 hdc 列表中找到
# Productivity
"飞书": "com.ss.feishu",
# "QQ邮箱": "com.tencent.androidqqmail", # 未在 hdc 列表中找到
# AI & Tools
"豆包": "com.larus.nova.hm",
# Health & Fitness
# "keep": "com.gotokeep.keep", # 未在 hdc 列表中找到
# "美柚": "com.lingan.seeyou", # 未在 hdc 列表中找到
# News & Information
# "腾讯新闻": "com.tencent.news", # 未在 hdc 列表中找到
"今日头条": "com.ss.hm.article.news",
# Real Estate
# "贝壳找房": "com.lianjia.beike", # 未在 hdc 列表中找到
# "安居客": "com.anjuke.android.app", # 未在 hdc 列表中找到
# Finance
# "同花顺": "com.hexin.plat.android", # 未在 hdc 列表中找到
# Games
# "星穹铁道": "com.miHoYo.hkrpg", # 未在 hdc 列表中找到
# "崩坏:星穹铁道": "com.miHoYo.hkrpg", # 未在 hdc 列表中找到
# "恋与深空": "com.papegames.lysk.cn", # 未在 hdc 列表中找到
# HarmonyOS 第三方应用
"百度": "com.baidu.baiduapp",
"阿里巴巴": "com.alibaba.wireless_hmos",
"WPS": "cn.wps.mobileoffice.hap",
"企业微信": "com.tencent.wework.hmos",
"同程": "com.tongcheng.hmos",
"同程旅行": "com.tongcheng.hmos",
"唯品会": "com.vip.hosapp",
"支付宝": "com.alipay.mobile.client",
"UC浏览器": "com.uc.mobile",
"搜狗输入法": "com.sogou.input",
"扫描全能王": "com.intsig.camscanner.hap",
"美图秀秀": "com.meitu.meitupic",
"58同城": "com.wuba.life",
"得物": "com.dewu.hos",
"海底捞": "com.haidilao.haros",
"中国移动": "com.droi.tong",
"中国联通": "com.sinovatech.unicom.ha",
"国家税务总局": "cn.gov.chinatax.gt4.hm",
# HarmonyOS 系统应用 - 工具类
"浏览器": "com.huawei.hmos.browser",
"计算器": "com.huawei.hmos.calculator",
"日历": "com.huawei.hmos.calendar",
"相机": "com.huawei.hmos.camera",
"时钟": "com.huawei.hmos.clock",
"云盘": "com.huawei.hmos.clouddrive",
"云空间": "com.huawei.hmos.clouddrive",
"邮件": "com.huawei.hmos.email",
"文件管理器": "com.huawei.hmos.filemanager",
"文件": "com.huawei.hmos.files",
"查找设备": "com.huawei.hmos.finddevice",
"查找手机": "com.huawei.hmos.finddevice",
"录音机": "com.huawei.hmos.soundrecorder",
"录音": "com.huawei.hmos.soundrecorder",
"录屏": "com.huawei.hmos.screenrecorder",
"截屏": "com.huawei.hmos.screenshot",
"笔记": "com.huawei.hmos.notepad",
"备忘录": "com.huawei.hmos.notepad",
# HarmonyOS 系统应用 - 媒体类
"相册": "com.huawei.hmos.photos",
"图库": "com.huawei.hmos.photos",
# "视频": "com.huawei.hmos.mediaplayer", # 未在 hdc 列表中找到,但有 com.huawei.hmsapp.himovie
# HarmonyOS 系统应用 - 通讯类
"联系人": "com.ohos.contacts",
"通讯录": "com.ohos.contacts",
"短信": "com.ohos.mms",
"信息": "com.ohos.mms",
"电话": "com.ohos.callui",
"拨号": "com.ohos.callui",
# HarmonyOS 系统应用 - 设置类
"设置": "com.huawei.hmos.settings",
"系统设置": "com.huawei.hmos.settings",
"AndroidSystemSettings": "com.huawei.hmos.settings",
"Android System Settings": "com.huawei.hmos.settings",
"Android System Settings": "com.huawei.hmos.settings",
"Android-System-Settings": "com.huawei.hmos.settings",
"Settings": "com.huawei.hmos.settings",
# HarmonyOS 系统应用 - 生活服务
"健康": "com.huawei.hmos.health",
"运动健康": "com.huawei.hmos.health",
"地图": "com.huawei.hmos.maps.app",
"华为地图": "com.huawei.hmos.maps.app",
"钱包": "com.huawei.hmos.wallet",
"华为钱包": "com.huawei.hmos.wallet",
"智慧生活": "com.huawei.hmos.ailife",
"智能助手": "com.huawei.hmos.vassistant",
"小艺": "com.huawei.hmos.vassistant",
# HarmonyOS 服务
"应用市场": "com.huawei.hmsapp.appgallery",
"华为应用市场": "com.huawei.hmsapp.appgallery",
"音乐": "com.huawei.hmsapp.music",
"华为音乐": "com.huawei.hmsapp.music",
"主题": "com.huawei.hmsapp.thememanager",
"主题管理": "com.huawei.hmsapp.thememanager",
"天气": "com.huawei.hmsapp.totemweather",
"华为天气": "com.huawei.hmsapp.totemweather",
"视频": "com.huawei.hmsapp.himovie",
"华为视频": "com.huawei.hmsapp.himovie",
"阅读": "com.huawei.hmsapp.books",
"华为阅读": "com.huawei.hmsapp.books",
"游戏中心": "com.huawei.hmsapp.gamecenter",
"华为游戏中心": "com.huawei.hmsapp.gamecenter",
"搜索": "com.huawei.hmsapp.hisearch",
"华为搜索": "com.huawei.hmsapp.hisearch",
"指南针": "com.huawei.hmsapp.compass",
"会员中心": "com.huawei.hmos.myhuawei",
"我的华为": "com.huawei.hmos.myhuawei",
"华为会员": "com.huawei.hmos.myhuawei",
}
def get_package_name(app_name: str) -> str | None:
"""
Get the package name for an app.
Args:
app_name: The display name of the app.
Returns:
The HarmonyOS bundle name, or None if not found.
"""
return APP_PACKAGES.get(app_name)
def get_app_name(package_name: str) -> str | None:
"""
Get the app name from a package name.
Args:
package_name: The HarmonyOS bundle name.
Returns:
The display name of the app, or None if not found.
"""
for name, package in APP_PACKAGES.items():
if package == package_name:
return name
return None
def list_supported_apps() -> list[str]:
"""
Get a list of all supported app names.
Returns:
List of app names.
"""
return list(APP_PACKAGES.keys())

View File

@@ -0,0 +1,138 @@
"""Device factory for selecting ADB or HDC based on device type."""
from enum import Enum
from typing import Any
class DeviceType(Enum):
"""Type of device connection tool."""
ADB = "adb"
HDC = "hdc"
class DeviceFactory:
"""
Factory class for getting device-specific implementations.
This allows the system to work with both Android (ADB) and HarmonyOS (HDC) devices.
"""
def __init__(self, device_type: DeviceType = DeviceType.ADB):
"""
Initialize the device factory.
Args:
device_type: The type of device to use (ADB or HDC).
"""
self.device_type = device_type
self._module = None
@property
def module(self):
"""Get the appropriate device module (adb or hdc)."""
if self._module is None:
if self.device_type == DeviceType.ADB:
from phone_agent import adb
self._module = adb
elif self.device_type == DeviceType.HDC:
from phone_agent import hdc
self._module = hdc
else:
raise ValueError(f"Unknown device type: {self.device_type}")
return self._module
def get_screenshot(self, device_id: str | None = None, timeout: int = 10):
"""Get screenshot from device."""
return self.module.get_screenshot(device_id, timeout)
def get_current_app(self, device_id: str | None = None) -> str:
"""Get current app name."""
return self.module.get_current_app(device_id)
def tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None):
"""Tap at coordinates."""
return self.module.tap(x, y, device_id, delay)
def double_tap(self, x: int, y: int, device_id: str | None = None, delay: float | None = None):
"""Double tap at coordinates."""
return self.module.double_tap(x, y, device_id, delay)
def long_press(self, x: int, y: int, duration_ms: int = 3000, device_id: str | None = None, delay: float | None = None):
"""Long press at coordinates."""
return self.module.long_press(x, y, duration_ms, device_id, delay)
def swipe(self, start_x: int, start_y: int, end_x: int, end_y: int, duration_ms: int | None = None, device_id: str | None = None, delay: float | None = None):
"""Swipe from start to end."""
return self.module.swipe(start_x, start_y, end_x, end_y, duration_ms, device_id, delay)
def back(self, device_id: str | None = None, delay: float | None = None):
"""Press back button."""
return self.module.back(device_id, delay)
def home(self, device_id: str | None = None, delay: float | None = None):
"""Press home button."""
return self.module.home(device_id, delay)
def launch_app(self, app_name: str, device_id: str | None = None, delay: float | None = None) -> bool:
"""Launch an app."""
return self.module.launch_app(app_name, device_id, delay)
def type_text(self, text: str, device_id: str | None = None):
"""Type text."""
return self.module.type_text(text, device_id)
def clear_text(self, device_id: str | None = None):
"""Clear text."""
return self.module.clear_text(device_id)
def detect_and_set_adb_keyboard(self, device_id: str | None = None) -> str:
"""Detect and set keyboard."""
return self.module.detect_and_set_adb_keyboard(device_id)
def restore_keyboard(self, ime: str, device_id: str | None = None):
"""Restore keyboard."""
return self.module.restore_keyboard(ime, device_id)
def list_devices(self):
"""List connected devices."""
return self.module.list_devices()
def get_connection_class(self):
"""Get the connection class (ADBConnection or HDCConnection)."""
if self.device_type == DeviceType.ADB:
from phone_agent.adb import ADBConnection
return ADBConnection
elif self.device_type == DeviceType.HDC:
from phone_agent.hdc import HDCConnection
return HDCConnection
else:
raise ValueError(f"Unknown device type: {self.device_type}")
# Global device factory instance
_device_factory: DeviceFactory | None = None
def set_device_type(device_type: DeviceType):
"""
Set the global device type.
Args:
device_type: The device type to use (ADB or HDC).
"""
global _device_factory
_device_factory = DeviceFactory(device_type)
def get_device_factory() -> DeviceFactory:
"""
Get the global device factory instance.
Returns:
The device factory instance.
"""
global _device_factory
if _device_factory is None:
_device_factory = DeviceFactory(DeviceType.ADB) # Default to ADB
return _device_factory

View File

@@ -0,0 +1,53 @@
"""HDC utilities for HarmonyOS device interaction."""
from phone_agent.hdc.connection import (
HDCConnection,
ConnectionType,
DeviceInfo,
list_devices,
quick_connect,
set_hdc_verbose,
)
from phone_agent.hdc.device import (
back,
double_tap,
get_current_app,
home,
launch_app,
long_press,
swipe,
tap,
)
from phone_agent.hdc.input import (
clear_text,
detect_and_set_adb_keyboard,
restore_keyboard,
type_text,
)
from phone_agent.hdc.screenshot import get_screenshot
__all__ = [
# Screenshot
"get_screenshot",
# Input
"type_text",
"clear_text",
"detect_and_set_adb_keyboard",
"restore_keyboard",
# Device control
"get_current_app",
"tap",
"swipe",
"back",
"home",
"double_tap",
"long_press",
"launch_app",
# Connection management
"HDCConnection",
"DeviceInfo",
"ConnectionType",
"quick_connect",
"list_devices",
"set_hdc_verbose",
]

View File

@@ -0,0 +1,381 @@
"""HDC connection management for HarmonyOS devices."""
import os
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from phone_agent.config.timing import TIMING_CONFIG
# Global flag to control HDC command output
_HDC_VERBOSE = os.getenv("HDC_VERBOSE", "false").lower() in ("true", "1", "yes")
def _run_hdc_command(cmd: list, **kwargs) -> subprocess.CompletedProcess:
"""
Run HDC command with optional verbose output.
Args:
cmd: Command list to execute.
**kwargs: Additional arguments for subprocess.run.
Returns:
CompletedProcess result.
"""
if _HDC_VERBOSE:
print(f"[HDC] Running command: {' '.join(cmd)}")
result = subprocess.run(cmd, **kwargs)
if _HDC_VERBOSE and result.returncode != 0:
print(f"[HDC] Command failed with return code {result.returncode}")
if hasattr(result, 'stderr') and result.stderr:
print(f"[HDC] Error: {result.stderr}")
return result
def set_hdc_verbose(verbose: bool):
"""Set HDC verbose mode globally."""
global _HDC_VERBOSE
_HDC_VERBOSE = verbose
class ConnectionType(Enum):
"""Type of HDC connection."""
USB = "usb"
WIFI = "wifi"
REMOTE = "remote"
@dataclass
class DeviceInfo:
"""Information about a connected device."""
device_id: str
status: str
connection_type: ConnectionType
model: str | None = None
harmony_version: str | None = None
class HDCConnection:
"""
Manages HDC connections to HarmonyOS devices.
Supports USB, WiFi, and remote TCP/IP connections.
Example:
>>> conn = HDCConnection()
>>> # Connect to remote device
>>> conn.connect("192.168.1.100:5555")
>>> # List devices
>>> devices = conn.list_devices()
>>> # Disconnect
>>> conn.disconnect("192.168.1.100:5555")
"""
def __init__(self, hdc_path: str = "hdc"):
"""
Initialize HDC connection manager.
Args:
hdc_path: Path to HDC executable.
"""
self.hdc_path = hdc_path
def connect(self, address: str, timeout: int = 10) -> tuple[bool, str]:
"""
Connect to a remote device via TCP/IP.
Args:
address: Device address in format "host:port" (e.g., "192.168.1.100:5555").
timeout: Connection timeout in seconds.
Returns:
Tuple of (success, message).
Note:
The remote device must have TCP/IP debugging enabled.
"""
# Validate address format
if ":" not in address:
address = f"{address}:5555" # Default HDC port
try:
result = _run_hdc_command(
[self.hdc_path, "tconn", address],
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout + result.stderr
if "Connect OK" in output or "connected" in output.lower():
return True, f"Connected to {address}"
elif "already connected" in output.lower():
return True, f"Already connected to {address}"
else:
return False, output.strip()
except subprocess.TimeoutExpired:
return False, f"Connection timeout after {timeout}s"
except Exception as e:
return False, f"Connection error: {e}"
def disconnect(self, address: str | None = None) -> tuple[bool, str]:
"""
Disconnect from a remote device.
Args:
address: Device address to disconnect. If None, disconnects all.
Returns:
Tuple of (success, message).
"""
try:
if address:
cmd = [self.hdc_path, "tdisconn", address]
else:
# HDC doesn't have a "disconnect all" command, so we need to list and disconnect each
devices = self.list_devices()
for device in devices:
if ":" in device.device_id: # Remote device
_run_hdc_command(
[self.hdc_path, "tdisconn", device.device_id],
capture_output=True,
text=True,
timeout=5
)
return True, "Disconnected all remote devices"
result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5)
output = result.stdout + result.stderr
return True, output.strip() or "Disconnected"
except Exception as e:
return False, f"Disconnect error: {e}"
def list_devices(self) -> list[DeviceInfo]:
"""
List all connected devices.
Returns:
List of DeviceInfo objects.
"""
try:
result = _run_hdc_command(
[self.hdc_path, "list", "targets"],
capture_output=True,
text=True,
timeout=5,
)
devices = []
for line in result.stdout.strip().split("\n"):
if not line.strip():
continue
# HDC output format: device_id (status)
# Example: "192.168.1.100:5555" or "FMR0223C13000649"
device_id = line.strip()
# Determine connection type
if ":" in device_id:
conn_type = ConnectionType.REMOTE
else:
conn_type = ConnectionType.USB
# HDC doesn't provide detailed status in list command
# We assume "Connected" status for devices that appear
devices.append(
DeviceInfo(
device_id=device_id,
status="device",
connection_type=conn_type,
model=None,
)
)
return devices
except Exception as e:
print(f"Error listing devices: {e}")
return []
def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
"""
Get detailed information about a device.
Args:
device_id: Device ID. If None, uses first available device.
Returns:
DeviceInfo or None if not found.
"""
devices = self.list_devices()
if not devices:
return None
if device_id is None:
return devices[0]
for device in devices:
if device.device_id == device_id:
return device
return None
def is_connected(self, device_id: str | None = None) -> bool:
"""
Check if a device is connected.
Args:
device_id: Device ID to check. If None, checks if any device is connected.
Returns:
True if connected, False otherwise.
"""
devices = self.list_devices()
if not devices:
return False
if device_id is None:
return len(devices) > 0
return any(d.device_id == device_id for d in devices)
def enable_tcpip(
self, port: int = 5555, device_id: str | None = None
) -> tuple[bool, str]:
"""
Enable TCP/IP debugging on a USB-connected device.
This allows subsequent wireless connections to the device.
Args:
port: TCP port for HDC (default: 5555).
device_id: Device ID. If None, uses first available device.
Returns:
Tuple of (success, message).
Note:
The device must be connected via USB first.
After this, you can disconnect USB and connect via WiFi.
"""
try:
cmd = [self.hdc_path]
if device_id:
cmd.extend(["-t", device_id])
cmd.extend(["tmode", "port", str(port)])
result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=10)
output = result.stdout + result.stderr
if result.returncode == 0 or "success" in output.lower():
time.sleep(TIMING_CONFIG.connection.adb_restart_delay)
return True, f"TCP/IP mode enabled on port {port}"
else:
return False, output.strip()
except Exception as e:
return False, f"Error enabling TCP/IP: {e}"
def get_device_ip(self, device_id: str | None = None) -> str | None:
"""
Get the IP address of a connected device.
Args:
device_id: Device ID. If None, uses first available device.
Returns:
IP address string or None if not found.
"""
try:
cmd = [self.hdc_path]
if device_id:
cmd.extend(["-t", device_id])
cmd.extend(["shell", "ifconfig"])
result = _run_hdc_command(cmd, capture_output=True, text=True, encoding="utf-8", timeout=5)
# Parse IP from ifconfig output
for line in result.stdout.split("\n"):
if "inet addr:" in line or "inet " in line:
parts = line.strip().split()
for i, part in enumerate(parts):
if "addr:" in part:
ip = part.split(":")[1]
# Filter out localhost
if not ip.startswith("127."):
return ip
elif part == "inet" and i + 1 < len(parts):
ip = parts[i + 1].split("/")[0]
if not ip.startswith("127."):
return ip
return None
except Exception as e:
print(f"Error getting device IP: {e}")
return None
def restart_server(self) -> tuple[bool, str]:
"""
Restart the HDC server.
Returns:
Tuple of (success, message).
"""
try:
# Kill server
_run_hdc_command(
[self.hdc_path, "kill"], capture_output=True, timeout=5
)
time.sleep(TIMING_CONFIG.connection.server_restart_delay)
# Start server (HDC auto-starts when running commands)
_run_hdc_command(
[self.hdc_path, "start", "-r"], capture_output=True, timeout=5
)
return True, "HDC server restarted"
except Exception as e:
return False, f"Error restarting server: {e}"
def quick_connect(address: str) -> tuple[bool, str]:
"""
Quick helper to connect to a remote device.
Args:
address: Device address (e.g., "192.168.1.100" or "192.168.1.100:5555").
Returns:
Tuple of (success, message).
"""
conn = HDCConnection()
return conn.connect(address)
def list_devices() -> list[DeviceInfo]:
"""
Quick helper to list connected devices.
Returns:
List of DeviceInfo objects.
"""
conn = HDCConnection()
return conn.list_devices()

269
phone_agent/hdc/device.py Normal file
View File

@@ -0,0 +1,269 @@
"""Device control utilities for HarmonyOS automation."""
import os
import subprocess
import time
from typing import List, Optional, Tuple
from phone_agent.config.apps_harmonyos import APP_PACKAGES
from phone_agent.config.timing import TIMING_CONFIG
from phone_agent.hdc.connection import _run_hdc_command
def get_current_app(device_id: str | None = None) -> str:
"""
Get the currently focused app name.
Args:
device_id: Optional HDC device ID for multi-device setups.
Returns:
The app name if recognized, otherwise "System Home".
"""
hdc_prefix = _get_hdc_prefix(device_id)
result = _run_hdc_command(
hdc_prefix + ["shell", "hidumper", "-s", "WindowManagerService", "-a", "-a"],
capture_output=True,
text=True,
encoding="utf-8"
)
output = result.stdout
if not output:
raise ValueError("No output from hidumper")
# Parse window focus info
for line in output.split("\n"):
if "focused" in line.lower() or "current" in line.lower():
for app_name, package in APP_PACKAGES.items():
if package in line:
return app_name
return "System Home"
def tap(
x: int, y: int, device_id: str | None = None, delay: float | None = None
) -> None:
"""
Tap at the specified coordinates.
Args:
x: X coordinate.
y: Y coordinate.
device_id: Optional HDC device ID.
delay: Delay in seconds after tap. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_tap_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput click
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "click", str(x), str(y)],
capture_output=True
)
time.sleep(delay)
def double_tap(
x: int, y: int, device_id: str | None = None, delay: float | None = None
) -> None:
"""
Double tap at the specified coordinates.
Args:
x: X coordinate.
y: Y coordinate.
device_id: Optional HDC device ID.
delay: Delay in seconds after double tap. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_double_tap_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput doubleClick
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "doubleClick", str(x), str(y)],
capture_output=True
)
time.sleep(delay)
def long_press(
x: int,
y: int,
duration_ms: int = 3000,
device_id: str | None = None,
delay: float | None = None,
) -> None:
"""
Long press at the specified coordinates.
Args:
x: X coordinate.
y: Y coordinate.
duration_ms: Duration of press in milliseconds (note: HarmonyOS longClick may not support duration).
device_id: Optional HDC device ID.
delay: Delay in seconds after long press. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_long_press_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput longClick
# Note: longClick may have a fixed duration, duration_ms parameter might not be supported
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "longClick", str(x), str(y)],
capture_output=True,
)
time.sleep(delay)
def swipe(
start_x: int,
start_y: int,
end_x: int,
end_y: int,
duration_ms: int | None = None,
device_id: str | None = None,
delay: float | None = None,
) -> None:
"""
Swipe from start to end coordinates.
Args:
start_x: Starting X coordinate.
start_y: Starting Y coordinate.
end_x: Ending X coordinate.
end_y: Ending Y coordinate.
duration_ms: Duration of swipe in milliseconds (auto-calculated if None).
device_id: Optional HDC device ID.
delay: Delay in seconds after swipe. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_swipe_delay
hdc_prefix = _get_hdc_prefix(device_id)
if duration_ms is None:
# Calculate duration based on distance
dist_sq = (start_x - end_x) ** 2 + (start_y - end_y) ** 2
duration_ms = int(dist_sq / 1000)
duration_ms = max(500, min(duration_ms, 1000)) # Clamp between 500-1000ms
# HarmonyOS uses uitest uiInput swipe
# Format: swipe startX startY endX endY duration
_run_hdc_command(
hdc_prefix
+ [
"shell",
"uitest",
"uiInput",
"swipe",
str(start_x),
str(start_y),
str(end_x),
str(end_y),
str(duration_ms),
],
capture_output=True,
)
time.sleep(delay)
def back(device_id: str | None = None, delay: float | None = None) -> None:
"""
Press the back button.
Args:
device_id: Optional HDC device ID.
delay: Delay in seconds after pressing back. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_back_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput keyEvent Back
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "Back"],
capture_output=True
)
time.sleep(delay)
def home(device_id: str | None = None, delay: float | None = None) -> None:
"""
Press the home button.
Args:
device_id: Optional HDC device ID.
delay: Delay in seconds after pressing home. If None, uses configured default.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_home_delay
hdc_prefix = _get_hdc_prefix(device_id)
# HarmonyOS uses uitest uiInput keyEvent Home
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "Home"],
capture_output=True
)
time.sleep(delay)
def launch_app(
app_name: str, device_id: str | None = None, delay: float | None = None
) -> bool:
"""
Launch an app by name.
Args:
app_name: The app name (must be in APP_PACKAGES).
device_id: Optional HDC device ID.
delay: Delay in seconds after launching. If None, uses configured default.
Returns:
True if app was launched, False if app not found.
"""
if delay is None:
delay = TIMING_CONFIG.device.default_launch_delay
if app_name not in APP_PACKAGES:
print(f"[HDC] App '{app_name}' not found in HarmonyOS app list")
print(f"[HDC] Available apps: {', '.join(sorted(APP_PACKAGES.keys())[:10])}...")
return False
hdc_prefix = _get_hdc_prefix(device_id)
bundle = APP_PACKAGES[app_name]
# HarmonyOS uses 'aa start' command to launch apps
# Format: aa start -b {bundle} -a {ability}
# Most HarmonyOS apps use "EntryAbility" as the main ability name
_run_hdc_command(
hdc_prefix
+ [
"shell",
"aa",
"start",
"-b",
bundle,
"-a",
"EntryAbility",
],
capture_output=True,
)
time.sleep(delay)
return True
def _get_hdc_prefix(device_id: str | None) -> list:
"""Get HDC command prefix with optional device specifier."""
if device_id:
return ["hdc", "-t", device_id]
return ["hdc"]

136
phone_agent/hdc/input.py Normal file
View File

@@ -0,0 +1,136 @@
"""Input utilities for HarmonyOS device text input."""
import base64
import subprocess
from typing import Optional
from phone_agent.hdc.connection import _run_hdc_command
def type_text(text: str, device_id: str | None = None, x: int = None, y: int = None) -> None:
"""
Type text into the currently focused input field.
Args:
text: The text to type.
device_id: Optional HDC device ID for multi-device setups.
x: Optional X coordinate for input field (deprecated, kept for compatibility).
y: Optional Y coordinate for input field (deprecated, kept for compatibility).
Note:
HarmonyOS uses: hdc shell uitest uiInput text "文本内容"
This command works without coordinates when input field is focused.
Recommendation: Click on the input field first to focus it, then use this function.
"""
hdc_prefix = _get_hdc_prefix(device_id)
# Escape special characters for shell (keep quotes for proper text handling)
# The text will be wrapped in quotes in the command
escaped_text = text.replace('"', '\\"').replace("$", "\\$")
try:
# HarmonyOS uitest uiInput text command
# Format: hdc shell uitest uiInput text "文本内容"
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "text", escaped_text],
capture_output=True,
text=True,
)
except Exception as e:
print(f"[HDC] Text input failed: {e}")
# Fallback: try with coordinates if provided (for older HarmonyOS versions)
if x is not None and y is not None:
try:
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "inputText", str(x), str(y), escaped_text],
capture_output=True,
text=True,
)
except Exception:
pass
def clear_text(device_id: str | None = None) -> None:
"""
Clear text in the currently focused input field.
Args:
device_id: Optional HDC device ID for multi-device setups.
Note:
This method uses repeated delete key events to clear text.
For HarmonyOS, you might also use select all + delete for better efficiency.
"""
hdc_prefix = _get_hdc_prefix(device_id)
# Ctrl+A to select all (key code 2072 for Ctrl, 2017 for A)
# Then delete
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2072", "2017"],
capture_output=True,
text=True,
)
_run_hdc_command(
hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", "2055"], # Delete key
capture_output=True,
text=True,
)
def detect_and_set_adb_keyboard(device_id: str | None = None) -> str:
"""
Detect current keyboard and switch to ADB Keyboard if available.
Args:
device_id: Optional HDC device ID for multi-device setups.
Returns:
The original keyboard IME identifier for later restoration.
Note:
This is a placeholder. HarmonyOS may not support ADB Keyboard.
If there's a similar tool for HarmonyOS, integrate it here.
"""
hdc_prefix = _get_hdc_prefix(device_id)
# Get current IME (if HarmonyOS supports this)
try:
result = _run_hdc_command(
hdc_prefix + ["shell", "settings", "get", "secure", "default_input_method"],
capture_output=True,
text=True,
)
current_ime = (result.stdout + result.stderr).strip()
# If ADB Keyboard equivalent exists for HarmonyOS, switch to it
# For now, we'll just return the current IME
return current_ime
except Exception:
return ""
def restore_keyboard(ime: str, device_id: str | None = None) -> None:
"""
Restore the original keyboard IME.
Args:
ime: The IME identifier to restore.
device_id: Optional HDC device ID for multi-device setups.
"""
if not ime:
return
hdc_prefix = _get_hdc_prefix(device_id)
try:
_run_hdc_command(
hdc_prefix + ["shell", "ime", "set", ime], capture_output=True, text=True
)
except Exception:
pass
def _get_hdc_prefix(device_id: str | None) -> list:
"""Get HDC command prefix with optional device specifier."""
if device_id:
return ["hdc", "-t", device_id]
return ["hdc"]

View File

@@ -0,0 +1,125 @@
"""Screenshot utilities for capturing HarmonyOS device screen."""
import base64
import os
import subprocess
import tempfile
import uuid
from dataclasses import dataclass
from io import BytesIO
from typing import Tuple
from PIL import Image
from phone_agent.hdc.connection import _run_hdc_command
@dataclass
class Screenshot:
"""Represents a captured screenshot."""
base64_data: str
width: int
height: int
is_sensitive: bool = False
def get_screenshot(device_id: str | None = None, timeout: int = 10) -> Screenshot:
"""
Capture a screenshot from the connected HarmonyOS device.
Args:
device_id: Optional HDC device ID for multi-device setups.
timeout: Timeout in seconds for screenshot operations.
Returns:
Screenshot object containing base64 data and dimensions.
Note:
If the screenshot fails (e.g., on sensitive screens like payment pages),
a black fallback image is returned with is_sensitive=True.
"""
temp_path = os.path.join(tempfile.gettempdir(), f"screenshot_{uuid.uuid4()}.png")
hdc_prefix = _get_hdc_prefix(device_id)
try:
# Execute screenshot command
# HarmonyOS HDC only supports JPEG format
remote_path = "/data/local/tmp/tmp_screenshot.jpeg"
# Try method 1: hdc shell screenshot (newer HarmonyOS versions)
result = _run_hdc_command(
hdc_prefix + ["shell", "screenshot", remote_path],
capture_output=True,
text=True,
timeout=timeout,
)
# Check for screenshot failure (sensitive screen)
output = result.stdout + result.stderr
if "fail" in output.lower() or "error" in output.lower() or "not found" in output.lower():
# Try method 2: snapshot_display (older versions or different devices)
result = _run_hdc_command(
hdc_prefix + ["shell", "snapshot_display", "-f", remote_path],
capture_output=True,
text=True,
timeout=timeout,
)
output = result.stdout + result.stderr
if "fail" in output.lower() or "error" in output.lower():
return _create_fallback_screenshot(is_sensitive=True)
# Pull screenshot to local temp path
# Note: remote file is JPEG, but PIL can open it regardless of local extension
_run_hdc_command(
hdc_prefix + ["file", "recv", remote_path, temp_path],
capture_output=True,
text=True,
timeout=5,
)
if not os.path.exists(temp_path):
return _create_fallback_screenshot(is_sensitive=False)
# Read JPEG image and convert to PNG for model inference
# PIL automatically detects the image format from file content
img = Image.open(temp_path)
width, height = img.size
buffered = BytesIO()
img.save(buffered, format="PNG")
base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Cleanup
os.remove(temp_path)
return Screenshot(
base64_data=base64_data, width=width, height=height, is_sensitive=False
)
except Exception as e:
print(f"Screenshot error: {e}")
return _create_fallback_screenshot(is_sensitive=False)
def _get_hdc_prefix(device_id: str | None) -> list:
"""Get HDC command prefix with optional device specifier."""
if device_id:
return ["hdc", "-t", device_id]
return ["hdc"]
def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot:
"""Create a black fallback image when screenshot fails."""
default_width, default_height = 1080, 2400
black_img = Image.new("RGB", (default_width, default_height), color="black")
buffered = BytesIO()
black_img.save(buffered, format="PNG")
base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
return Screenshot(
base64_data=base64_data,
width=default_width,
height=default_height,
is_sensitive=is_sensitive,
)