Files
Open-AutoGLM/examples/callback_hooks.py
let5sne.win10 3552df23d6 Add Web Dashboard with multi-device control and callback hooks
Features:
- Web Dashboard: FastAPI-based dashboard with Vue.js frontend
  - Multi-device support (ADB, HDC, iOS)
  - Real-time WebSocket updates for task progress
  - Device management with status tracking
  - Task queue with execution controls (start/stop/re-execute)
  - Detailed task information display (thinking, actions, completion messages)
  - Screenshot viewing per device
  - LAN deployment support with configurable CORS

- Callback Hooks: Interrupt and modify task execution
  - step_callback: Called after each step with StepResult
  - before_action_callback: Called before executing action
  - Support for task interruption and dynamic task switching
  - Example scripts demonstrating callback usage

- Configuration: Environment-based configuration
  - .env file support for all settings
  - .env.example template with documentation
  - Model API configuration (base URL, model name, API key)
  - Dashboard configuration (host, port, CORS, device type)
  - Phone agent configuration (delays, max steps, language)

Technical improvements:
- Fixed forward reference issue with StepResult
- Added package exports for callback types and configs
- Enhanced dependencies with FastAPI, WebSocket support
- Thread-safe task execution with device locking
- Async WebSocket broadcasting from sync thread pool

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-09 02:20:06 +08:00

416 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Phone Agent Callback Hooks Examples / Phone Agent 回调钩子示例
Demonstrates how to use callback hooks to control task execution.
演示如何使用回调钩子来控制任务执行。
Features / 功能:
- Interrupt tasks during execution / 在执行过程中中断任务
- Switch to new tasks dynamically / 动态切换到新任务
- Modify actions before execution / 在执行前修改操作
- Collect execution statistics / 收集执行统计信息
Configuration / 配置:
This script loads settings from .env file (if present).
本脚本会从 .env 文件加载配置(如果存在)。
"""
import threading
import time
from collections import defaultdict
from dotenv import load_dotenv
# Load .env file for configuration
# 加载 .env 配置文件
load_dotenv()
from phone_agent import PhoneAgent
from phone_agent.agent import AgentConfig, StepResult
from phone_agent.config import get_messages
from phone_agent.model import ModelConfig
def _create_agent_with_callbacks(
    lang: str = "cn",
    step_callback=None,
    before_action_callback=None,
) -> PhoneAgent:
    """Build a PhoneAgent whose settings come from environment variables.

    Args:
        lang: Language code forwarded to ``AgentConfig``.
        step_callback: Optional hook forwarded to ``AgentConfig``.
        before_action_callback: Optional hook forwarded to ``AgentConfig``.

    Returns:
        A ``PhoneAgent`` constructed from the model and agent configs.
    """
    from os import getenv

    # Model endpoint settings; each one falls back to a local default.
    endpoint = getenv("PHONE_AGENT_BASE_URL", "http://localhost:8000/v1")
    model = getenv("PHONE_AGENT_MODEL", "autoglm-phone-9b")
    key = getenv("PHONE_AGENT_API_KEY", "EMPTY")
    model_settings = ModelConfig(base_url=endpoint, model_name=model, api_key=key)

    # Agent settings; the device id is None when the variable is not set.
    step_limit = int(getenv("PHONE_AGENT_MAX_STEPS", "100"))
    device = getenv("PHONE_AGENT_DEVICE_ID")
    agent_settings = AgentConfig(
        max_steps=step_limit,
        device_id=device,
        lang=lang,
        step_callback=step_callback,
        before_action_callback=before_action_callback,
    )

    return PhoneAgent(model_config=model_settings, agent_config=agent_settings)
# ============================================================================
# Example 1: Interrupt after max steps / 示例1: 达到最大步数后中断
# ============================================================================
def example_interrupt_after_steps(lang: str = "cn", max_allowed_steps: int = 5):
    """Run a task and request a stop once a step budget is reached.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
        max_allowed_steps: Step count at which the step callback returns
            ``"stop"``. Defaults to 5, the value that used to be hard-coded.
    """
    # NOTE(review): the looked-up messages are not used in this example; the
    # call is kept from the original in case the helper has side effects.
    msgs = get_messages(lang)

    callbacks_seen = 0
    stop_requested = False

    def step_callback(result: StepResult) -> str | None:
        """Log the step and return ``"stop"`` once the budget is reached."""
        nonlocal callbacks_seen, stop_requested
        callbacks_seen += 1
        print(f"[Callback] Step {result.step_count}: {result.action}")

        if result.step_count >= max_allowed_steps:
            stop_requested = True
            return "stop"
        return None

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Task will be interrupted after {max_allowed_steps} steps")
    print(f"{banner}\n")

    agent.run("打开小红书浏览美食内容")

    # Only report an interruption when the callback actually asked for one;
    # the task may end on its own before the budget is reached.
    outcome = "interrupted" if stop_requested else "finished"
    print(f"\n{banner}")
    print(f"Task {outcome} after {callbacks_seen} steps")
    print(banner)
# ============================================================================
# Example 2: Switch tasks dynamically / 示例2: 动态切换任务
# ============================================================================
def example_switch_tasks(lang: str = "cn"):
    """Chain several tasks by returning the next one from the step callback.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
    """
    # NOTE(review): the looked-up messages are not used in this example.
    msgs = get_messages(lang)

    task_queue = ["打开微信", "打开淘宝", "打开美团"]
    cursor = 0  # position of the next follow-up task inside task_queue

    def step_callback(result: StepResult) -> str | None:
        """Return the next queued task when the current one has finished."""
        nonlocal cursor
        if not (result.finished and cursor < len(task_queue)):
            return None

        upcoming = task_queue[cursor]
        cursor += 1
        print(f"\n[Callback] Switching to task: {upcoming}\n")
        return upcoming

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Task queue: {' -> '.join(task_queue)}")
    print(f"{banner}\n")

    # The first task is removed from the list before the run starts, so the
    # cursor (starting at 0) indexes only the remaining follow-up tasks.
    opening_task = task_queue.pop(0)
    agent.run(opening_task)
# ============================================================================
# Example 3: Interactive task control / 示例3: 交互式任务控制
# ============================================================================
def example_interactive_control(lang: str = "cn"):
    """Control a running task from commands typed on standard input.

    A daemon thread reads lines from stdin: ``s`` requests a stop and
    ``n:<task>`` queues a replacement task. The step callback reports those
    requests back after each step.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
    """
    import sys

    # NOTE(review): the looked-up messages are not used in this example; the
    # call is kept from the original in case the helper has side effects.
    msgs = get_messages(lang)

    class TaskController:
        """Holds the stop / new-task flags set by the stdin listener thread."""

        def __init__(self):
            self.new_task = None
            self.should_stop = False
            self._thread = threading.Thread(target=self._input_listener, daemon=True)
            self._thread.start()

        def _input_listener(self):
            """Read commands from stdin until EOF or a read error."""
            print("\n[Interactive Control]")
            print("  Enter 's' + Enter to stop")
            print("  Enter 'n:<task>' + Enter to switch to new task")
            print("-" * 40)

            stdin = sys.stdin
            if stdin is None:
                # No console attached; nothing to listen to.
                return

            while True:
                # A blocking readline is used instead of select(): select()
                # on stdin is not supported on Windows, and this thread is a
                # daemon, so blocking here never prevents interpreter exit.
                try:
                    line = stdin.readline()
                except (OSError, ValueError):
                    # ValueError is raised when stdin has been closed.
                    break
                if not line:
                    # readline() returns "" at EOF; stop instead of spinning.
                    break

                cmd = line.strip()
                lowered = cmd.lower()
                if lowered == "s":
                    self.should_stop = True
                    print("\n[Control] STOP requested\n")
                elif lowered.startswith("n:"):
                    task = cmd[2:].strip()
                    if not task:
                        # An empty task would never be picked up by the
                        # callback, so do not announce it as a new task.
                        print("\n[Control] Ignored empty task\n")
                        continue
                    self.new_task = task
                    print(f"\n[Control] New task: {self.new_task}\n")

        def step_callback(self, result: StepResult) -> str | None:
            """Return ``"stop"``, a pending new task, or None after each step."""
            if self.should_stop:
                return "stop"
            if self.new_task:
                task = self.new_task
                self.new_task = None
                return task
            return None

    controller = TaskController()
    agent = _create_agent_with_callbacks(lang=lang, step_callback=controller.step_callback)

    banner = "=" * 50
    print(f"\n{banner}")
    print("Interactive Control Mode")
    print(f"{banner}\n")

    agent.run("打开抖音浏览视频")
# ============================================================================
# Example 4: Modify actions before execution / 示例4: 在执行前修改操作
# ============================================================================
def example_modify_actions(lang: str = "cn"):
    """Log every action and replace launches of game-like apps.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
    """
    # NOTE(review): the looked-up messages are not used in this example.
    msgs = get_messages(lang)

    action_log = []
    game_markers = ("游戏", "Game")

    def before_action_callback(action: dict) -> dict | None:
        """Record the action; return a replacement dict or None to keep it."""
        action_log.append(action.copy())
        kind = action.get("action")

        if kind == "Launch":
            app = action.get("app", "")
            # Launches whose app name contains a game marker are swapped for
            # a "Note" action instead of being executed.
            if any(marker in app for marker in game_markers):
                print(f"[Callback] Blocked launching: {app}")
                return {"action": "Note", "message": f"Skipped {app}"}
        elif kind == "Tap":
            # Taps are only logged here; the action itself is left unchanged.
            print(f"[Callback] Tap at {action.get('element')}")

        # None means: proceed with the original action.
        return None

    agent = _create_agent_with_callbacks(
        lang=lang,
        before_action_callback=before_action_callback,
    )

    banner = "=" * 50
    print(f"\n{banner}")
    print("Actions will be logged and filtered")
    print(f"{banner}\n")

    agent.run("打开微信查看朋友圈")

    # The log holds every action seen by the callback, including replaced ones.
    seen = len(action_log)
    print(f"\n{banner}")
    print(f"Total actions executed: {seen}")
    print(banner)
# ============================================================================
# Example 5: Collect statistics / 示例5: 收集统计信息
# ============================================================================
def example_collect_statistics(lang: str = "cn"):
    """Run a task while tallying action types, step count and failed steps.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
    """
    # NOTE(review): the looked-up messages are not used in this example.
    msgs = get_messages(lang)

    stats = {
        "actions": defaultdict(int),
        "total_steps": 0,
        "errors": 0,
    }

    def before_action_callback(action: dict) -> dict | None:
        """Increment the counter for this action's type; never modifies it."""
        kind = action.get("action", "Unknown")
        per_type = stats["actions"]
        per_type[kind] = per_type[kind] + 1
        return None

    def step_callback(result: StepResult) -> str | None:
        """Record the latest step count and count unsuccessful steps."""
        stats["total_steps"] = result.step_count
        if result.success:
            return None
        stats["errors"] += 1
        return None

    agent = _create_agent_with_callbacks(
        lang=lang,
        before_action_callback=before_action_callback,
        step_callback=step_callback,
    )

    banner = "=" * 50
    print(f"\n{banner}")
    print("Collecting execution statistics...")
    print(f"{banner}\n")

    agent.run("打开美团搜索附近的餐厅")

    print(f"\n{banner}")
    print("Execution Statistics:")
    print(f" Total steps: {stats['total_steps']}")
    print(f" Errors: {stats['errors']}")
    print("\n Action breakdown:")
    # Action types are listed in sorted order of their names.
    for kind, occurrences in sorted(stats["actions"].items()):
        print(f" {kind}: {occurrences}")
    print(banner)
# ============================================================================
# Example 6: Task queue with priority / 示例6: 优先级任务队列
# ============================================================================
def example_priority_task_queue(lang: str = "cn"):
    """Feed follow-up tasks to the agent in priority order.

    Args:
        lang: Language code passed to the agent config.
    """
    from dataclasses import dataclass, field
    from queue import PriorityQueue

    @dataclass(order=True)
    class PrioritizedTask:
        # Only the priority takes part in ordering; the task text is excluded.
        priority: int
        task: str = field(compare=False)

    # PriorityQueue hands out the smallest entry first, so priority 1 is
    # served before 2 and 3.
    task_queue = PriorityQueue()
    for level, text in ((2, "打开微信发消息"), (1, "打开淘宝"), (3, "打开美团")):
        task_queue.put(PrioritizedTask(priority=level, task=text))

    def step_callback(result: StepResult) -> str | None:
        """Return the next queued task once the current one has finished."""
        if not result.finished or task_queue.empty():
            return None

        entry = task_queue.get()
        print(f"\n[Callback] Switching to priority {entry.priority} task: {entry.task}\n")
        return entry.task

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    banner = "=" * 50
    print(f"\n{banner}")
    print("Priority Task Queue Mode")
    print(f"{banner}\n")

    # The initial task is not part of the queue.
    agent.run("打开抖音")
# ============================================================================
# Example 7: Timeout control / 示例7: 超时控制
# ============================================================================
def example_timeout_control(lang: str = "cn", timeout_seconds: float = 30):
    """Request a stop when the task has been running longer than a time limit.

    The limit is only checked inside the step callback, i.e. after each step.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
        timeout_seconds: Elapsed time in seconds after which the step callback
            returns ``"stop"``. Defaults to 30, the value that used to be
            hard-coded.
    """
    # NOTE(review): the looked-up messages are not used in this example; the
    # call is kept from the original in case the helper has side effects.
    msgs = get_messages(lang)

    # A monotonic clock is used so that wall-clock adjustments (NTP, DST,
    # manual changes) cannot shorten or lengthen the measured interval.
    started_at = time.monotonic()

    def step_callback(result: StepResult) -> str | None:
        """Print the elapsed time and return ``"stop"`` once over the limit."""
        elapsed = time.monotonic() - started_at
        print(f"[Callback] Elapsed time: {elapsed:.1f}s")

        if elapsed > timeout_seconds:
            print(f"\n[Callback] Task timeout after {elapsed:.1f}s\n")
            return "stop"
        return None

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Task will timeout after {timeout_seconds} seconds")
    print(f"{banner}\n")

    agent.run("打开小红书浏览内容")
# ============================================================================
# Example 8: Conditional action blocking / 示例8: 条件性操作拦截
# ============================================================================
def example_conditional_blocking(lang: str = "cn"):
    """Replace launches of block-listed apps with a finish action.

    Args:
        lang: Language code passed to ``get_messages`` and to the agent config.
    """
    # NOTE(review): the looked-up messages are not used in this example.
    msgs = get_messages(lang)

    blocked_apps = ["王者荣耀", "和平精英", "抖音"]

    def before_action_callback(action: dict) -> dict | None:
        """Return a finish action for blocked launches, otherwise None."""
        if action.get("action") != "Launch":
            return None

        app = action.get("app", "")
        # Substring match: any block-listed name contained in the app name.
        if not any(name in app for name in blocked_apps):
            return None

        print(f"[Callback] BLOCKED app launch: {app}")
        # NOTE(review): presumably the agent treats this dict as a request to
        # end the task; confirm the expected action shape against the agent.
        return {"action": "finish", "message": f"App {app} is blocked"}

    agent = _create_agent_with_callbacks(
        lang=lang,
        before_action_callback=before_action_callback,
    )

    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Blocked apps: {', '.join(blocked_apps)}")
    print(f"{banner}\n")

    agent.run("打开抖音刷视频")
# ============================================================================
# Main / 主程序
# ============================================================================
if __name__ == "__main__":
    import argparse

    # Registry of runnable examples: number -> (title, entry point).
    examples = {
        1: ("Interrupt after max steps", example_interrupt_after_steps),
        2: ("Switch tasks dynamically", example_switch_tasks),
        3: ("Interactive task control", example_interactive_control),
        4: ("Modify actions before execution", example_modify_actions),
        5: ("Collect statistics", example_collect_statistics),
        6: ("Priority task queue", example_priority_task_queue),
        7: ("Timeout control", example_timeout_control),
        8: ("Conditional action blocking", example_conditional_blocking),
    }

    # Command line: which example to run and which UI language to use.
    parser = argparse.ArgumentParser(description="Phone Agent Callback Hooks Examples")
    parser.add_argument(
        "--example",
        "-e",
        type=int,
        default=1,
        choices=range(1, 9),
        help="Example number to run (1-8)",
    )
    parser.add_argument(
        "--lang",
        type=str,
        default="cn",
        choices=["cn", "en"],
        help="Language for UI messages (cn=Chinese, en=English)",
    )
    args = parser.parse_args()

    name, func = examples[args.example]

    banner = "=" * 50
    print(f"\n{banner}")
    print(f"Example {args.example}: {name}")
    print(banner)

    func(args.lang)

    print(f"\n✓ Example {args.example} completed\n")