""" Device manager for handling device pool and connections. """ import asyncio from datetime import datetime from enum import Enum from threading import Lock from typing import Dict, List, Optional from phone_agent.adb.connection import ADBConnection from phone_agent.adb.screenshot import get_screenshot as adb_screenshot from phone_agent.device_factory import ( DeviceFactory, DeviceType, get_device_factory, set_device_type, ) from phone_agent.hdc.connection import HDCConnection from phone_agent.hdc.screenshot import get_screenshot as hdc_screenshot from dashboard.models.device import DeviceInfo, DeviceStatus, DeviceSchema class DeviceConnectionError(Exception): """Raised when device connection fails.""" pass class DeviceManager: """Manage device pool and connections.""" def __init__(self, device_type: str = "adb"): """Initialize the device manager. Args: device_type: Default device type (adb, hdc, ios) """ self._devices: Dict[str, DeviceInfo] = {} self._device_locks: Dict[str, Lock] = {} self._device_type = self._parse_device_type(device_type) self._screenshot_cache: Dict[str, tuple[str, datetime]] = {} # (base64, timestamp) self._cache_ttl_seconds = 2.0 # Cache screenshots for 2 seconds # Set the global device type set_device_type(self._device_type) def _parse_device_type(self, device_type: str) -> DeviceType: """Parse device type string to enum.""" try: return DeviceType[device_type.upper()] except KeyError: return DeviceType.ADB @property def factory(self) -> DeviceFactory: """Get the device factory instance.""" return get_device_factory() async def refresh_devices(self) -> List[DeviceInfo]: """Scan and return all connected devices. Returns: List of device info objects """ try: # Run device listing in thread pool to avoid blocking loop = asyncio.get_event_loop() devices = await loop.run_in_executor(None, self.factory.list_devices) # Update device cache current_time = datetime.now() for device in devices: device_id = device.device_id # Initialize lock if needed if device_id not in self._device_locks: self._device_locks[device_id] = Lock() # Update or create device info if device_id in self._devices: # Update existing device self._devices[device_id].last_seen = current_time self._devices[device_id].is_connected = True self._devices[device_id].status = ( DeviceStatus.BUSY if not self._is_device_available(device_id) else DeviceStatus.ONLINE ) else: # New device self._devices[device_id] = DeviceInfo( device_id=device_id, status=DeviceStatus.ONLINE, device_type=self._device_type_to_schema(device.connection_type), model=device.model, android_version=device.android_version, current_app=None, last_seen=current_time, is_connected=True, ) # Mark disconnected devices connected_ids = {d.device_id for d in devices} for device_id, device_info in self._devices.items(): if device_id not in connected_ids: device_info.is_connected = False device_info.status = DeviceStatus.OFFLINE return list(self._devices.values()) except Exception as e: raise DeviceConnectionError(f"Failed to refresh devices: {e}") async def get_device(self, device_id: str) -> Optional[DeviceInfo]: """Get device info by ID. Args: device_id: Device identifier Returns: Device info or None if not found """ return self._devices.get(device_id) def acquire_device(self, device_id: str) -> bool: """Acquire lock on device. Args: device_id: Device identifier Returns: True if acquired, False if device is busy """ if device_id not in self._device_locks: return False lock = self._device_locks[device_id] acquired = lock.acquire(blocking=False) if acquired: # Update device status if device_id in self._devices: self._devices[device_id].status = DeviceStatus.BUSY return acquired def release_device(self, device_id: str): """Release device lock. Args: device_id: Device identifier """ if device_id in self._device_locks: self._device_locks[device_id].release() # Update device status if device_id in self._devices: device = self._devices[device_id] if device.is_connected: device.status = DeviceStatus.ONLINE else: device.status = DeviceStatus.OFFLINE def is_device_available(self, device_id: str) -> bool: """Check if device is available for task execution. Args: device_id: Device identifier Returns: True if available, False if busy or offline """ if device_id not in self._devices: return False device = self._devices[device_id] if not device.is_connected: return False if device.status == DeviceStatus.BUSY: return False return True def _is_device_available(self, device_id: str) -> bool: """Internal check if device lock is available.""" if device_id not in self._device_locks: return True lock = self._device_locks[device_id] return not lock.locked() async def get_screenshot(self, device_id: str) -> Optional[str]: """Get screenshot for device. Args: device_id: Device identifier Returns: Base64 encoded screenshot or None """ # Check cache if device_id in self._screenshot_cache: screenshot, timestamp = self._screenshot_cache[device_id] age = (datetime.now() - timestamp).total_seconds() if age < self._cache_ttl_seconds: return screenshot try: loop = asyncio.get_event_loop() if self._device_type == DeviceType.HDC: result = await loop.run_in_executor( None, hdc_screenshot, device_id, 10 ) else: # ADB or default result = await loop.run_in_executor( None, adb_screenshot, device_id, 10 ) if result: # Cache the screenshot self._screenshot_cache[device_id] = (result.base64_data, datetime.now()) return result.base64_data except Exception: pass return None async def get_current_app(self, device_id: str) -> Optional[str]: """Get current app for device. Args: device_id: Device identifier Returns: Current app package name or None """ try: loop = asyncio.get_event_loop() app = await loop.run_in_executor( None, self.factory.get_current_app, device_id ) # Update device info if device_id in self._devices and app: self._devices[device_id].current_app = app return app except Exception: return None async def connect_device(self, address: str) -> bool: """Connect to device via WiFi. Args: address: Device address (IP:PORT) Returns: True if connected successfully """ try: loop = asyncio.get_event_loop() if self._device_type == DeviceType.HDC: conn = HDCConnection() else: conn = ADBConnection() success, _ = await loop.run_in_executor(None, conn.connect, address) if success: # Refresh devices after connecting await self.refresh_devices() return success except Exception: return False async def disconnect_device(self, address: str) -> bool: """Disconnect from device. Args: address: Device address (IP:PORT) Returns: True if disconnected successfully """ try: loop = asyncio.get_event_loop() if self._device_type == DeviceType.HDC: conn = HDCConnection() else: conn = ADBConnection() success, _ = await loop.run_in_executor(None, conn.disconnect, address) if success: # Refresh devices after disconnecting await self.refresh_devices() return success except Exception: return False def _device_type_to_schema(self, connection_type) -> "DeviceSchema": """Convert connection type to DeviceType enum.""" from dashboard.models.device import DeviceType as SchemaDeviceType # Mapping from phone_agent connection types to schema types type_map = { "USB": SchemaDeviceType.ADB, "WIFI": SchemaDeviceType.ADB, "REMOTE": SchemaDeviceType.ADB, } # Try to get string value from enum try: type_str = connection_type.value if hasattr(connection_type, "value") else str(connection_type) return type_map.get(type_str.upper(), SchemaDeviceType.ADB) except (AttributeError, KeyError): return SchemaDeviceType.ADB def list_all_devices(self) -> List[DeviceInfo]: """Get all cached devices. Returns: List of all device info """ return list(self._devices.values())