Files
Open-AutoGLM/dashboard/services/task_executor.py
let5sne.win10 3552df23d6 Add Web Dashboard with multi-device control and callback hooks
Features:
- Web Dashboard: FastAPI-based dashboard with Vue.js frontend
  - Multi-device support (ADB, HDC, iOS)
  - Real-time WebSocket updates for task progress
  - Device management with status tracking
  - Task queue with execution controls (start/stop/re-execute)
  - Detailed task information display (thinking, actions, completion messages)
  - Screenshot viewing per device
  - LAN deployment support with configurable CORS

- Callback Hooks: Interrupt and modify task execution
  - step_callback: Called after each step with StepResult
  - before_action_callback: Called before executing action
  - Support for task interruption and dynamic task switching
  - Example scripts demonstrating callback usage

- Configuration: Environment-based configuration
  - .env file support for all settings
  - .env.example template with documentation
  - Model API configuration (base URL, model name, API key)
  - Dashboard configuration (host, port, CORS, device type)
  - Phone agent configuration (delays, max steps, language)

Technical improvements:
- Fixed forward reference issue with StepResult
- Added package exports for callback types and configs
- Enhanced dependencies with FastAPI, WebSocket support
- Thread-safe task execution with device locking
- Async WebSocket broadcasting from sync thread pool

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-09 02:20:06 +08:00

411 lines
13 KiB
Python

"""
Task executor for running tasks on devices with thread pool.
"""
import asyncio
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, Future
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Dict, Optional
from phone_agent import AgentConfig, PhoneAgent
from phone_agent.agent import StepResult
from phone_agent.model import ModelConfig
from dashboard.config import config
from dashboard.models.task import TaskCreateRequest, TaskSchema, TaskStatus
from dashboard.services.device_manager import DeviceManager
from dashboard.services.websocket_manager import WebSocketManager
def _run_async(coro):
"""Run async coroutine from sync context and wait for completion."""
try:
loop = asyncio.get_running_loop()
# We're already in an async context, use run_coroutine_threadsafe
import concurrent.futures
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for completion with timeout to avoid hanging
try:
future.result(timeout=5)
except concurrent.futures.TimeoutError:
pass
except Exception:
pass
except RuntimeError:
# No running loop, use asyncio.run
asyncio.run(coro)
@dataclass
class ActiveTask:
"""An active task being executed."""
task_id: str
device_id: str
task: str
status: TaskStatus
current_step: int
max_steps: int
started_at: datetime
updated_at: datetime
finished_at: Optional[datetime] = None
future: Optional[Future] = None
stop_requested: bool = False
current_action: Optional[dict] = None
thinking: Optional[str] = None
error: Optional[str] = None
completion_message: Optional[str] = None
class TaskExecutor:
"""Execute tasks on devices with thread pool."""
def __init__(
self,
device_manager: DeviceManager,
ws_manager: Optional[WebSocketManager] = None,
max_workers: int = 10,
):
"""Initialize the task executor.
Args:
device_manager: Device manager instance
ws_manager: WebSocket manager for real-time updates (optional)
max_workers: Maximum number of concurrent tasks
"""
self.device_manager = device_manager
self.ws_manager = ws_manager
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.active_tasks: Dict[str, ActiveTask] = {}
self.task_history: Dict[str, TaskSchema] = {}
self._lock = threading.Lock()
def set_ws_manager(self, ws_manager: WebSocketManager):
"""Set the WebSocket manager.
Args:
ws_manager: WebSocket manager instance
"""
self.ws_manager = ws_manager
async def execute_task(self, request: TaskCreateRequest) -> str:
"""Execute task on device.
Args:
request: Task creation request
Returns:
Task ID
"""
task_id = f"task_{uuid.uuid4().hex[:8]}"
# Create active task
active_task = ActiveTask(
task_id=task_id,
device_id=request.device_id,
task=request.task,
status=TaskStatus.RUNNING,
current_step=0,
max_steps=request.max_steps,
started_at=datetime.now(),
updated_at=datetime.now(),
)
with self._lock:
self.active_tasks[task_id] = active_task
# Notify WebSocket
if self.ws_manager:
await self.ws_manager.broadcast_task_started(task_id, request.dict())
# Submit to thread pool
future = self.executor.submit(
self._run_task,
task_id,
request,
)
active_task.future = future
return task_id
def _run_task(self, task_id: str, request: TaskCreateRequest):
"""Run task in thread pool.
Args:
task_id: Task ID
request: Task creation request
"""
import os
# Get model config from request
model_config = ModelConfig(
base_url=request.base_url,
model_name=request.model_name,
api_key=request.api_key,
max_tokens=request.max_tokens,
temperature=request.temperature,
top_p=request.top_p,
frequency_penalty=request.frequency_penalty,
lang=request.lang,
)
# Get agent config
agent_config = AgentConfig(
max_steps=request.max_steps,
device_id=request.device_id,
lang=request.lang,
step_callback=lambda result: self._step_callback(task_id, result),
before_action_callback=lambda action: self._before_action_callback(
task_id, action
),
)
# Create agent
agent = PhoneAgent(model_config=model_config, agent_config=agent_config)
try:
# Acquire device
if not self.device_manager.acquire_device(request.device_id):
raise Exception(f"Device {request.device_id} is not available")
try:
# Run task
result = agent.run(request.task)
# Update task status
with self._lock:
if task_id in self.active_tasks:
task = self.active_tasks[task_id]
task.status = (
TaskStatus.COMPLETED if result else TaskStatus.FAILED
)
task.finished_at = datetime.now()
task.updated_at = datetime.now()
finally:
# Release device
self.device_manager.release_device(request.device_id)
# Broadcast device status update
if self.ws_manager:
device = self.device_manager._devices.get(request.device_id)
if device:
_run_async(
self.ws_manager.broadcast_device_update(
request.device_id,
{
"status": device.status.value,
"is_connected": device.is_connected,
"current_app": device.current_app,
},
)
)
# Notify completion
if self.ws_manager:
with self._lock:
task = self.active_tasks.get(task_id)
task_status = task.status if task else TaskStatus.COMPLETED
message = result.message if hasattr(result, "message") else None
_run_async(
self.ws_manager.broadcast_task_completed(
task_id,
request.device_id,
task_status,
message,
)
)
except Exception as e:
# Release device on error
self.device_manager.release_device(request.device_id)
# Broadcast device status update
if self.ws_manager:
device = self.device_manager._devices.get(request.device_id)
if device:
_run_async(
self.ws_manager.broadcast_device_update(
request.device_id,
{
"status": device.status.value,
"is_connected": device.is_connected,
},
)
)
# Update task status
with self._lock:
if task_id in self.active_tasks:
task = self.active_tasks[task_id]
task.status = TaskStatus.FAILED
task.error = str(e)
task.finished_at = datetime.now()
task.updated_at = datetime.now()
# Notify error
if self.ws_manager:
_run_async(
self.ws_manager.broadcast_task_failed(
task_id, request.device_id, str(e)
)
)
finally:
# Move to history
with self._lock:
if task_id in self.active_tasks:
active_task = self.active_tasks.pop(task_id)
task_schema = self._active_task_to_schema(active_task)
self.task_history[task_id] = task_schema
# Trim history
if len(self.task_history) > config.MAX_TASK_HISTORY:
oldest = min(self.task_history.items(), key=lambda x: x[1].started_at)
del self.task_history[oldest[0]]
def _step_callback(
self, task_id: str, result: StepResult
) -> Optional[str]:
"""Callback after each step.
Args:
task_id: Task ID
result: Step result
Returns:
"stop" to interrupt, new task to switch, None to continue
"""
with self._lock:
if task_id not in self.active_tasks:
return None
task = self.active_tasks[task_id]
# Check if stop was requested
if task.stop_requested:
return "stop"
# Update task state
task.current_step = result.step_count
task.updated_at = datetime.now()
task.thinking = result.thinking
task.current_action = result.action
# Store completion message when finished
if result.finished and result.message:
task.completion_message = result.message
# Notify WebSocket
if self.ws_manager:
_run_async(
self.ws_manager.broadcast_step_update(
task_id=task_id,
device_id=task.device_id,
step=result.step_count,
action=result.action,
thinking=result.thinking,
finished=result.finished,
success=result.success,
message=result.message,
)
)
return None
def _before_action_callback(self, task_id: str, action: dict) -> Optional[dict]:
"""Callback before executing action.
Args:
task_id: Task ID
action: Action to execute
Returns:
Modified action or None to proceed as-is
"""
# Can be used for action validation/logging
return None
async def stop_task(self, task_id: str):
"""Stop running task.
Args:
task_id: Task ID
"""
with self._lock:
if task_id not in self.active_tasks:
return
task = self.active_tasks[task_id]
task.stop_requested = True
async def get_task_status(self, task_id: str) -> Optional[TaskSchema]:
"""Get task status.
Args:
task_id: Task ID
Returns:
Task schema or None if not found
"""
with self._lock:
if task_id in self.active_tasks:
return self._active_task_to_schema(self.active_tasks[task_id])
elif task_id in self.task_history:
return self.task_history[task_id]
return None
async def list_tasks(self) -> list[TaskSchema]:
"""List all tasks (active and recent).
Returns:
List of task schemas
"""
with self._lock:
active_schemas = [
self._active_task_to_schema(t) for t in self.active_tasks.values()
]
history_schemas = list(self.task_history.values())
return active_schemas + history_schemas
def _active_task_to_schema(self, active_task: ActiveTask) -> TaskSchema:
"""Convert ActiveTask to TaskSchema.
Args:
active_task: Active task
Returns:
Task schema
"""
return TaskSchema(
task_id=active_task.task_id,
device_id=active_task.device_id,
task=active_task.task,
status=active_task.status,
current_step=active_task.current_step,
max_steps=active_task.max_steps,
current_action=active_task.current_action,
thinking=active_task.thinking,
started_at=active_task.started_at,
updated_at=active_task.updated_at,
finished_at=active_task.finished_at,
error=active_task.error,
completion_message=active_task.completion_message,
)
def get_active_task_count(self) -> int:
"""Get number of active tasks.
Returns:
Active task count
"""
with self._lock:
return len(self.active_tasks)