#!/usr/bin/env python3
"""Phone Agent callback hooks examples.

Demonstrates how to use callback hooks to control task execution.

Features:
- Interrupt tasks during execution
- Switch to new tasks dynamically
- Modify actions before execution
- Collect execution statistics

Configuration:
This script loads settings from a .env file (if present).
"""

import argparse
import os
import select
import sys
import threading
import time
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass, field
from queue import PriorityQueue

from dotenv import load_dotenv

# Load .env file for configuration.
# NOTE(review): this call deliberately stays ahead of the phone_agent imports,
# preserving the original statement order.
load_dotenv()

from phone_agent import PhoneAgent  # noqa: E402
from phone_agent.agent import AgentConfig, StepResult  # noqa: E402
from phone_agent.config import get_messages  # noqa: E402,F401  (retained import)
from phone_agent.model import ModelConfig  # noqa: E402

# Hook signatures as they are used by the examples in this file:
# a step callback may return "stop", a new task string, or None;
# a before-action callback may return a replacement action dict or None.
StepCallback = Callable[[StepResult], str | None]
ActionCallback = Callable[[dict], dict | None]

_RULE = "=" * 50


def _print_banner(*lines: str, trailing_blank: bool = True) -> None:
    """Print ``lines`` framed by two horizontal rules.

    Args:
        lines: Text lines printed between the rules, one ``print`` call each.
        trailing_blank: When True the closing rule is followed by an extra
            blank line (used for "about to start" banners); when False the
            closing rule ends the output (used for summary banners).
    """
    print(f"\n{_RULE}")
    for line in lines:
        print(line)
    print(f"{_RULE}\n" if trailing_blank else _RULE)


def _create_agent_with_callbacks(
    lang: str = "cn",
    step_callback: StepCallback | None = None,
    before_action_callback: ActionCallback | None = None,
) -> PhoneAgent:
    """Create a PhoneAgent configured from environment variables.

    Reads PHONE_AGENT_BASE_URL, PHONE_AGENT_MODEL, PHONE_AGENT_API_KEY,
    PHONE_AGENT_MAX_STEPS and PHONE_AGENT_DEVICE_ID, falling back to the
    defaults spelled out below when a variable is unset.

    Args:
        lang: Language code forwarded to ``AgentConfig``.
        step_callback: Optional hook forwarded to ``AgentConfig``.
        before_action_callback: Optional hook forwarded to ``AgentConfig``.

    Returns:
        The constructed ``PhoneAgent``.

    Raises:
        ValueError: If PHONE_AGENT_MAX_STEPS is set to a non-integer value.
    """
    model_config = ModelConfig(
        base_url=os.getenv("PHONE_AGENT_BASE_URL", "http://localhost:8000/v1"),
        model_name=os.getenv("PHONE_AGENT_MODEL", "autoglm-phone-9b"),
        api_key=os.getenv("PHONE_AGENT_API_KEY", "EMPTY"),
    )
    agent_config = AgentConfig(
        max_steps=int(os.getenv("PHONE_AGENT_MAX_STEPS", "100")),
        device_id=os.getenv("PHONE_AGENT_DEVICE_ID"),
        lang=lang,
        step_callback=step_callback,
        before_action_callback=before_action_callback,
    )
    return PhoneAgent(model_config=model_config, agent_config=agent_config)


# ============================================================================
# Example 1: Interrupt after max steps
# ============================================================================


def example_interrupt_after_steps(lang: str = "cn") -> None:
    """Interrupt the task once a fixed number of steps has been taken."""
    max_allowed_steps = 5
    steps_taken: list = []

    def step_callback(result: StepResult) -> str | None:
        """Called after each step; returns "stop" once the limit is reached."""
        steps_taken.append(result.step_count)
        print(f"[Callback] Step {result.step_count}: {result.action}")
        if result.step_count >= max_allowed_steps:
            return "stop"
        return None

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    _print_banner(f"Task will be interrupted after {max_allowed_steps} steps")
    agent.run("打开小红书浏览美食内容")
    _print_banner(
        f"Task interrupted after {len(steps_taken)} steps",
        trailing_blank=False,
    )


# ============================================================================
# Example 2: Switch tasks dynamically
# ============================================================================


def example_switch_tasks(lang: str = "cn") -> None:
    """Run a list of tasks back to back, switching when each one finishes."""
    task_queue = ["打开微信", "打开淘宝", "打开美团"]
    first_task, *follow_ups = task_queue
    # An iterator replaces the previous index-in-a-list bookkeeping; each
    # follow-up task is handed out exactly once, in order.
    remaining = iter(follow_ups)

    def step_callback(result: StepResult) -> str | None:
        """Return the next queued task once the current one has finished."""
        if not result.finished:
            return None
        next_task = next(remaining, None)
        if next_task is None:
            return None
        print(f"\n[Callback] Switching to task: {next_task}\n")
        return next_task

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    _print_banner(f"Task queue: {' -> '.join(task_queue)}")
    # Start with the first task; the callback supplies the rest.
    agent.run(first_task)


# ============================================================================
# Example 3: Interactive task control
# ============================================================================


class _InteractiveController:
    """Bridge between a stdin-listening thread and the agent's step hook.

    A daemon thread reads commands from stdin and records them; the step
    callback (invoked by whoever runs the agent) consumes them.
    """

    def __init__(self) -> None:
        self.new_task: str | None = None
        self.should_stop = False
        # Guards the read-and-clear of ``new_task`` so a task typed while the
        # callback is consuming the previous one is not lost.
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._input_listener, daemon=True)
        self._thread.start()

    def _input_listener(self) -> None:
        """Poll stdin in the background and record stop / new-task commands."""
        print("\n[Interactive Control]")
        print(" Enter 's' + Enter to stop")
        print(" Enter 'n:<task>' + Enter to switch to new task")
        print("-" * 40)

        while True:
            try:
                # Poll with a short timeout instead of blocking indefinitely.
                ready, _, _ = select.select([sys.stdin], [], [], 0.1)
                if not ready:
                    continue
                line = sys.stdin.readline()
            except (OSError, ValueError, EOFError) as exc:
                # select() rejects non-socket objects on Windows (OSError) and
                # closed streams (ValueError); say so instead of dying silently.
                print(f"\n[Control] Input listener stopped: {exc}\n")
                break

            if not line:
                # readline() returns "" at EOF and select() would keep
                # reporting stdin as readable, so leave rather than spin.
                break

            cmd = line.strip()
            if cmd.lower() == "s":
                self.should_stop = True
                print("\n[Control] STOP requested\n")
            elif cmd.lower().startswith("n:"):
                task = cmd[2:].strip()
                if not task:
                    # "n:" with nothing after it carries no task to run.
                    continue
                with self._lock:
                    self.new_task = task
                print(f"\n[Control] New task: {task}\n")

    def step_callback(self, result: StepResult) -> str | None:
        """Return "stop", a pending new task, or None after each step."""
        if self.should_stop:
            return "stop"
        with self._lock:
            task, self.new_task = self.new_task, None
        return task or None


def example_interactive_control(lang: str = "cn") -> None:
    """Control task execution with commands typed on stdin."""
    controller = _InteractiveController()
    agent = _create_agent_with_callbacks(
        lang=lang, step_callback=controller.step_callback
    )

    _print_banner("Interactive Control Mode")
    agent.run("打开抖音浏览视频")


# ============================================================================
# Example 4: Modify actions before execution
# ============================================================================


def example_modify_actions(lang: str = "cn") -> None:
    """Log every proposed action and replace game launches with a note."""
    action_log: list = []

    def before_action_callback(action: dict) -> dict | None:
        """Called before each action; may return a replacement action."""
        # Logged before filtering, so blocked actions are counted as well.
        action_log.append(action.copy())
        kind = action.get("action")

        # Example: prevent launching certain apps.
        if kind == "Launch":
            app = action.get("app", "")
            if "游戏" in app or "Game" in app:
                print(f"[Callback] Blocked launching: {app}")
                # Replace with a safe action.
                return {"action": "Note", "message": f"Skipped {app}"}

        # Example: report Tap actions (no delay is added, only a log line).
        if kind == "Tap":
            print(f"[Callback] Tap at {action.get('element')}")

        # Return None to proceed with the original action.
        return None

    agent = _create_agent_with_callbacks(
        lang=lang, before_action_callback=before_action_callback
    )

    _print_banner("Actions will be logged and filtered")
    agent.run("打开微信查看朋友圈")
    _print_banner(
        f"Total actions executed: {len(action_log)}",
        trailing_blank=False,
    )


# ============================================================================
# Example 5: Collect statistics
# ============================================================================


def example_collect_statistics(lang: str = "cn") -> None:
    """Count action types, steps and failed steps, then print a summary."""
    stats = {
        "actions": defaultdict(int),
        "total_steps": 0,
        "errors": 0,
    }

    def before_action_callback(action: dict) -> dict | None:
        """Tally the action by type; never alters it."""
        stats["actions"][action.get("action", "Unknown")] += 1
        return None

    def step_callback(result: StepResult) -> str | None:
        """Track the latest step count and the number of unsuccessful steps."""
        stats["total_steps"] = result.step_count
        if not result.success:
            stats["errors"] += 1
        return None

    agent = _create_agent_with_callbacks(
        lang=lang,
        before_action_callback=before_action_callback,
        step_callback=step_callback,
    )

    _print_banner("Collecting execution statistics...")
    agent.run("打开美团搜索附近的餐厅")

    breakdown = [
        f" {action_type}: {count}"
        for action_type, count in sorted(stats["actions"].items())
    ]
    _print_banner(
        "Execution Statistics:",
        f" Total steps: {stats['total_steps']}",
        f" Errors: {stats['errors']}",
        "\n Action breakdown:",
        *breakdown,
        trailing_blank=False,
    )


# ============================================================================
# Example 6: Task queue with priority
# ============================================================================


def example_priority_task_queue(lang: str = "cn") -> None:
    """Switch to queued tasks in priority order as each task finishes."""

    @dataclass(order=True)
    class PrioritizedTask:
        # Only ``priority`` takes part in ordering; ``task`` is payload.
        priority: int
        task: str = field(compare=False)

    task_queue: PriorityQueue = PriorityQueue()
    task_queue.put(PrioritizedTask(priority=2, task="打开微信发消息"))
    # PriorityQueue hands out the smallest entry first, so 1 runs first.
    task_queue.put(PrioritizedTask(priority=1, task="打开淘宝"))
    task_queue.put(PrioritizedTask(priority=3, task="打开美团"))

    def step_callback(result: StepResult) -> str | None:
        """Return the most urgent queued task once the current one is done."""
        if not result.finished or task_queue.empty():
            return None
        next_task = task_queue.get()
        print(
            f"\n[Callback] Switching to priority {next_task.priority} "
            f"task: {next_task.task}\n"
        )
        return next_task.task

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    _print_banner("Priority Task Queue Mode")
    # Start with an initial task that is not part of the queue.
    agent.run("打开抖音")


# ============================================================================
# Example 7: Timeout control
# ============================================================================


def example_timeout_control(lang: str = "cn") -> None:
    """Interrupt the task once it has been running for too long."""
    timeout_seconds = 30
    # monotonic() is immune to wall-clock adjustments, unlike time.time().
    started_at = time.monotonic()

    def step_callback(result: StepResult) -> str | None:
        """Report elapsed time and return "stop" once the budget is spent."""
        elapsed = time.monotonic() - started_at
        print(f"[Callback] Elapsed time: {elapsed:.1f}s")
        if elapsed > timeout_seconds:
            print(f"\n[Callback] Task timeout after {elapsed:.1f}s\n")
            return "stop"
        return None

    agent = _create_agent_with_callbacks(lang=lang, step_callback=step_callback)

    _print_banner(f"Task will timeout after {timeout_seconds} seconds")
    agent.run("打开小红书浏览内容")


# ============================================================================
# Example 8: Conditional action blocking
# ============================================================================


def example_conditional_blocking(lang: str = "cn") -> None:
    """Replace launches of blocked apps with a finish action."""
    blocked_apps = ["王者荣耀", "和平精英", "抖音"]  # Apps to block

    def before_action_callback(action: dict) -> dict | None:
        """Swap a Launch of any blocked app for a finish action."""
        if action.get("action") != "Launch":
            return None
        app = action.get("app", "")
        if not any(blocked in app for blocked in blocked_apps):
            return None
        print(f"[Callback] BLOCKED app launch: {app}")
        # Return a finish action to stop.
        return {"action": "finish", "message": f"App {app} is blocked"}

    agent = _create_agent_with_callbacks(
        lang=lang, before_action_callback=before_action_callback
    )

    _print_banner(f"Blocked apps: {', '.join(blocked_apps)}")
    agent.run("打开抖音刷视频")


# ============================================================================
# Main
# ============================================================================


def main() -> None:
    """Parse command-line arguments and run the selected example."""
    examples: dict = {
        1: ("Interrupt after max steps", example_interrupt_after_steps),
        2: ("Switch tasks dynamically", example_switch_tasks),
        3: ("Interactive task control", example_interactive_control),
        4: ("Modify actions before execution", example_modify_actions),
        5: ("Collect statistics", example_collect_statistics),
        6: ("Priority task queue", example_priority_task_queue),
        7: ("Timeout control", example_timeout_control),
        8: ("Conditional action blocking", example_conditional_blocking),
    }

    parser = argparse.ArgumentParser(description="Phone Agent Callback Hooks Examples")
    parser.add_argument(
        "--example",
        "-e",
        type=int,
        default=1,
        # Derived from the table above so the two cannot drift apart.
        choices=sorted(examples),
        help=f"Example number to run ({min(examples)}-{max(examples)})",
    )
    parser.add_argument(
        "--lang",
        type=str,
        default="cn",
        choices=["cn", "en"],
        help="Language for UI messages (cn=Chinese, en=English)",
    )
    args = parser.parse_args()

    name, func = examples[args.example]
    _print_banner(f"Example {args.example}: {name}", trailing_blank=False)
    func(args.lang)
    print(f"\n✓ Example {args.example} completed\n")


if __name__ == "__main__":
    main()