Features: - Web Dashboard: FastAPI-based dashboard with Vue.js frontend - Multi-device support (ADB, HDC, iOS) - Real-time WebSocket updates for task progress - Device management with status tracking - Task queue with execution controls (start/stop/re-execute) - Detailed task information display (thinking, actions, completion messages) - Screenshot viewing per device - LAN deployment support with configurable CORS - Callback Hooks: Interrupt and modify task execution - step_callback: Called after each step with StepResult - before_action_callback: Called before executing action - Support for task interruption and dynamic task switching - Example scripts demonstrating callback usage - Configuration: Environment-based configuration - .env file support for all settings - .env.example template with documentation - Model API configuration (base URL, model name, API key) - Dashboard configuration (host, port, CORS, device type) - Phone agent configuration (delays, max steps, language) Technical improvements: - Fixed forward reference issue with StepResult - Added package exports for callback types and configs - Enhanced dependencies with FastAPI, WebSocket support - Thread-safe task execution with device locking - Async WebSocket broadcasting from sync thread pool Co-Authored-By: Claude <noreply@anthropic.com>
416 lines
14 KiB
Python
416 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Phone Agent Callback Hooks Examples / Phone Agent 回调钩子示例
|
||
|
||
Demonstrates how to use callback hooks to control task execution.
|
||
演示如何使用回调钩子来控制任务执行。
|
||
|
||
Features / 功能:
|
||
- Interrupt tasks during execution / 在执行过程中中断任务
|
||
- Switch to new tasks dynamically / 动态切换到新任务
|
||
- Modify actions before execution / 在执行前修改操作
|
||
- Collect execution statistics / 收集执行统计信息
|
||
|
||
Configuration / 配置:
|
||
This script loads settings from .env file (if present).
|
||
本脚本会从 .env 文件加载配置(如果存在)。
|
||
"""
|
||
|
||
import threading
|
||
import time
|
||
from collections import defaultdict
|
||
from dotenv import load_dotenv
|
||
|
||
# Load .env file for configuration
|
||
# 加载 .env 配置文件
|
||
load_dotenv()
|
||
|
||
from phone_agent import PhoneAgent
|
||
from phone_agent.agent import AgentConfig, StepResult
|
||
from phone_agent.config import get_messages
|
||
from phone_agent.model import ModelConfig
|
||
|
||
|
||
def _create_agent_with_callbacks(
    lang: str = "cn",
    step_callback=None,
    before_action_callback=None,
) -> PhoneAgent:
    """Build a PhoneAgent whose settings come from environment variables.

    Environment variables (value used when the variable is unset):

    * ``PHONE_AGENT_BASE_URL``  -> ``ModelConfig.base_url`` ("http://localhost:8000/v1")
    * ``PHONE_AGENT_MODEL``     -> ``ModelConfig.model_name`` ("autoglm-phone-9b")
    * ``PHONE_AGENT_API_KEY``   -> ``ModelConfig.api_key`` ("EMPTY")
    * ``PHONE_AGENT_MAX_STEPS`` -> ``AgentConfig.max_steps`` ("100", parsed with ``int``)
    * ``PHONE_AGENT_DEVICE_ID`` -> ``AgentConfig.device_id`` (``None``)

    Args:
        lang: Language code stored in ``AgentConfig.lang``.
        step_callback: Passed through unchanged to ``AgentConfig``.
        before_action_callback: Passed through unchanged to ``AgentConfig``.

    Returns:
        A new ``PhoneAgent`` built from the two config objects.

    Raises:
        ValueError: if ``PHONE_AGENT_MAX_STEPS`` is not an integer string.
    """
    import os

    env = os.getenv

    # (config attribute, environment variable, fallback value)
    model_kwargs = {
        attr: env(var, fallback)
        for attr, var, fallback in (
            ("base_url", "PHONE_AGENT_BASE_URL", "http://localhost:8000/v1"),
            ("model_name", "PHONE_AGENT_MODEL", "autoglm-phone-9b"),
            ("api_key", "PHONE_AGENT_API_KEY", "EMPTY"),
        )
    }
    model_config = ModelConfig(**model_kwargs)

    agent_kwargs = {
        "max_steps": int(env("PHONE_AGENT_MAX_STEPS", "100")),
        "device_id": env("PHONE_AGENT_DEVICE_ID"),
        "lang": lang,
        "step_callback": step_callback,
        "before_action_callback": before_action_callback,
    }
    agent_config = AgentConfig(**agent_kwargs)

    return PhoneAgent(model_config=model_config, agent_config=agent_config)
|
||
|
||
|
||
# ============================================================================
# Example 1: Interrupt after max steps / 示例1:达到最大步数后中断
# ============================================================================


def example_interrupt_after_steps(lang: str = "cn"):
    """Stop the task once a fixed number of steps has been executed.

    ``step_callback`` records every step and returns ``"stop"`` as soon as
    ``result.step_count`` reaches ``max_allowed_steps``. The closing summary
    distinguishes a callback-requested stop from a run that ended without
    the callback ever asking to stop.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    max_allowed_steps = 5
    steps_taken: list[int] = []
    interrupted = False

    def step_callback(result: StepResult) -> str | None:
        """Log each step and request a stop once the step budget is used up."""
        nonlocal interrupted
        steps_taken.append(result.step_count)
        print(f"[Callback] Step {result.step_count}: {result.action}")

        # Interrupt after reaching max steps
        if result.step_count >= max_allowed_steps:
            interrupted = True
            return "stop"
        return None

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    print(f"\n{'='*50}")
    print(f"Task will be interrupted after {max_allowed_steps} steps")
    print(f"{'='*50}\n")

    agent.run("打开小红书浏览美食内容")

    print(f"\n{'='*50}")
    if interrupted:
        print(f"Task interrupted after {len(steps_taken)} steps")
    else:
        # The run ended before the callback asked for a stop, so reporting an
        # "interruption" here would be misleading.
        print(f"Task ended after {len(steps_taken)} steps (limit not reached)")
    print(f"{'='*50}")
|
||
|
||
|
||
# ============================================================================
# Example 2: Switch tasks dynamically / 示例2:动态切换任务
# ============================================================================


def example_switch_tasks(lang: str = "cn"):
    """Chain several tasks: each time one finishes, hand the agent the next.

    The first queue entry is passed to ``agent.run``. Afterwards, every step
    that reports ``finished`` makes the callback return the next remaining
    entry. Once the queue is exhausted, the callback returns ``None``.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    msgs = get_messages(lang)  # currently unused in this example

    task_queue = ["打开微信", "打开淘宝", "打开美团"]
    first_task, *follow_ups = task_queue
    remaining = iter(follow_ups)

    def step_callback(result: StepResult) -> str | None:
        """Return the next queued task once the current one has finished."""
        if not result.finished:
            return None
        upcoming = next(remaining, None)
        if upcoming is None:
            return None
        print(f"\n[Callback] Switching to task: {upcoming}\n")
        return upcoming

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Task queue: {' -> '.join(task_queue)}")
    print(f"{banner}\n")

    # Kick off the chain with the head of the queue.
    result = agent.run(first_task)
|
||
|
||
|
||
# ============================================================================
# Example 3: Interactive task control / 示例3:交互式任务控制
# ============================================================================


def example_interactive_control(lang: str = "cn"):
    """Let the user stop or redirect the running task from the terminal.

    While ``agent.run`` executes, a background thread polls stdin for commands:

    * ``s``        -- request a stop (the callback returns ``"stop"``)
    * ``n:<task>`` -- switch to ``<task>`` (the callback returns the new task)

    Commands only take effect between steps, when ``step_callback`` runs.

    Polling ``sys.stdin`` with ``select`` only works where ``select`` accepts
    file descriptors (POSIX; on Windows it is limited to sockets). If the call
    fails, the listener prints the error and exits, and the task keeps running
    without interactive control.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    import sys
    from select import select

    class TaskController:
        """Bridge between the stdin listener thread and the agent's callback."""

        def __init__(self):
            # new_task / should_stop are written by the listener thread and
            # read/cleared by step_callback, so access goes through this lock.
            self._lock = threading.Lock()
            self._done = threading.Event()
            self.new_task = None
            self.should_stop = False
            self._thread = threading.Thread(target=self._input_listener, daemon=True)
            self._thread.start()

        def close(self):
            """Ask the listener thread to exit at its next poll."""
            self._done.set()

        def _input_listener(self):
            """Poll stdin in the background and record user commands."""
            print("\n[Interactive Control]")
            print(" Enter 's' + Enter to stop")
            print(" Enter 'n:<task>' + Enter to switch to new task")
            print("-" * 40)

            while not self._done.is_set():
                try:
                    # Short timeout so the loop notices close() promptly.
                    ready, _, _ = select([sys.stdin], [], [], 0.1)
                    if not ready:
                        continue
                    line = sys.stdin.readline()
                except (OSError, EOFError, ValueError) as exc:
                    print(f"\n[Control] Input listener stopped: {exc}\n")
                    break

                if line == "":
                    # EOF: readline() returns "" instead of raising, and stdin
                    # stays "readable", so without this the loop would spin.
                    break

                cmd = line.strip()
                if cmd.lower() == "s":
                    with self._lock:
                        self.should_stop = True
                    print("\n[Control] STOP requested\n")
                elif cmd.lower().startswith("n:"):
                    task = cmd[2:].strip()
                    if not task:
                        print("\n[Control] Ignored empty task\n")
                        continue
                    with self._lock:
                        self.new_task = task
                    print(f"\n[Control] New task: {task}\n")

        def step_callback(self, result: StepResult) -> str | None:
            """Turn pending user commands into the callback's return value."""
            with self._lock:
                if self.should_stop:
                    return "stop"
                if self.new_task:
                    # Take-and-clear atomically so a command typed right now
                    # is not lost between the read and the reset.
                    task, self.new_task = self.new_task, None
                    return task
            return None

    controller = TaskController()
    agent = _create_agent_with_callbacks(lang=lang, step_callback=controller.step_callback)

    print(f"\n{'='*50}")
    print("Interactive Control Mode")
    print(f"{'='*50}\n")

    try:
        agent.run("打开抖音浏览视频")
    finally:
        controller.close()
|
||
|
||
|
||
# ============================================================================
# Example 4: Modify actions before execution / 示例4:在执行前修改操作
# ============================================================================


def example_modify_actions(lang: str = "cn"):
    """Inspect every action before execution and rewrite selected ones.

    ``before_action_callback`` receives each action dict and logs a copy of
    it. It then returns either ``None`` (run the action unchanged) or a
    replacement dict.

    In this example:

    * ``Launch`` actions whose app name mentions a game are replaced with a
      ``Note`` action.
    * ``Tap`` actions are printed but left unchanged.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    action_log: list[dict] = []

    def before_action_callback(action: dict) -> dict | None:
        """Log the action; return a replacement dict, or None to keep it."""
        # Shallow copy so later top-level changes to `action` don't alter the log.
        action_log.append(action.copy())

        # Example: prevent launching game apps (Latin text matched case-insensitively).
        if action.get("action") == "Launch":
            app = action.get("app", "")
            if "游戏" in app or "game" in app.casefold():
                print(f"[Callback] Blocked launching: {app}")
                # Replace with a safe action.
                # NOTE(review): confirm the action-dict schema the executor
                # expects (e.g. whether extra metadata keys are required).
                return {"action": "Note", "message": f"Skipped {app}"}

        # Example: report Tap targets (the action itself is not changed).
        if action.get("action") == "Tap":
            print(f"[Callback] Tap at {action.get('element')}")

        # Return None to proceed with original action
        return None

    agent = _create_agent_with_callbacks(lang=lang, before_action_callback=before_action_callback)

    print(f"\n{'='*50}")
    print("Actions will be logged and filtered")
    print(f"{'='*50}\n")

    agent.run("打开微信查看朋友圈")

    print(f"\n{'='*50}")
    # The log holds every action the callback saw, including replaced ones.
    print(f"Total actions intercepted: {len(action_log)}")
    print(f"{'='*50}")
|
||
|
||
|
||
# ============================================================================
# Example 5: Collect statistics / 示例5:收集统计信息
# ============================================================================


def example_collect_statistics(lang: str = "cn"):
    """Gather per-run counters through both callback hooks and print a summary.

    * ``before_action_callback`` tallies how often each action type occurs.
      Actions without an ``"action"`` key are counted as ``"Unknown"``.
    * ``step_callback`` keeps the latest step number and counts steps whose
      ``success`` flag is falsy.

    Neither callback alters execution: both always return ``None``.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    msgs = get_messages(lang)  # currently unused in this example

    action_counts = defaultdict(int)
    stats = {"actions": action_counts, "total_steps": 0, "errors": 0}

    def before_action_callback(action: dict) -> dict | None:
        """Bump the counter for this action's type; never modifies the action."""
        action_counts[action.get("action", "Unknown")] += 1
        return None

    def step_callback(result: StepResult) -> str | None:
        """Record the latest step number and count unsuccessful steps."""
        stats["total_steps"] = result.step_count
        if not result.success:
            stats["errors"] += 1
        return None

    agent = _create_agent_with_callbacks(
        lang=lang,
        before_action_callback=before_action_callback,
        step_callback=step_callback,
    )

    rule = "=" * 50
    print(f"\n{rule}")
    print("Collecting execution statistics...")
    print(f"{rule}\n")

    result = agent.run("打开美团搜索附近的餐厅")

    print(f"\n{rule}")
    print("Execution Statistics:")
    print(f" Total steps: {stats['total_steps']}")
    print(f" Errors: {stats['errors']}")
    print("\n Action breakdown:")
    for kind, tally in sorted(action_counts.items()):
        print(f" {kind}: {tally}")
    print(rule)
|
||
|
||
|
||
# ============================================================================
# Example 6: Task queue with priority / 示例6:优先级任务队列
# ============================================================================


def example_priority_task_queue(lang: str = "cn"):
    """Run queued follow-up tasks in priority order (lower number runs first).

    The agent starts with an initial task. Each time a step reports
    ``finished``, the callback takes the highest-priority pending task and
    returns it, so the agent switches to that task. Tasks with equal priority
    are served in the order they were queued.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    from dataclasses import dataclass, field
    from itertools import count
    from queue import Empty, PriorityQueue

    @dataclass(order=True)
    class PrioritizedTask:
        priority: int
        # Tie-breaker: the underlying heap is not stable, so without an
        # insertion counter equal-priority tasks could come out in any order.
        seq: int
        task: str = field(compare=False)

    sequence = count()
    task_queue = PriorityQueue()

    def enqueue(priority: int, task: str) -> None:
        """Add a task, stamping it with the next insertion number."""
        task_queue.put(PrioritizedTask(priority=priority, seq=next(sequence), task=task))

    enqueue(2, "打开微信发消息")
    enqueue(1, "打开淘宝")  # Higher priority (smallest number is served first)
    enqueue(3, "打开美团")

    def step_callback(result: StepResult) -> str | None:
        """After the current task finishes, switch to the best pending task."""
        if not result.finished:
            return None
        try:
            # get_nowait avoids the empty()-then-get() check-then-act pattern.
            next_task = task_queue.get_nowait()
        except Empty:
            return None
        print(f"\n[Callback] Switching to priority {next_task.priority} task: {next_task.task}\n")
        return next_task.task

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    print(f"\n{'='*50}")
    print("Priority Task Queue Mode")
    print(f"{'='*50}\n")

    # Start with initial task
    agent.run("打开抖音")
|
||
|
||
|
||
# ============================================================================
# Example 7: Timeout control / 示例7:超时控制
# ============================================================================


def example_timeout_control(lang: str = "cn", timeout_seconds: float = 30):
    """Stop the task once it has been running longer than ``timeout_seconds``.

    Elapsed time is measured with ``time.monotonic()`` from the moment
    ``agent.run`` is called, so wall-clock adjustments cannot skew it. The
    check only happens after each step, so the total run time can exceed the
    timeout by up to one step's duration.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
        timeout_seconds: Budget in seconds before the callback returns "stop".
    """
    started_at = 0.0  # reset right before agent.run() below

    def step_callback(result: StepResult) -> str | None:
        """Request a stop once the elapsed time exceeds the budget."""
        elapsed = time.monotonic() - started_at
        print(f"[Callback] Elapsed time: {elapsed:.1f}s")

        if elapsed > timeout_seconds:
            print(f"\n[Callback] Task timeout after {elapsed:.1f}s\n")
            return "stop"
        return None

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    print(f"\n{'='*50}")
    print(f"Task will timeout after {timeout_seconds} seconds")
    print(f"{'='*50}\n")

    # Start the clock here so agent construction and banner output don't
    # count against the task's budget.
    started_at = time.monotonic()
    agent.run("打开小红书浏览内容")
|
||
|
||
|
||
# ============================================================================
# Example 8: Conditional action blocking / 示例8:条件性操作拦截
# ============================================================================


def example_conditional_blocking(lang: str = "cn"):
    """Refuse to launch any app whose name contains a blocked keyword.

    Matching is by substring, so a blocked keyword also matches longer app
    names that contain it. A blocked ``Launch`` is replaced by a ``finish``
    action carrying an explanatory message. All other actions pass through.

    Args:
        lang: Language code forwarded to the agent ("cn" or "en").
    """
    msgs = get_messages(lang)  # currently unused in this example

    blocked_apps = ["王者荣耀", "和平精英", "抖音"]  # Apps to block

    def before_action_callback(action: dict) -> dict | None:
        """Swap a blocked app launch for a finish action; leave others alone."""
        if action.get("action") != "Launch":
            return None

        app = action.get("app", "")
        if not any(keyword in app for keyword in blocked_apps):
            return None

        print(f"[Callback] BLOCKED app launch: {app}")
        # Replacement meant to end the task.
        # NOTE(review): confirm the executor treats this dict as a finish.
        return {"action": "finish", "message": f"App {app} is blocked"}

    agent = _create_agent_with_callbacks(lang=lang, before_action_callback=before_action_callback)

    separator = "=" * 50
    print(f"\n{separator}")
    print(f"Blocked apps: {', '.join(blocked_apps)}")
    print(f"{separator}\n")

    result = agent.run("打开抖音刷视频")
|
||
|
||
|
||
# ============================================================================
# Main / 主程序
# ============================================================================


if __name__ == "__main__":
    import argparse

    # Single source of truth for the CLI: the --example choices and help text
    # are derived from this table, so adding an example needs only one edit.
    examples = {
        1: ("Interrupt after max steps", example_interrupt_after_steps),
        2: ("Switch tasks dynamically", example_switch_tasks),
        3: ("Interactive task control", example_interactive_control),
        4: ("Modify actions before execution", example_modify_actions),
        5: ("Collect statistics", example_collect_statistics),
        6: ("Priority task queue", example_priority_task_queue),
        7: ("Timeout control", example_timeout_control),
        8: ("Conditional action blocking", example_conditional_blocking),
    }

    parser = argparse.ArgumentParser(description="Phone Agent Callback Hooks Examples")
    parser.add_argument(
        "--example", "-e",
        type=int,
        default=1,
        choices=sorted(examples),
        help=f"Example number to run ({min(examples)}-{max(examples)})",
    )
    parser.add_argument(
        "--lang",
        type=str,
        default="cn",
        choices=["cn", "en"],
        help="Language for UI messages (cn=Chinese, en=English)",
    )
    args = parser.parse_args()

    name, func = examples[args.example]
    print(f"\n{'='*50}")
    print(f"Example {args.example}: {name}")
    print(f"{'='*50}")

    try:
        func(args.lang)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to abandon a long-running example; exit
        # with the conventional status instead of dumping a traceback.
        print(f"\n✗ Example {args.example} interrupted\n")
        raise SystemExit(130)

    print(f"\n✓ Example {args.example} completed\n")
|