""" Task executor for running tasks on devices with thread pool. """ import asyncio import threading import uuid from concurrent.futures import ThreadPoolExecutor, Future from copy import deepcopy from dataclasses import dataclass, field from datetime import datetime from typing import Callable, Dict, Optional from phone_agent import AgentConfig, PhoneAgent from phone_agent.agent import StepResult from phone_agent.model import ModelConfig from dashboard.config import config from dashboard.models.task import TaskCreateRequest, TaskSchema, TaskStatus from dashboard.services.device_manager import DeviceManager from dashboard.services.websocket_manager import WebSocketManager def _run_async(coro): """Run async coroutine from sync context and wait for completion.""" try: loop = asyncio.get_running_loop() # We're already in an async context, use run_coroutine_threadsafe import concurrent.futures future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for completion with timeout to avoid hanging try: future.result(timeout=5) except concurrent.futures.TimeoutError: pass except Exception: pass except RuntimeError: # No running loop, use asyncio.run asyncio.run(coro) @dataclass class ActiveTask: """An active task being executed.""" task_id: str device_id: str task: str status: TaskStatus current_step: int max_steps: int started_at: datetime updated_at: datetime finished_at: Optional[datetime] = None future: Optional[Future] = None stop_requested: bool = False current_action: Optional[dict] = None thinking: Optional[str] = None error: Optional[str] = None completion_message: Optional[str] = None class TaskExecutor: """Execute tasks on devices with thread pool.""" def __init__( self, device_manager: DeviceManager, ws_manager: Optional[WebSocketManager] = None, max_workers: int = 10, ): """Initialize the task executor. Args: device_manager: Device manager instance ws_manager: WebSocket manager for real-time updates (optional) max_workers: Maximum number of concurrent tasks """ self.device_manager = device_manager self.ws_manager = ws_manager self.executor = ThreadPoolExecutor(max_workers=max_workers) self.active_tasks: Dict[str, ActiveTask] = {} self.task_history: Dict[str, TaskSchema] = {} self._lock = threading.Lock() def set_ws_manager(self, ws_manager: WebSocketManager): """Set the WebSocket manager. Args: ws_manager: WebSocket manager instance """ self.ws_manager = ws_manager async def execute_task(self, request: TaskCreateRequest) -> str: """Execute task on device. Args: request: Task creation request Returns: Task ID """ task_id = f"task_{uuid.uuid4().hex[:8]}" # Create active task active_task = ActiveTask( task_id=task_id, device_id=request.device_id, task=request.task, status=TaskStatus.RUNNING, current_step=0, max_steps=request.max_steps, started_at=datetime.now(), updated_at=datetime.now(), ) with self._lock: self.active_tasks[task_id] = active_task # Notify WebSocket if self.ws_manager: await self.ws_manager.broadcast_task_started(task_id, request.dict()) # Submit to thread pool future = self.executor.submit( self._run_task, task_id, request, ) active_task.future = future return task_id def _run_task(self, task_id: str, request: TaskCreateRequest): """Run task in thread pool. Args: task_id: Task ID request: Task creation request """ import os # Get model config from request model_config = ModelConfig( base_url=request.base_url, model_name=request.model_name, api_key=request.api_key, max_tokens=request.max_tokens, temperature=request.temperature, top_p=request.top_p, frequency_penalty=request.frequency_penalty, lang=request.lang, ) # Get agent config agent_config = AgentConfig( max_steps=request.max_steps, device_id=request.device_id, lang=request.lang, step_callback=lambda result: self._step_callback(task_id, result), before_action_callback=lambda action: self._before_action_callback( task_id, action ), ) # Create agent agent = PhoneAgent(model_config=model_config, agent_config=agent_config) try: # Acquire device if not self.device_manager.acquire_device(request.device_id): raise Exception(f"Device {request.device_id} is not available") try: # Run task result = agent.run(request.task) # Update task status with self._lock: if task_id in self.active_tasks: task = self.active_tasks[task_id] task.status = ( TaskStatus.COMPLETED if result else TaskStatus.FAILED ) task.finished_at = datetime.now() task.updated_at = datetime.now() finally: # Release device self.device_manager.release_device(request.device_id) # Broadcast device status update if self.ws_manager: device = self.device_manager._devices.get(request.device_id) if device: _run_async( self.ws_manager.broadcast_device_update( request.device_id, { "status": device.status.value, "is_connected": device.is_connected, "current_app": device.current_app, }, ) ) # Notify completion if self.ws_manager: with self._lock: task = self.active_tasks.get(task_id) task_status = task.status if task else TaskStatus.COMPLETED message = result.message if hasattr(result, "message") else None _run_async( self.ws_manager.broadcast_task_completed( task_id, request.device_id, task_status, message, ) ) except Exception as e: # Release device on error self.device_manager.release_device(request.device_id) # Broadcast device status update if self.ws_manager: device = self.device_manager._devices.get(request.device_id) if device: _run_async( self.ws_manager.broadcast_device_update( request.device_id, { "status": device.status.value, "is_connected": device.is_connected, }, ) ) # Update task status with self._lock: if task_id in self.active_tasks: task = self.active_tasks[task_id] task.status = TaskStatus.FAILED task.error = str(e) task.finished_at = datetime.now() task.updated_at = datetime.now() # Notify error if self.ws_manager: _run_async( self.ws_manager.broadcast_task_failed( task_id, request.device_id, str(e) ) ) finally: # Move to history with self._lock: if task_id in self.active_tasks: active_task = self.active_tasks.pop(task_id) task_schema = self._active_task_to_schema(active_task) self.task_history[task_id] = task_schema # Trim history if len(self.task_history) > config.MAX_TASK_HISTORY: oldest = min(self.task_history.items(), key=lambda x: x[1].started_at) del self.task_history[oldest[0]] def _step_callback( self, task_id: str, result: StepResult ) -> Optional[str]: """Callback after each step. Args: task_id: Task ID result: Step result Returns: "stop" to interrupt, new task to switch, None to continue """ with self._lock: if task_id not in self.active_tasks: return None task = self.active_tasks[task_id] # Check if stop was requested if task.stop_requested: return "stop" # Update task state task.current_step = result.step_count task.updated_at = datetime.now() task.thinking = result.thinking task.current_action = result.action # Store completion message when finished if result.finished and result.message: task.completion_message = result.message # Notify WebSocket if self.ws_manager: _run_async( self.ws_manager.broadcast_step_update( task_id=task_id, device_id=task.device_id, step=result.step_count, action=result.action, thinking=result.thinking, finished=result.finished, success=result.success, message=result.message, ) ) return None def _before_action_callback(self, task_id: str, action: dict) -> Optional[dict]: """Callback before executing action. Args: task_id: Task ID action: Action to execute Returns: Modified action or None to proceed as-is """ # Can be used for action validation/logging return None async def stop_task(self, task_id: str): """Stop running task. Args: task_id: Task ID """ with self._lock: if task_id not in self.active_tasks: return task = self.active_tasks[task_id] task.stop_requested = True async def get_task_status(self, task_id: str) -> Optional[TaskSchema]: """Get task status. Args: task_id: Task ID Returns: Task schema or None if not found """ with self._lock: if task_id in self.active_tasks: return self._active_task_to_schema(self.active_tasks[task_id]) elif task_id in self.task_history: return self.task_history[task_id] return None async def list_tasks(self) -> list[TaskSchema]: """List all tasks (active and recent). Returns: List of task schemas """ with self._lock: active_schemas = [ self._active_task_to_schema(t) for t in self.active_tasks.values() ] history_schemas = list(self.task_history.values()) return active_schemas + history_schemas def _active_task_to_schema(self, active_task: ActiveTask) -> TaskSchema: """Convert ActiveTask to TaskSchema. Args: active_task: Active task Returns: Task schema """ return TaskSchema( task_id=active_task.task_id, device_id=active_task.device_id, task=active_task.task, status=active_task.status, current_step=active_task.current_step, max_steps=active_task.max_steps, current_action=active_task.current_action, thinking=active_task.thinking, started_at=active_task.started_at, updated_at=active_task.updated_at, finished_at=active_task.finished_at, error=active_task.error, completion_message=active_task.completion_message, ) def get_active_task_count(self) -> int: """Get number of active tasks. Returns: Active task count """ with self._lock: return len(self.active_tasks)