Files
Open-AutoGLM/phone_agent/hdc/connection.py
2025-12-16 14:28:49 +08:00

382 lines
11 KiB
Python

"""HDC connection management for HarmonyOS devices."""
import os
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from phone_agent.config.timing import TIMING_CONFIG
# Module-wide switch for echoing HDC commands and their failures to stdout.
# Initialised from the HDC_VERBOSE environment variable; "true", "1" and "yes"
# (compared case-insensitively) enable it, anything else leaves it off.
_HDC_VERBOSE = os.environ.get("HDC_VERBOSE", "false").lower() in ("true", "1", "yes")
def _run_hdc_command(cmd: list, **kwargs) -> subprocess.CompletedProcess:
    """
    Run HDC command with optional verbose output.

    When the module-level verbose flag is set, the command line is printed
    before execution and, if the process exits with a non-zero return code,
    the return code and any captured stderr are printed afterwards.

    Args:
        cmd: Command list to execute (program followed by its arguments).
        **kwargs: Additional arguments forwarded unchanged to subprocess.run.

    Returns:
        CompletedProcess result.

    Raises:
        Any exception raised by subprocess.run for the given arguments, e.g.
        subprocess.TimeoutExpired when a ``timeout`` is passed and exceeded.
    """
    if _HDC_VERBOSE:
        # subprocess.run accepts path-like argv entries, but str.join does
        # not; stringify each item so that enabling verbose mode can never
        # make an otherwise valid command fail.
        printable = " ".join(str(part) for part in cmd)
        print(f"[HDC] Running command: {printable}")

    result = subprocess.run(cmd, **kwargs)

    if _HDC_VERBOSE and result.returncode != 0:
        print(f"[HDC] Command failed with return code {result.returncode}")
        # stderr is None unless the caller asked for it to be captured.
        if result.stderr:
            print(f"[HDC] Error: {result.stderr}")

    return result
def set_hdc_verbose(verbose: bool):
    """
    Turn verbose HDC logging on or off for the whole module.

    Args:
        verbose: New value for the module-level verbose flag. It replaces
            whatever value was derived from the HDC_VERBOSE environment
            variable at import time.
    """
    # Rebind the module global rather than creating a function-local name.
    global _HDC_VERBOSE
    _HDC_VERBOSE = verbose
class ConnectionType(Enum):
    """Kinds of transport over which an HDC device can be reached."""

    # Device attached directly over a USB cable.
    USB = "usb"
    # Device reached over a wireless network.
    WIFI = "wifi"
    # Device addressed by a "host:port" TCP/IP endpoint.
    REMOTE = "remote"
@dataclass
class DeviceInfo:
    """
    Record describing one device known to HDC.

    Attributes:
        device_id: Identifier used to address the device in HDC commands.
        status: Status string reported for the device.
        connection_type: Transport over which the device is reached.
        model: Device model name, or None when it is not known.
        harmony_version: HarmonyOS version string, or None when not known.
    """

    # Required fields: supplied for every listed device.
    device_id: str
    status: str
    connection_type: ConnectionType

    # Optional details: default to None when nothing has been filled in.
    model: str | None = None
    harmony_version: str | None = None
class HDCConnection:
    """
    Manages HDC connections to HarmonyOS devices.

    Supports USB, WiFi, and remote TCP/IP connections. Public methods do not
    let HDC invocation failures escape: they report them through their
    return value (a ``(success, message)`` tuple, an empty list, or None).

    Example:
        >>> conn = HDCConnection()
        >>> # Connect to remote device
        >>> conn.connect("192.168.1.100:5555")
        >>> # List devices
        >>> devices = conn.list_devices()
        >>> # Disconnect
        >>> conn.disconnect("192.168.1.100:5555")
    """

    # Port appended by connect() when the address carries no explicit port.
    _DEFAULT_PORT = 5555

    # Bracketed status lines that `hdc list targets` may print in place of
    # device IDs (e.g. "[Empty]" when nothing is attached).
    _NON_DEVICE_PREFIXES = ("[Empty]", "[Fail]")

    def __init__(self, hdc_path: str = "hdc"):
        """
        Initialize HDC connection manager.

        Args:
            hdc_path: Path to HDC executable.
        """
        self.hdc_path = hdc_path

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _run(self, args: list[str], timeout: float) -> subprocess.CompletedProcess:
        """Run ``hdc <args>`` capturing stdout/stderr as UTF-8 text."""
        return _run_hdc_command(
            [self.hdc_path, *args],
            capture_output=True,
            text=True,
            encoding="utf-8",
            # Undecodable bytes must not turn a successful command into an
            # exception; substitute them instead.
            errors="replace",
            timeout=timeout,
        )

    @staticmethod
    def _target_args(device_id: str | None) -> list[str]:
        """Return the ``-t <device_id>`` selector, or nothing if no ID is given."""
        return ["-t", device_id] if device_id else []

    @staticmethod
    def _combined_output(result: subprocess.CompletedProcess) -> str:
        """Concatenate captured stdout and stderr, treating None as empty."""
        return (result.stdout or "") + (result.stderr or "")

    @staticmethod
    def _connect_result(output: str, address: str) -> tuple[bool, str]:
        """Translate the text printed by ``hdc tconn`` into (success, message)."""
        lowered = output.lower()
        # Must be tested before the generic "connected" check, which would
        # otherwise always match first and make this branch unreachable.
        if "already connected" in lowered:
            return True, f"Already connected to {address}"
        if "Connect OK" in output or "connected" in lowered:
            return True, f"Connected to {address}"
        return False, output.strip()

    @classmethod
    def _parse_target_ids(cls, output: str) -> list[str]:
        """Extract device IDs from the text printed by ``hdc list targets``."""
        ids = []
        for line in output.splitlines():
            tokens = line.split()
            if not tokens:
                continue
            # Status placeholders such as "[Empty]" are not devices.
            if tokens[0].startswith(cls._NON_DEVICE_PREFIXES):
                continue
            # Only the first column is the ID; ignore any trailing columns.
            ids.append(tokens[0])
        return ids

    @staticmethod
    def _parse_ifconfig_ip(output: str) -> str | None:
        """
        Return the first non-loopback IPv4 address found in ifconfig output.

        Handles both the ``inet addr:1.2.3.4`` and the ``inet 1.2.3.4`` /
        ``inet 1.2.3.4/24`` layouts. ``inet6`` lines are ignored.
        """
        for line in output.splitlines():
            tokens = line.split()
            for index, token in enumerate(tokens):
                if token != "inet" or index + 1 >= len(tokens):
                    continue
                candidate = tokens[index + 1].removeprefix("addr:")
                # Tolerate "inet addr: 1.2.3.4" (space after the colon).
                if not candidate and index + 2 < len(tokens):
                    candidate = tokens[index + 2]
                ip = candidate.split("/")[0]
                looks_like_ipv4 = ip.count(".") == 3 and ip.replace(".", "").isdigit()
                # Filter out localhost
                if looks_like_ipv4 and not ip.startswith("127."):
                    return ip
        return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def connect(self, address: str, timeout: int = 10) -> tuple[bool, str]:
        """
        Connect to a remote device via TCP/IP.

        Args:
            address: Device address in format "host:port"
                (e.g., "192.168.1.100:5555"). If no port is given, the
                default port 5555 is appended.
            timeout: Connection timeout in seconds.

        Returns:
            Tuple of (success, message).

        Note:
            The remote device must have TCP/IP debugging enabled.
        """
        address = address.strip()
        if not address:
            return False, "Connection error: empty device address"

        # Validate address format
        if ":" not in address:
            address = f"{address}:{self._DEFAULT_PORT}"  # Default HDC port

        try:
            result = self._run(["tconn", address], timeout=timeout)
        except subprocess.TimeoutExpired:
            return False, f"Connection timeout after {timeout}s"
        except Exception as e:
            return False, f"Connection error: {e}"

        return self._connect_result(self._combined_output(result), address)

    def disconnect(self, address: str | None = None) -> tuple[bool, str]:
        """
        Disconnect from a remote device.

        Args:
            address: Device address to disconnect. If None, disconnects all
                remote (``host:port``) devices currently listed.

        Returns:
            Tuple of (success, message). Success is False only when running
            HDC itself raised an exception.
        """
        try:
            if address:
                result = self._run(["tdisconn", address], timeout=5)
                return True, self._combined_output(result).strip() or "Disconnected"

            # HDC doesn't have a "disconnect all" command, so we need to
            # list and disconnect each
            for device in self.list_devices():
                if ":" in device.device_id:  # Remote device
                    self._run(["tdisconn", device.device_id], timeout=5)
            return True, "Disconnected all remote devices"
        except Exception as e:
            return False, f"Disconnect error: {e}"

    def list_devices(self) -> list[DeviceInfo]:
        """
        List all connected devices.

        Returns:
            List of DeviceInfo objects. The list is empty when no device is
            attached or when the HDC command could not be run.
        """
        try:
            result = self._run(["list", "targets"], timeout=5)
        except Exception as e:
            print(f"Error listing devices: {e}")
            return []

        # HDC doesn't provide detailed status in list command
        # We assume "device" status for every ID that appears, and classify
        # "host:port" style IDs as remote, everything else as USB.
        return [
            DeviceInfo(
                device_id=device_id,
                status="device",
                connection_type=(
                    ConnectionType.REMOTE if ":" in device_id else ConnectionType.USB
                ),
                model=None,
            )
            for device_id in self._parse_target_ids(result.stdout or "")
        ]

    def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
        """
        Get detailed information about a device.

        Args:
            device_id: Device ID. If None, uses first available device.

        Returns:
            DeviceInfo or None if not found.
        """
        devices = self.list_devices()
        if device_id is None:
            return devices[0] if devices else None
        return next((d for d in devices if d.device_id == device_id), None)

    def is_connected(self, device_id: str | None = None) -> bool:
        """
        Check if a device is connected.

        Args:
            device_id: Device ID to check. If None, checks if any device is
                connected.

        Returns:
            True if connected, False otherwise.
        """
        devices = self.list_devices()
        if device_id is None:
            return bool(devices)
        return any(d.device_id == device_id for d in devices)

    def enable_tcpip(
        self, port: int = 5555, device_id: str | None = None
    ) -> tuple[bool, str]:
        """
        Enable TCP/IP debugging on a USB-connected device.

        This allows subsequent wireless connections to the device.

        Args:
            port: TCP port for HDC (default: 5555).
            device_id: Device ID. If None, uses first available device.

        Returns:
            Tuple of (success, message).

        Note:
            The device must be connected via USB first.
            After this, you can disconnect USB and connect via WiFi.
        """
        try:
            result = self._run(
                [*self._target_args(device_id), "tmode", "port", str(port)],
                timeout=10,
            )
            output = self._combined_output(result)

            if result.returncode == 0 or "success" in output.lower():
                # Pause for the configured delay before reporting success.
                time.sleep(TIMING_CONFIG.connection.adb_restart_delay)
                return True, f"TCP/IP mode enabled on port {port}"
            return False, output.strip()
        except Exception as e:
            return False, f"Error enabling TCP/IP: {e}"

    def get_device_ip(self, device_id: str | None = None) -> str | None:
        """
        Get the IP address of a connected device.

        Runs ``ifconfig`` in the device shell and returns the first
        non-loopback IPv4 address it reports.

        Args:
            device_id: Device ID. If None, uses first available device.

        Returns:
            IP address string or None if not found.
        """
        try:
            result = self._run(
                [*self._target_args(device_id), "shell", "ifconfig"], timeout=5
            )
        except Exception as e:
            print(f"Error getting device IP: {e}")
            return None

        # Parse IP from ifconfig output
        return self._parse_ifconfig_ip(result.stdout or "")

    def restart_server(self) -> tuple[bool, str]:
        """
        Restart the HDC server.

        Returns:
            Tuple of (success, message). Success is False only when running
            HDC itself raised an exception.
        """
        try:
            # Kill server
            self._run(["kill"], timeout=5)

            time.sleep(TIMING_CONFIG.connection.server_restart_delay)

            # Start server (HDC auto-starts when running commands)
            self._run(["start", "-r"], timeout=5)
            return True, "HDC server restarted"
        except Exception as e:
            return False, f"Error restarting server: {e}"
def quick_connect(address: str) -> tuple[bool, str]:
    """
    Connect to a remote device using a throwaway default HDCConnection.

    Args:
        address: Device address (e.g., "192.168.1.100" or
            "192.168.1.100:5555").

    Returns:
        Tuple of (success, message), exactly as returned by
        HDCConnection.connect.
    """
    return HDCConnection().connect(address)
def list_devices() -> list[DeviceInfo]:
    """
    List connected devices using a throwaway default HDCConnection.

    Returns:
        List of DeviceInfo objects, exactly as returned by
        HDCConnection.list_devices.
    """
    return HDCConnection().list_devices()