draft init

This commit is contained in:
zRzRzRzRzRzRzR
2025-12-08 23:54:29 +08:00
commit 7e1785e08e
31 changed files with 3639 additions and 0 deletions

11
phone_agent/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""
Phone Agent - An AI-powered phone automation framework.
This package provides tools for automating Android phone interactions
using AI models for visual understanding and decision making.
"""
from phone_agent.agent import PhoneAgent
__version__ = "0.1.0"
__all__ = ["PhoneAgent"]

View File

@@ -0,0 +1,5 @@
"""Action handling module for Phone Agent."""
from phone_agent.actions.handler import ActionHandler, ActionResult
__all__ = ["ActionHandler", "ActionResult"]

View File

@@ -0,0 +1,307 @@
"""Action handler for processing AI model outputs."""
import time
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.adb import (
back,
clear_text,
detect_and_set_adb_keyboard,
double_tap,
home,
launch_app,
long_press,
restore_keyboard,
swipe,
tap,
type_text,
)
@dataclass
class ActionResult:
"""Result of an action execution."""
success: bool
should_finish: bool
message: str | None = None
requires_confirmation: bool = False
class ActionHandler:
    """
    Executes action dictionaries produced by the AI model against a device.

    Args:
        device_id: Optional ADB device ID for multi-device setups.
        confirmation_callback: Optional callback for sensitive action
            confirmation. Receives the message and should return True to
            proceed, False to cancel. Defaults to a console prompt.
        takeover_callback: Optional callback for takeover requests
            (login, captcha). Defaults to a console prompt.
    """

    def __init__(
        self,
        device_id: str | None = None,
        confirmation_callback: Callable[[str], bool] | None = None,
        takeover_callback: Callable[[str], None] | None = None,
    ):
        self.device_id = device_id
        self.confirmation_callback = confirmation_callback or self._default_confirmation
        self.takeover_callback = takeover_callback or self._default_takeover

    def execute(
        self, action: dict[str, Any], screen_width: int, screen_height: int
    ) -> ActionResult:
        """
        Execute an action from the AI model.

        Args:
            action: The action dictionary from the model. Its ``_metadata``
                key selects "finish" or "do"; for "do", the ``action`` key
                names the handler to run.
            screen_width: Current screen width in pixels.
            screen_height: Current screen height in pixels.

        Returns:
            ActionResult indicating success and whether to finish. Any
            exception raised by a handler is converted into a failed,
            non-finishing result rather than propagated.
        """
        action_type = action.get("_metadata")

        if action_type == "finish":
            return ActionResult(
                success=True, should_finish=True, message=action.get("message")
            )

        if action_type != "do":
            return ActionResult(
                success=False,
                should_finish=True,
                message=f"Unknown action type: {action_type}",
            )

        action_name = action.get("action")
        handler_method = self._get_handler(action_name)
        if handler_method is None:
            return ActionResult(
                success=False,
                should_finish=False,
                message=f"Unknown action: {action_name}",
            )

        try:
            return handler_method(action, screen_width, screen_height)
        except Exception as e:
            # Handler failures are reported back to the caller, not raised.
            return ActionResult(
                success=False, should_finish=False, message=f"Action failed: {e}"
            )

    def _get_handler(self, action_name: str | None) -> Callable | None:
        """Return the bound handler for ``action_name``, or None if unknown."""
        handlers = {
            "Launch": self._handle_launch,
            "Tap": self._handle_tap,
            "Type": self._handle_type,
            "Type_Name": self._handle_type,
            "Swipe": self._handle_swipe,
            "Back": self._handle_back,
            "Home": self._handle_home,
            "Double Tap": self._handle_double_tap,
            "Long Press": self._handle_long_press,
            "Wait": self._handle_wait,
            "Take_over": self._handle_takeover,
            "Note": self._handle_note,
            "Call_API": self._handle_call_api,
            "Interact": self._handle_interact,
        }
        return handlers.get(action_name)

    def _convert_relative_to_absolute(
        self, element: list[int], screen_width: int, screen_height: int
    ) -> tuple[int, int]:
        """Convert relative coordinates (0-1000) to absolute pixels."""
        x = int(element[0] / 1000 * screen_width)
        y = int(element[1] / 1000 * screen_height)
        return x, y

    def _handle_launch(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle app launch action (reads the ``app`` key)."""
        app_name = action.get("app")
        if not app_name:
            return ActionResult(False, False, "No app name specified")

        if launch_app(app_name, self.device_id):
            return ActionResult(True, False)
        return ActionResult(False, False, f"App not found: {app_name}")

    def _handle_tap(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle tap action (reads ``element`` and optional ``message``)."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)

        # A "message" key marks the tap as sensitive: ask before tapping.
        if "message" in action:
            if not self.confirmation_callback(action["message"]):
                return ActionResult(
                    success=False,
                    should_finish=True,
                    message="User cancelled sensitive operation",
                )

        tap(x, y, self.device_id)
        return ActionResult(True, False)

    def _handle_type(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle text input action (reads the ``text`` key)."""
        text = action.get("text", "")

        # Switch to ADB keyboard, remembering the IME that was active.
        original_ime = detect_and_set_adb_keyboard(self.device_id)
        try:
            time.sleep(1.0)

            # Clear existing text and type new text.
            clear_text(self.device_id)
            time.sleep(1.0)
            type_text(text, self.device_id)
            time.sleep(1.0)
        finally:
            # Always hand the original keyboard back, even if typing failed.
            restore_keyboard(original_ime, self.device_id)
        time.sleep(1.0)

        return ActionResult(True, False)

    def _handle_swipe(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle swipe action (reads the ``start`` and ``end`` keys)."""
        start = action.get("start")
        end = action.get("end")
        if not start or not end:
            return ActionResult(False, False, "Missing swipe coordinates")

        start_x, start_y = self._convert_relative_to_absolute(start, width, height)
        end_x, end_y = self._convert_relative_to_absolute(end, width, height)
        swipe(start_x, start_y, end_x, end_y, device_id=self.device_id)
        return ActionResult(True, False)

    def _handle_back(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle back button action."""
        back(self.device_id)
        return ActionResult(True, False)

    def _handle_home(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle home button action."""
        home(self.device_id)
        return ActionResult(True, False)

    def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle double tap action (reads the ``element`` key)."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        double_tap(x, y, self.device_id)
        return ActionResult(True, False)

    def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle long press action (reads the ``element`` key)."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        long_press(x, y, device_id=self.device_id)
        return ActionResult(True, False)

    def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle wait action (``duration`` such as "2 seconds" or a number)."""
        raw_duration = action.get("duration", "1 seconds")
        try:
            # str() lets a bare number (duration=2) through instead of
            # failing on a missing .replace attribute.
            duration = float(str(raw_duration).replace("seconds", "").strip())
        except ValueError:
            duration = 1.0
        # time.sleep rejects negative values; treat them as "no wait".
        time.sleep(max(0.0, duration))
        return ActionResult(True, False)

    def _handle_takeover(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle takeover request (login, captcha, etc.)."""
        message = action.get("message", "User intervention required")
        self.takeover_callback(message)
        return ActionResult(True, False)

    def _handle_note(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle note action (placeholder: reports success, does nothing)."""
        return ActionResult(True, False)

    def _handle_call_api(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle API call action (placeholder: reports success, does nothing)."""
        return ActionResult(True, False)

    def _handle_interact(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle interaction request by reporting that user input is needed."""
        return ActionResult(True, False, message="User interaction required")

    @staticmethod
    def _default_confirmation(message: str) -> bool:
        """Default confirmation callback using console input."""
        response = input(f"Sensitive operation: {message}\nConfirm? (Y/N): ")
        # Tolerate surrounding whitespace such as "y ".
        return response.strip().upper() == "Y"

    @staticmethod
    def _default_takeover(message: str) -> None:
        """Default takeover callback: block until the user presses Enter."""
        input(f"{message}\nPress Enter after completing manual operation...")
def parse_action(response: str) -> dict[str, Any]:
    """
    Parse action from model response.

    Two forms are recognised:

    * ``do(key=value, ...)`` - parsed with the ``ast`` module. Only keyword
      arguments whose values are Python literals are accepted, so the model
      output is never executed as code.
    * ``finish(message="...")`` - the message is taken as the text between
      the first quote and the closing ``")``, without unescaping, so inner
      unescaped quotes are tolerated.

    Args:
        response: Raw response string from the model.

    Returns:
        Parsed action dictionary, with ``_metadata`` set to "do" or "finish".

    Raises:
        ValueError: If the response cannot be parsed.
    """
    import ast  # function-scope import: the module does not import ast

    if not isinstance(response, str):
        raise ValueError(f"Failed to parse action: {response!r}")

    text = response.strip()

    if text.startswith("finish"):
        return {
            "_metadata": "finish",
            # Drop the opening quote and the trailing quote + parenthesis.
            "message": text.replace("finish(message=", "")[1:-2],
        }

    if not text.startswith("do"):
        raise ValueError(f"Failed to parse action: {text}")

    try:
        call = ast.parse(text, mode="eval").body
        is_do_call = (
            isinstance(call, ast.Call)
            and isinstance(call.func, ast.Name)
            and call.func.id == "do"
        )
        if not is_do_call:
            raise ValueError("expected a do(...) call")
        if call.args:
            raise ValueError("do() takes keyword arguments only")

        action: dict[str, Any] = {}
        for keyword in call.keywords:
            if keyword.arg is None:
                raise ValueError("'**' expansion is not supported")
            # literal_eval accepts only literals; names and calls are rejected.
            action[keyword.arg] = ast.literal_eval(keyword.value)
    except (SyntaxError, ValueError, TypeError, MemoryError, RecursionError) as exc:
        raise ValueError(f"Failed to parse action: {exc}") from exc

    action["_metadata"] = "do"
    return action
def do(**kwargs) -> dict[str, Any]:
    """Build a 'do' action: the keyword arguments tagged with ``_metadata="do"``."""
    kwargs.update(_metadata="do")
    return kwargs
def finish(**kwargs) -> dict[str, Any]:
    """Build a 'finish' action: the keyword arguments tagged with ``_metadata="finish"``."""
    kwargs.update(_metadata="finish")
    return kwargs

View File

@@ -0,0 +1,51 @@
"""ADB utilities for Android device interaction."""
from phone_agent.adb.connection import (
ADBConnection,
ConnectionType,
DeviceInfo,
list_devices,
quick_connect,
)
from phone_agent.adb.device import (
back,
double_tap,
get_current_app,
home,
launch_app,
long_press,
swipe,
tap,
)
from phone_agent.adb.input import (
clear_text,
detect_and_set_adb_keyboard,
restore_keyboard,
type_text,
)
from phone_agent.adb.screenshot import get_screenshot
__all__ = [
# Screenshot
"get_screenshot",
# Input
"type_text",
"clear_text",
"detect_and_set_adb_keyboard",
"restore_keyboard",
# Device control
"get_current_app",
"tap",
"swipe",
"back",
"home",
"double_tap",
"long_press",
"launch_app",
# Connection management
"ADBConnection",
"DeviceInfo",
"ConnectionType",
"quick_connect",
"list_devices",
]

View File

@@ -0,0 +1,350 @@
"""ADB connection management for local and remote devices."""
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class ConnectionType(Enum):
    """Transport through which an ADB device is reached."""

    USB = "usb"
    WIFI = "wifi"
    REMOTE = "remote"
@dataclass
class DeviceInfo:
    """Description of one device as reported by ``adb devices -l``."""

    # Serial number or "host:port" address.
    device_id: str
    # State column from the adb listing (e.g. "device").
    status: str
    # How the device is attached.
    connection_type: ConnectionType
    # Value of the "model:" field when the listing provides one.
    model: str | None = None
    # Defaults to None; nothing visible in this module fills it in.
    android_version: str | None = None
class ADBConnection:
    """
    Manages ADB connections to Android devices.

    Every operation shells out to the ``adb`` executable given at
    construction time. Supports USB, WiFi, and remote TCP/IP connections.

    Example:
        >>> conn = ADBConnection()
        >>> # Connect to remote device
        >>> conn.connect("192.168.1.100:5555")
        >>> # List devices
        >>> devices = conn.list_devices()
        >>> # Disconnect
        >>> conn.disconnect("192.168.1.100:5555")
    """

    def __init__(self, adb_path: str = "adb"):
        """
        Initialize ADB connection manager.

        Args:
            adb_path: Path to ADB executable.
        """
        self.adb_path = adb_path

    def connect(self, address: str, timeout: int = 10) -> tuple[bool, str]:
        """
        Connect to a remote device via TCP/IP.

        Args:
            address: Device address in format "host:port"
                (e.g., "192.168.1.100:5555"). Port 5555 is appended when
                the address contains no colon.
            timeout: Connection timeout in seconds.

        Returns:
            Tuple of (success, message).

        Note:
            The remote device must have TCP/IP debugging enabled
            (see ``enable_tcpip``).
        """
        if ":" not in address:
            address = f"{address}:5555"  # Default ADB port

        try:
            result = subprocess.run(
                [self.adb_path, "connect", address],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            output = result.stdout + result.stderr
            lowered = output.lower()

            # "already connected" contains "connected", so test it first;
            # otherwise this branch could never be reached.
            if "already connected" in lowered:
                return True, f"Already connected to {address}"
            if "connected" in lowered:
                return True, f"Connected to {address}"
            return False, output.strip()
        except subprocess.TimeoutExpired:
            return False, f"Connection timeout after {timeout}s"
        except Exception as e:
            return False, f"Connection error: {e}"

    def disconnect(self, address: str | None = None) -> tuple[bool, str]:
        """
        Disconnect from a remote device.

        Args:
            address: Device address to disconnect. If None, disconnects all.

        Returns:
            Tuple of (success, message). Success is True whenever the adb
            command could be run; its output is returned as the message.
        """
        try:
            cmd = [self.adb_path, "disconnect"]
            if address:
                cmd.append(address)

            result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
            output = result.stdout + result.stderr
            return True, output.strip() or "Disconnected"
        except Exception as e:
            return False, f"Disconnect error: {e}"

    def list_devices(self) -> list[DeviceInfo]:
        """
        List all connected devices.

        Returns:
            List of DeviceInfo objects parsed from ``adb devices -l``;
            an empty list if the command fails.
        """
        try:
            result = subprocess.run(
                [self.adb_path, "devices", "-l"],
                capture_output=True,
                text=True,
                timeout=5,
            )

            devices = []
            for line in result.stdout.strip().split("\n")[1:]:  # Skip header
                parts = line.split()
                if len(parts) < 2:
                    continue  # blank or malformed line

                device_id, status = parts[0], parts[1]

                # Ids of the form host:port are remote; everything else
                # (including emulators) is reported as USB.
                if ":" in device_id:
                    conn_type = ConnectionType.REMOTE
                else:
                    conn_type = ConnectionType.USB

                # Pick the first "model:<name>" field, if present.
                model = None
                for part in parts[2:]:
                    if part.startswith("model:"):
                        model = part.split(":", 1)[1]
                        break

                devices.append(
                    DeviceInfo(
                        device_id=device_id,
                        status=status,
                        connection_type=conn_type,
                        model=model,
                    )
                )
            return devices
        except Exception as e:
            print(f"Error listing devices: {e}")
            return []

    def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
        """
        Get detailed information about a device.

        Args:
            device_id: Device ID. If None, uses first available device.

        Returns:
            DeviceInfo or None if not found.
        """
        devices = self.list_devices()
        if not devices:
            return None
        if device_id is None:
            return devices[0]

        for device in devices:
            if device.device_id == device_id:
                return device
        return None

    def is_connected(self, device_id: str | None = None) -> bool:
        """
        Check if a device is connected.

        Args:
            device_id: Device ID to check. If None, checks if any device
                is connected.

        Returns:
            True if a matching device has status "device", False otherwise.
        """
        devices = self.list_devices()
        if not devices:
            return False
        if device_id is None:
            return any(d.status == "device" for d in devices)
        return any(d.device_id == device_id and d.status == "device" for d in devices)

    def enable_tcpip(
        self, port: int = 5555, device_id: str | None = None
    ) -> tuple[bool, str]:
        """
        Enable TCP/IP debugging on a USB-connected device.

        This allows subsequent wireless connections to the device.

        Args:
            port: TCP port for ADB (default: 5555).
            device_id: Device ID. If None, uses first available device.

        Returns:
            Tuple of (success, message).

        Note:
            The device must be connected via USB first.
            After this, you can disconnect USB and connect via WiFi.
        """
        try:
            cmd = [self.adb_path]
            if device_id:
                cmd.extend(["-s", device_id])
            cmd.extend(["tcpip", str(port)])

            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            output = result.stdout + result.stderr

            if "restarting" in output.lower() or result.returncode == 0:
                time.sleep(2)  # Wait for ADB to restart
                return True, f"TCP/IP mode enabled on port {port}"
            return False, output.strip()
        except Exception as e:
            return False, f"Error enabling TCP/IP: {e}"

    def get_device_ip(self, device_id: str | None = None) -> str | None:
        """
        Get the IP address of a connected device.

        Looks for a ``src`` address in ``ip route`` first, then falls back
        to the first ``inet`` entry of ``ip addr show wlan0``.

        Args:
            device_id: Device ID. If None, uses first available device.

        Returns:
            IP address string or None if not found.
        """
        try:
            base_cmd = [self.adb_path]
            if device_id:
                base_cmd.extend(["-s", device_id])

            result = subprocess.run(
                base_cmd + ["shell", "ip", "route"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            # The token following "src" on a route line is the local address.
            for line in result.stdout.split("\n"):
                if "src" in line:
                    parts = line.split()
                    for i, part in enumerate(parts):
                        if part == "src" and i + 1 < len(parts):
                            return parts[i + 1]

            # Alternative: try wlan0 interface. The command is rebuilt from
            # the base prefix so it is a well-formed adb invocation.
            result = subprocess.run(
                base_cmd + ["shell", "ip", "addr", "show", "wlan0"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            for line in result.stdout.split("\n"):
                if "inet " in line:
                    parts = line.strip().split()
                    if len(parts) >= 2:
                        # "inet 192.168.1.5/24 ..." -> drop the prefix length
                        return parts[1].split("/")[0]
            return None
        except Exception as e:
            print(f"Error getting device IP: {e}")
            return None

    def restart_server(self) -> tuple[bool, str]:
        """
        Restart the ADB server.

        Returns:
            Tuple of (success, message). Success is True whenever both adb
            commands could be run; their exit codes are not inspected.
        """
        try:
            # Kill server
            subprocess.run(
                [self.adb_path, "kill-server"], capture_output=True, timeout=5
            )
            time.sleep(1)
            # Start server
            subprocess.run(
                [self.adb_path, "start-server"], capture_output=True, timeout=5
            )
            return True, "ADB server restarted"
        except Exception as e:
            return False, f"Error restarting server: {e}"
def quick_connect(address: str) -> tuple[bool, str]:
    """
    Connect to a remote device using a throwaway ADBConnection.

    Args:
        address: Device address (e.g., "192.168.1.100" or "192.168.1.100:5555").

    Returns:
        Tuple of (success, message), as returned by ADBConnection.connect.
    """
    return ADBConnection().connect(address)
def list_devices() -> list[DeviceInfo]:
    """
    List connected devices using a throwaway ADBConnection.

    Returns:
        List of DeviceInfo objects, as returned by ADBConnection.list_devices.
    """
    return ADBConnection().list_devices()

224
phone_agent/adb/device.py Normal file
View File

@@ -0,0 +1,224 @@
"""Device control utilities for Android automation."""
import os
import subprocess
import time
from typing import List, Optional, Tuple
from phone_agent.config.apps import APP_PACKAGES
def get_current_app(device_id: str | None = None) -> str:
    """
    Report which known app currently has window focus.

    Runs ``dumpsys window`` on the device and scans the ``mCurrentFocus`` /
    ``mFocusedApp`` lines for a package listed in APP_PACKAGES.

    Args:
        device_id: Optional ADB device ID for multi-device setups.

    Returns:
        The app name if recognized, otherwise "System Home".
    """
    command = _get_adb_prefix(device_id) + ["shell", "dumpsys", "window"]
    dump = subprocess.run(command, capture_output=True, text=True).stdout

    focus_lines = (
        entry
        for entry in dump.split("\n")
        if "mCurrentFocus" in entry or "mFocusedApp" in entry
    )
    for focus_line in focus_lines:
        for app_name, package in APP_PACKAGES.items():
            if package in focus_line:
                return app_name

    return "System Home"
def tap(x: int, y: int, device_id: str | None = None, delay: float = 1.0) -> None:
    """
    Send a single tap to the device, then pause.

    Args:
        x: X coordinate.
        y: Y coordinate.
        device_id: Optional ADB device ID.
        delay: Delay in seconds after tap.
    """
    command = [*_get_adb_prefix(device_id), "shell", "input", "tap", str(x), str(y)]
    subprocess.run(command, capture_output=True)
    time.sleep(delay)
def double_tap(
    x: int, y: int, device_id: str | None = None, delay: float = 1.0
) -> None:
    """
    Send two taps at the same point, 0.1 s apart, then pause.

    Args:
        x: X coordinate.
        y: Y coordinate.
        device_id: Optional ADB device ID.
        delay: Delay in seconds after double tap.
    """
    tap_command = [*_get_adb_prefix(device_id), "shell", "input", "tap", str(x), str(y)]

    subprocess.run(tap_command, capture_output=True)
    time.sleep(0.1)  # short gap between the two taps
    subprocess.run(tap_command, capture_output=True)
    time.sleep(delay)
def long_press(
    x: int,
    y: int,
    duration_ms: int = 3000,
    device_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Hold a press at one point, then pause.

    Implemented as an ``input swipe`` whose start and end points coincide.

    Args:
        x: X coordinate.
        y: Y coordinate.
        duration_ms: Duration of press in milliseconds.
        device_id: Optional ADB device ID.
        delay: Delay in seconds after long press.
    """
    point = [str(x), str(y)]
    command = [
        *_get_adb_prefix(device_id),
        "shell",
        "input",
        "swipe",
        *point,
        *point,
        str(duration_ms),
    ]
    subprocess.run(command, capture_output=True)
    time.sleep(delay)
def swipe(
    start_x: int,
    start_y: int,
    end_x: int,
    end_y: int,
    duration_ms: int | None = None,
    device_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Drag from one point to another, then pause.

    Args:
        start_x: Starting X coordinate.
        start_y: Starting Y coordinate.
        end_x: Ending X coordinate.
        end_y: Ending Y coordinate.
        duration_ms: Duration of swipe in milliseconds. When None it is
            derived from the squared distance divided by 1000 and clamped
            to the range 1000-2000 ms.
        device_id: Optional ADB device ID.
        delay: Delay in seconds after swipe.
    """
    prefix = _get_adb_prefix(device_id)

    if duration_ms is None:
        squared_distance = (start_x - end_x) ** 2 + (start_y - end_y) ** 2
        auto_ms = int(squared_distance / 1000)
        # Keep the automatic duration within 1000-2000 ms.
        duration_ms = min(max(auto_ms, 1000), 2000)

    coordinates = [str(value) for value in (start_x, start_y, end_x, end_y)]
    command = [*prefix, "shell", "input", "swipe", *coordinates, str(duration_ms)]
    subprocess.run(command, capture_output=True)
    time.sleep(delay)
def back(device_id: str | None = None, delay: float = 1.0) -> None:
    """
    Send the back key (keyevent 4) to the device, then pause.

    Args:
        device_id: Optional ADB device ID.
        delay: Delay in seconds after pressing back.
    """
    command = [*_get_adb_prefix(device_id), "shell", "input", "keyevent", "4"]
    subprocess.run(command, capture_output=True)
    time.sleep(delay)
def home(device_id: str | None = None, delay: float = 1.0) -> None:
    """
    Send KEYCODE_HOME to the device, then pause.

    Args:
        device_id: Optional ADB device ID.
        delay: Delay in seconds after pressing home.
    """
    command = [*_get_adb_prefix(device_id), "shell", "input", "keyevent", "KEYCODE_HOME"]
    subprocess.run(command, capture_output=True)
    time.sleep(delay)
def launch_app(app_name: str, device_id: str | None = None, delay: float = 1.0) -> bool:
    """
    Start an app by its display name, then pause.

    The name is looked up in APP_PACKAGES and the package is started with
    ``monkey`` using the LAUNCHER category.

    Args:
        app_name: The app name (must be in APP_PACKAGES).
        device_id: Optional ADB device ID.
        delay: Delay in seconds after launching.

    Returns:
        True if the launch command was issued, False if the name is unknown.
    """
    try:
        package = APP_PACKAGES[app_name]
    except KeyError:
        return False

    monkey_args = [
        "shell",
        "monkey",
        "-p",
        package,
        "-c",
        "android.intent.category.LAUNCHER",
        "1",
    ]
    subprocess.run(_get_adb_prefix(device_id) + monkey_args, capture_output=True)
    time.sleep(delay)
    return True
def _get_adb_prefix(device_id: str | None) -> list:
"""Get ADB command prefix with optional device specifier."""
if device_id:
return ["adb", "-s", device_id]
return ["adb"]

109
phone_agent/adb/input.py Normal file
View File

@@ -0,0 +1,109 @@
"""Input utilities for Android device text input."""
import base64
import subprocess
from typing import Optional
def type_text(text: str, device_id: str | None = None) -> None:
    """
    Send text to the focused input field through an ADB_INPUT_B64 broadcast.

    The text is UTF-8 encoded and base64-wrapped before being broadcast.

    Args:
        text: The text to type.
        device_id: Optional ADB device ID for multi-device setups.

    Note:
        Requires ADB Keyboard to be installed on the device.
    """
    prefix = _get_adb_prefix(device_id)
    payload = base64.b64encode(text.encode("utf-8")).decode("utf-8")
    broadcast = [
        "shell",
        "am",
        "broadcast",
        "-a",
        "ADB_INPUT_B64",
        "--es",
        "msg",
        payload,
    ]
    subprocess.run(prefix + broadcast, capture_output=True, text=True)
def clear_text(device_id: str | None = None) -> None:
    """
    Empty the focused input field through an ADB_CLEAR_TEXT broadcast.

    Args:
        device_id: Optional ADB device ID for multi-device setups.
    """
    command = [*_get_adb_prefix(device_id), "shell", "am", "broadcast", "-a", "ADB_CLEAR_TEXT"]
    subprocess.run(command, capture_output=True, text=True)
def detect_and_set_adb_keyboard(device_id: str | None = None) -> str:
    """
    Make ADB Keyboard the active IME and report what was active before.

    Args:
        device_id: Optional ADB device ID for multi-device setups.

    Returns:
        The original keyboard IME identifier for later restoration
        (the stripped stdout+stderr of the settings query).
    """
    adb_ime = "com.android.adbkeyboard/.AdbIME"
    prefix = _get_adb_prefix(device_id)

    query = subprocess.run(
        prefix + ["shell", "settings", "get", "secure", "default_input_method"],
        capture_output=True,
        text=True,
    )
    previous_ime = (query.stdout + query.stderr).strip()

    # Only switch when ADB Keyboard is not already the default IME.
    if adb_ime not in previous_ime:
        subprocess.run(
            prefix + ["shell", "ime", "set", "com.android.adbkeyboard/.AdbIME"],
            capture_output=True,
            text=True,
        )

    # Warm up the keyboard with an empty input.
    type_text("", device_id)
    return previous_ime
def restore_keyboard(ime: str, device_id: str | None = None) -> None:
    """
    Switch the device back to the given IME via ``ime set``.

    Args:
        ime: The IME identifier to restore.
        device_id: Optional ADB device ID for multi-device setups.
    """
    command = [*_get_adb_prefix(device_id), "shell", "ime", "set", ime]
    subprocess.run(command, capture_output=True, text=True)
def _get_adb_prefix(device_id: str | None) -> list:
"""Get ADB command prefix with optional device specifier."""
if device_id:
return ["adb", "-s", device_id]
return ["adb"]

View File

@@ -0,0 +1,108 @@
"""Screenshot utilities for capturing Android device screen."""
import base64
import os
import subprocess
import uuid
from dataclasses import dataclass
from io import BytesIO
from typing import Tuple
from PIL import Image
@dataclass
class Screenshot:
    """A captured screen image together with its dimensions."""

    # PNG image bytes, base64-encoded as text.
    base64_data: str
    # Image width in pixels.
    width: int
    # Image height in pixels.
    height: int
    # True when the capture was refused and a fallback image was substituted.
    is_sensitive: bool = False
def get_screenshot(device_id: str | None = None, timeout: int = 10) -> Screenshot:
    """
    Capture a screenshot from the connected Android device.

    The image is written on the device with ``screencap``, pulled to a
    uniquely named file in the local temp directory, re-encoded as PNG and
    returned as base64. The local temp file is always removed.

    Args:
        device_id: Optional ADB device ID for multi-device setups.
        timeout: Timeout in seconds for the on-device capture command
            (the subsequent pull uses a fixed 5 second timeout).

    Returns:
        Screenshot object containing base64 data and dimensions.

    Note:
        If the capture command reports failure (e.g., on sensitive screens
        like payment pages), a black fallback image is returned with
        is_sensitive=True. Any other error yields the same fallback image
        with is_sensitive=False.
    """
    import tempfile  # function-scope import: the module does not import tempfile

    # Use the platform temp directory rather than a hard-coded /tmp.
    temp_path = os.path.join(tempfile.gettempdir(), f"screenshot_{uuid.uuid4()}.png")
    adb_prefix = _get_adb_prefix(device_id)

    try:
        # Execute screenshot command
        result = subprocess.run(
            adb_prefix + ["shell", "screencap", "-p", "/sdcard/tmp.png"],
            capture_output=True,
            text=True,
            timeout=timeout,
        )

        # Check for screenshot failure (sensitive screen)
        output = result.stdout + result.stderr
        if "Status: -1" in output or "Failed" in output:
            return _create_fallback_screenshot(is_sensitive=True)

        # Pull screenshot to local temp path
        subprocess.run(
            adb_prefix + ["pull", "/sdcard/tmp.png", temp_path],
            capture_output=True,
            text=True,
            timeout=5,
        )

        if not os.path.exists(temp_path):
            return _create_fallback_screenshot(is_sensitive=False)

        # Read and encode image; the context manager closes the file handle
        # before the temp file is deleted below.
        with Image.open(temp_path) as img:
            width, height = img.size
            buffered = BytesIO()
            img.save(buffered, format="PNG")
        base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")

        return Screenshot(
            base64_data=base64_data, width=width, height=height, is_sensitive=False
        )
    except Exception as e:
        print(f"Screenshot error: {e}")
        return _create_fallback_screenshot(is_sensitive=False)
    finally:
        # Best-effort cleanup on every path, including decode failures.
        try:
            os.remove(temp_path)
        except OSError:
            pass
def _get_adb_prefix(device_id: str | None) -> list:
"""Get ADB command prefix with optional device specifier."""
if device_id:
return ["adb", "-s", device_id]
return ["adb"]
def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot:
    """Build an all-black 1080x2400 PNG Screenshot carrying the given flag."""
    size = (1080, 2400)
    png_buffer = BytesIO()
    Image.new("RGB", size, color="black").save(png_buffer, format="PNG")
    encoded = base64.b64encode(png_buffer.getvalue()).decode("utf-8")
    return Screenshot(
        base64_data=encoded,
        width=size[0],
        height=size[1],
        is_sensitive=is_sensitive,
    )

244
phone_agent/agent.py Normal file
View File

@@ -0,0 +1,244 @@
"""Main PhoneAgent class for orchestrating phone automation."""
import json
import traceback
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.actions import ActionHandler
from phone_agent.actions.handler import do, finish, parse_action
from phone_agent.adb import get_current_app, get_screenshot
from phone_agent.config import SYSTEM_PROMPT
from phone_agent.model import ModelClient, ModelConfig
from phone_agent.model.client import MessageBuilder
@dataclass
class AgentConfig:
    """Settings that control how PhoneAgent runs."""

    # Upper bound on the number of steps taken by PhoneAgent.run.
    max_steps: int = 100
    # ADB device to drive; None leaves device selection to adb.
    device_id: str | None = None
    # System message placed at the start of each conversation.
    system_prompt: str = SYSTEM_PROMPT
    # When True, reasoning, actions and tracebacks are printed.
    verbose: bool = True
@dataclass
class StepResult:
"""Result of a single agent step."""
success: bool
finished: bool
action: dict[str, Any] | None
thinking: str
message: str | None = None
class PhoneAgent:
"""
AI-powered agent for automating Android phone interactions.
The agent uses a vision-language model to understand screen content
and decide on actions to complete user tasks.
Args:
model_config: Configuration for the AI model.
agent_config: Configuration for the agent behavior.
confirmation_callback: Optional callback for sensitive action confirmation.
takeover_callback: Optional callback for takeover requests.
Example:
>>> from phone_agent import PhoneAgent
>>> from phone_agent.model import ModelConfig
>>>
>>> model_config = ModelConfig(base_url="http://localhost:8000/v1")
>>> agent = PhoneAgent(model_config)
>>> agent.run("Open WeChat and send a message to John")
"""
def __init__(
self,
model_config: ModelConfig | None = None,
agent_config: AgentConfig | None = None,
confirmation_callback: Callable[[str], bool] | None = None,
takeover_callback: Callable[[str], None] | None = None,
):
self.model_config = model_config or ModelConfig()
self.agent_config = agent_config or AgentConfig()
self.model_client = ModelClient(self.model_config)
self.action_handler = ActionHandler(
device_id=self.agent_config.device_id,
confirmation_callback=confirmation_callback,
takeover_callback=takeover_callback,
)
self._context: list[dict[str, Any]] = []
self._step_count = 0
def run(self, task: str) -> str:
"""
Run the agent to complete a task.
Args:
task: Natural language description of the task.
Returns:
Final message from the agent.
"""
self._context = []
self._step_count = 0
# First step with user prompt
result = self._execute_step(task, is_first=True)
if result.finished:
return result.message or "Task completed"
# Continue until finished or max steps reached
while self._step_count < self.agent_config.max_steps:
result = self._execute_step(is_first=False)
if result.finished:
return result.message or "Task completed"
return "Max steps reached"
def step(self, task: str | None = None) -> StepResult:
"""
Execute a single step of the agent.
Useful for manual control or debugging.
Args:
task: Task description (only needed for first step).
Returns:
StepResult with step details.
"""
is_first = len(self._context) == 0
if is_first and not task:
raise ValueError("Task is required for the first step")
return self._execute_step(task, is_first)
def reset(self) -> None:
"""Reset the agent state for a new task."""
self._context = []
self._step_count = 0
def _execute_step(
self, user_prompt: str | None = None, is_first: bool = False
) -> StepResult:
"""Execute a single step of the agent loop."""
self._step_count += 1
# Capture current screen state
screenshot = get_screenshot(self.agent_config.device_id)
current_app = get_current_app(self.agent_config.device_id)
# Build messages
if is_first:
self._context.append(
MessageBuilder.create_system_message(self.agent_config.system_prompt)
)
screen_info = MessageBuilder.build_screen_info(current_app)
text_content = f"{user_prompt}\n\n{screen_info}"
self._context.append(
MessageBuilder.create_user_message(
text=text_content, image_base64=screenshot.base64_data
)
)
else:
screen_info = MessageBuilder.build_screen_info(current_app)
text_content = f"** Screen Info **\n\n{screen_info}"
self._context.append(
MessageBuilder.create_user_message(
text=text_content, image_base64=screenshot.base64_data
)
)
# Get model response
try:
response = self.model_client.request(self._context)
except Exception as e:
if self.agent_config.verbose:
traceback.print_exc()
return StepResult(
success=False,
finished=True,
action=None,
thinking="",
message=f"Model error: {e}",
)
# Parse action from response
try:
action = parse_action(response.action)
except ValueError:
if self.agent_config.verbose:
traceback.print_exc()
action = finish(message=response.action)
if self.agent_config.verbose:
# 打印思考过程
print("\n" + "=" * 50)
print("💭 思考过程:")
print("-" * 50)
print(response.thinking)
print("-" * 50)
print("🎯 执行动作:")
print(json.dumps(action, ensure_ascii=False, indent=2))
print("=" * 50 + "\n")
# Remove image from context to save space
self._context[-1] = MessageBuilder.remove_images_from_message(self._context[-1])
# Execute action
try:
result = self.action_handler.execute(
action, screenshot.width, screenshot.height
)
except Exception as e:
if self.agent_config.verbose:
traceback.print_exc()
result = self.action_handler.execute(
finish(message=str(e)), screenshot.width, screenshot.height
)
# Add assistant response to context
self._context.append(
MessageBuilder.create_assistant_message(
f"<think>{response.thinking}</think><answer>{response.action}</answer>"
)
)
# Check if finished
finished = action.get("_metadata") == "finish" or result.should_finish
if finished and self.agent_config.verbose:
print("\n" + "🎉 " + "=" * 48)
print(f"✅ 任务完成: {result.message or action.get('message', '完成')}")
print("=" * 50 + "\n")
return StepResult(
success=result.success,
finished=finished,
action=action,
thinking=response.thinking,
message=result.message or action.get("message"),
)
@property
def context(self) -> list[dict[str, Any]]:
    """
    Return a snapshot of the conversation context.

    Returns:
        A new list containing the same message dictionaries as the
        internal context. The copy is shallow: adding or removing
        entries on the returned list does not touch the internal list,
        but the message dictionaries themselves are shared.
    """
    return list(self._context)
@property
def step_count(self) -> int:
    """
    Return the value of the internal step counter.

    Returns:
        The current value of ``self._step_count``.
    """
    current = self._step_count
    return current

View File

@@ -0,0 +1,6 @@
"""Configuration module for Phone Agent."""
from phone_agent.config.apps import APP_PACKAGES
from phone_agent.config.prompts import SYSTEM_PROMPT
__all__ = ["APP_PACKAGES", "SYSTEM_PROMPT"]

111
phone_agent/config/apps.py Normal file
View File

@@ -0,0 +1,111 @@
"""App name to package name mapping for supported applications."""
# Lookup table from an app's display name to its Android package name.
#
# Several display names may deliberately point at the same package (for
# example a short and a long form of the same app name); these act as aliases.
# Keys are matched exactly by get_package_name(), so spelling and letter case
# matter. Because get_app_name() scans this mapping in insertion order, the
# first key listed for a package is the name returned by a reverse lookup.
APP_PACKAGES: dict[str, str] = {
# Social & Messaging
"微信": "com.tencent.mm",
"QQ": "com.tencent.mobileqq",
"微博": "com.sina.weibo",
# E-commerce
"淘宝": "com.taobao.taobao",
"京东": "com.jingdong.app.mall",
"拼多多": "com.xunmeng.pinduoduo",
"淘宝闪购": "com.taobao.taobao",
"京东秒送": "com.jingdong.app.mall",
# Lifestyle & Social
"小红书": "com.xingin.xhs",
"豆瓣": "com.douban.frodo",
"知乎": "com.zhihu.android",
# Maps & Navigation
"高德地图": "com.autonavi.minimap",
"百度地图": "com.baidu.BaiduMap",
# Food & Services
"美团": "com.sankuai.meituan",
"大众点评": "com.dianping.v1",
"饿了么": "me.ele",
"肯德基": "com.yek.android.kfc.activitys",
# Travel
"携程": "ctrip.android.view",
"铁路12306": "com.MobileTicket",
"12306": "com.MobileTicket",
"去哪儿": "com.Qunar",
"去哪儿旅行": "com.Qunar",
"滴滴出行": "com.sdu.didi.psnger",
# Video & Entertainment
"bilibili": "tv.danmaku.bili",
"抖音": "com.ss.android.ugc.aweme",
"快手": "com.smile.gifmaker",
"腾讯视频": "com.tencent.qqlive",
"爱奇艺": "com.qiyi.video",
"优酷视频": "com.youku.phone",
"芒果TV": "com.hunantv.imgo.activity",
"红果短剧": "com.phoenix.read",
# Music & Audio
"网易云音乐": "com.netease.cloudmusic",
"QQ音乐": "com.tencent.qqmusic",
"汽水音乐": "com.luna.music",
"喜马拉雅": "com.ximalaya.ting.android",
# Reading
"番茄小说": "com.dragon.read",
"番茄免费小说": "com.dragon.read",
"七猫免费小说": "com.kmxs.reader",
# Productivity
"飞书": "com.ss.android.lark",
"QQ邮箱": "com.tencent.androidqqmail",
# AI & Tools
"豆包": "com.larus.nova",
# Health & Fitness
"keep": "com.gotokeep.keep",
"美柚": "com.lingan.seeyou",
# News & Information
"腾讯新闻": "com.tencent.news",
"今日头条": "com.ss.android.article.news",
# Real Estate
"贝壳找房": "com.lianjia.beike",
"安居客": "com.anjuke.android.app",
# Finance
"同花顺": "com.hexin.plat.android",
# Games
"星穹铁道": "com.miHoYo.hkrpg",
"崩坏:星穹铁道": "com.miHoYo.hkrpg",
"恋与深空": "com.papegames.lysk.cn",
}
def get_package_name(app_name: str) -> str | None:
    """
    Look up the Android package registered for an app display name.

    Args:
        app_name: Display name, spelled exactly as it is keyed in
            APP_PACKAGES.

    Returns:
        The package name mapped to ``app_name``, or None when the name is
        not a key of APP_PACKAGES.
    """
    try:
        return APP_PACKAGES[app_name]
    except KeyError:
        return None
def get_app_name(package_name: str) -> str | None:
    """
    Reverse lookup: find a display name for an Android package name.

    Args:
        package_name: The Android package name to search for.

    Returns:
        The first display name (in APP_PACKAGES insertion order) whose
        package equals ``package_name``, or None when no entry matches.
    """
    matching_names = (
        display_name
        for display_name, pkg in APP_PACKAGES.items()
        if pkg == package_name
    )
    return next(matching_names, None)
def list_supported_apps() -> list[str]:
    """
    Enumerate every app display name known to APP_PACKAGES.

    Returns:
        A new list with the keys of APP_PACKAGES in insertion order.
    """
    return [*APP_PACKAGES]

View File

@@ -0,0 +1,70 @@
"""System prompts for the AI agent."""
from datetime import datetime
# Local date/time captured once, when this module is imported.
today = datetime.today()
# Chinese-style date with all three unit suffixes (year 年, month 月, day 日),
# e.g. "2025年12月08日". Without the trailing "日" the day number is left
# dangling and the date reads as truncated.
formatted_date = today.strftime("%Y年%m月%d日")
SYSTEM_PROMPT = "今天的日期是: " + formatted_date + '''
你是一个智能体分析专家,可以根据操作历史和当前状态图执行一系列操作来完成任务。
你必须严格按照要求输出以下格式:
<think>{think}</think>
<answer>{action}</answer>
其中:
- {think} 是对你为什么选择这个操作的简短推理说明。
- {action} 是本次执行的具体操作指令,必须严格遵循下方定义的指令格式。
操作指令及其作用如下:
- do(action="Launch", app="xxx")
Launch是启动目标app的操作这比通过主屏幕导航更快。此操作完成后您将自动收到结果状态的截图。
- do(action="Tap", element=[x,y])
Tap是点击操作点击屏幕上的特定点。可用此操作点击按钮、选择项目、从主屏幕打开应用程序或与任何可点击的用户界面元素进行交互。坐标系统从左上角 (0,0) 开始到右下角999,999)结束。此操作完成后,您将自动收到结果状态的截图。
- do(action="Tap", element=[x,y], message="重要操作")
基本功能同Tap点击涉及财产、支付、隐私等敏感按钮时触发。
- do(action="Type", text="xxx")
Type是输入操作在当前聚焦的输入框中输入文本。使用此操作前请确保输入框已被聚焦先点击它。输入的文本将像使用键盘输入一样输入。重要提示手机可能正在使用 ADB 键盘,该键盘不会像普通键盘那样占用屏幕空间。要确认键盘已激活,请查看屏幕底部是否显示 'ADB Keyboard {ON}' 类似的文本,或者检查输入框是否处于激活/高亮状态。不要仅仅依赖视觉上的键盘显示。自动清除文本:当你使用输入操作时,输入框中现有的任何文本(包括占位符文本和实际输入)都会在输入新文本前自动清除。你无需在输入前手动清除文本——直接使用输入操作输入所需文本即可。操作完成后,你将自动收到结果状态的截图。
- do(action="Type_Name", text="xxx")
Type_Name是输入人名的操作基本功能同Type。
- do(action="Interact")
Interact是当有多个满足条件的选项时而触发的交互操作询问用户如何选择。
- do(action="Swipe", start=[x1,y1], end=[x2,y2])
Swipe是滑动操作通过从起始坐标拖动到结束坐标来执行滑动手势。可用于滚动内容、在屏幕之间导航、下拉通知栏以及项目栏或进行基于手势的导航。坐标系统从左上角 (0,0) 开始到右下角999,999)结束。滑动持续时间会自动调整以实现自然的移动。此操作完成后,您将自动收到结果状态的截图。
- do(action="Note", message="True")
记录当前页面内容以便后续总结。
- do(action="Call_API", instruction="xxx")
总结或评论当前页面或已记录的内容。
- do(action="Long Press", element=[x,y])
Long Pres是长按操作在屏幕上的特定点长按指定时间。可用于触发上下文菜单、选择文本或激活长按交互。坐标系统从左上角 (0,0) 开始到右下角999,999)结束。此操作完成后,您将自动收到结果状态的屏幕截图。
- do(action="Double Tap", element=[x,y])
Double Tap在屏幕上的特定点快速连续点按两次。使用此操作可以激活双击交互如缩放、选择文本或打开项目。坐标系统从左上角 (0,0) 开始到右下角999,999)结束。此操作完成后,您将自动收到结果状态的截图。
- do(action="Take_over", message="xxx")
Take_over是接管操作表示在登录和验证阶段需要用户协助。
- do(action="Back")
导航返回到上一个屏幕或关闭当前对话框。相当于按下 Android 的返回按钮。使用此操作可以从更深的屏幕返回、关闭弹出窗口或退出当前上下文。此操作完成后,您将自动收到结果状态的截图。
- do(action="Home")
Home是回到系统桌面的操作相当于按下 Android 主屏幕按钮。使用此操作可退出当前应用并返回启动器,或从已知状态启动新任务。此操作完成后,您将自动收到结果状态的截图。
- do(action="Wait", duration="x seconds")
等待页面加载x为需要等待多少秒。
- finish(message="xxx")
finish是结束任务的操作表示准确完整完成任务message是终止信息。
必须遵循的规则:
1. 在执行任何操作前先检查当前app是否是目标app如果不是先执行 Launch。
2. 如果进入到了无关页面,先执行 Back。如果执行Back后页面没有变化请点击页面左上角的返回键进行返回或者右上角的X号关闭。
3. 如果页面未加载出内容,最多连续 Wait 三次,否则执行 Back重新进入。
4. 如果页面显示网络问题,需要重新加载,请点击重新加载。
5. 如果当前页面找不到目标联系人、商品、店铺等信息,可以尝试 Swipe 滑动查找。
6. 遇到价格区间、时间区间等筛选条件,如果没有完全符合的,可以放宽要求。
7. 在做小红书总结类任务时一定要筛选图文笔记。
8. 购物车全选后再点击全选可以把状态设为全不选,在做购物车任务时,如果购物车里已经有商品被选中时,你需要点击全选后再点击取消全选,再去找需要购买或者删除的商品。
9. 在做外卖任务时,如果相应店铺购物车里已经有其他商品你需要先把购物车清空再去购买用户指定的外卖。
10. 在做点外卖任务时,如果用户需要点多个外卖,请尽量在同一店铺进行购买,如果无法找到可以下单,并说明某个商品未找到。
11. 请严格遵循用户意图执行任务用户的特殊要求可以执行多次搜索滑动查找。比如i用户要求点一杯咖啡要咸的你可以直接搜索咸咖啡或者搜索咖啡后滑动查找咸的咖啡比如海盐咖啡。ii用户要找到XX群发一条消息你可以先搜索XX群找不到结果后""字去掉搜索XX重试。iii用户要找到宠物友好的餐厅你可以搜索餐厅找到筛选找到设施选择可带宠物或者直接搜索可带宠物必要时可以使用AI搜索。
12. 在选择日期时,如果原滑动方向与预期日期越来越远,请向反方向滑动查找。
13. 执行任务过程中如果有多个可选择的项目栏,请逐个查找每个项目栏,直到完成任务,一定不要在同一项目栏多次查找,从而陷入死循环。
14. 在执行下一步操作前请一定要检查上一步的操作是否生效如果点击没生效可能因为app反应较慢请先稍微等待一下如果还是不生效请调整一下点击位置重试如果仍然不生效请跳过这一步继续任务并在finish message说明点击不生效。
15. 在执行任务中如果遇到滑动不生效的情况请调整一下起始点位置增大滑动距离重试如果还是不生效有可能是已经滑到底了请继续向反方向滑动直到顶部或底部如果仍然没有符合要求的结果请跳过这一步继续任务并在finish message说明但没找到要求的项目。
16. 在做游戏任务时如果在战斗页面如果有自动战斗一定要开启自动战斗,如果多轮历史状态相似要检查自动战斗是否开启。
17. 如果没有合适的搜索结果,可能是因为搜索页面不对,请返回到搜索页面的上一级尝试重新搜索,如果尝试三次返回上一级搜索后仍然没有符合要求的结果,执行 finish(message="原因")。
18. 在结束任务前请一定要仔细检查任务是否完整准确的完成,如果出现错选、漏选、多选的情况,请返回之前的步骤进行纠正。
'''

View File

@@ -0,0 +1,5 @@
"""Model client module for AI inference."""
from phone_agent.model.client import ModelClient, ModelConfig
__all__ = ["ModelClient", "ModelConfig"]

168
phone_agent/model/client.py Normal file
View File

@@ -0,0 +1,168 @@
"""Model client for AI inference using OpenAI-compatible API."""
import json
from dataclasses import dataclass, field
from typing import Any
from openai import OpenAI
@dataclass
class ModelConfig:
    """
    Settings used to reach and sample from the AI model.

    ``base_url`` and ``api_key`` are used to construct the API client; the
    remaining fields are forwarded with every chat-completion request.
    """

    # Endpoint root of the OpenAI-compatible server (defaults to localhost).
    base_url: str = "http://localhost:8000/v1"
    # API key handed to the client; "EMPTY" is presumably a placeholder for
    # servers that do not check keys -- confirm against the deployment.
    api_key: str = "EMPTY"
    # Model identifier sent as the ``model`` request parameter.
    model_name: str = "autoglm-phone-9b"
    # Sampling / length parameters forwarded unchanged to the API.
    max_tokens: int = 3000
    temperature: float = 0.0
    top_p: float = 0.85
    frequency_penalty: float = 0.2
    # Extra request-body fields; a factory is used so that every instance
    # gets its own dictionary rather than sharing one mutable default.
    extra_body: dict[str, Any] = field(
        default_factory=lambda: dict(skip_special_tokens=False)
    )
@dataclass
class ModelResponse:
    """Parsed result of a single model reply."""

    # Reasoning text: whatever preceded the <answer> tag, think-tags removed.
    thinking: str
    # Action text: the content of the <answer> section, or the whole reply
    # when no <answer> tag was present.
    action: str
    # The reply text before it was split into thinking and action.
    raw_content: str
class ModelClient:
    """
    Client for interacting with OpenAI-compatible vision-language models.

    Args:
        config: Model configuration. A default ModelConfig is used when
            omitted.
    """

    def __init__(self, config: ModelConfig | None = None):
        self.config = config or ModelConfig()
        self.client = OpenAI(base_url=self.config.base_url, api_key=self.config.api_key)

    def request(self, messages: list[dict[str, Any]]) -> ModelResponse:
        """
        Send a chat-completion request and split the reply into parts.

        Args:
            messages: List of message dictionaries in OpenAI format.

        Returns:
            ModelResponse containing the thinking text, the action text and
            the raw reply. A reply without text content yields empty strings.

        Raises:
            Nothing is raised by this method itself; any exception coming
            from the underlying client call propagates to the caller.
        """
        response = self.client.chat.completions.create(
            messages=messages,
            model=self.config.model_name,
            max_tokens=self.config.max_tokens,
            temperature=self.config.temperature,
            top_p=self.config.top_p,
            frequency_penalty=self.config.frequency_penalty,
            extra_body=self.config.extra_body,
        )

        # The API declares message content as optional; normalise a missing
        # value to "" so parsing (and raw_content: str) never sees None.
        raw_content = response.choices[0].message.content or ""

        # Parse thinking and action from response
        thinking, action = self._parse_response(raw_content)

        return ModelResponse(thinking=thinking, action=action, raw_content=raw_content)

    def _parse_response(self, content: str | None) -> tuple[str, str]:
        """
        Parse the model response into thinking and action parts.

        Args:
            content: Raw response content. None or an empty string is
                treated as an empty reply.

        Returns:
            Tuple of (thinking, action). When the reply has no ``<answer>``
            tag, thinking is empty and the whole reply is the action.
        """
        if not content:
            # Guard against None: `"<answer>" not in None` raises TypeError.
            return "", ""

        if "<answer>" not in content:
            return "", content

        # Split on the first <answer> only; everything before it is thinking.
        parts = content.split("<answer>", 1)
        thinking = parts[0].replace("<think>", "").replace("</think>", "").strip()
        action = parts[1].replace("</answer>", "").strip()

        return thinking, action
class MessageBuilder:
"""Helper class for building conversation messages."""
@staticmethod
def create_system_message(content: str) -> dict[str, Any]:
"""Create a system message."""
return {"role": "system", "content": content}
@staticmethod
def create_user_message(
text: str, image_base64: str | None = None
) -> dict[str, Any]:
"""
Create a user message with optional image.
Args:
text: Text content.
image_base64: Optional base64-encoded image.
Returns:
Message dictionary.
"""
content = []
if image_base64:
content.append(
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_base64}"},
}
)
content.append({"type": "text", "text": text})
return {"role": "user", "content": content}
@staticmethod
def create_assistant_message(content: str) -> dict[str, Any]:
"""Create an assistant message."""
return {"role": "assistant", "content": content}
@staticmethod
def remove_images_from_message(message: dict[str, Any]) -> dict[str, Any]:
"""
Remove image content from a message to save context space.
Args:
message: Message dictionary.
Returns:
Message with images removed.
"""
if isinstance(message.get("content"), list):
message["content"] = [
item for item in message["content"] if item.get("type") == "text"
]
return message
@staticmethod
def build_screen_info(current_app: str, **extra_info) -> str:
"""
Build screen info string for the model.
Args:
current_app: Current app name.
**extra_info: Additional info to include.
Returns:
JSON string with screen info.
"""
info = {"current_app": current_app, **extra_info}
return json.dumps(info, ensure_ascii=False)