feat: Added iOS support

This commit is contained in:
xhguo
2025-12-12 17:58:20 +08:00
parent b2e985a790
commit 7c23ca549b
16 changed files with 2884 additions and 3 deletions

View File

@@ -0,0 +1,47 @@
"""XCTest utilities for iOS device interaction via WebDriverAgent/XCUITest."""
from phone_agent.xctest.connection import (
ConnectionType,
DeviceInfo,
XCTestConnection,
list_devices,
quick_connect,
)
from phone_agent.xctest.device import (
back,
double_tap,
get_current_app,
home,
launch_app,
long_press,
swipe,
tap,
)
from phone_agent.xctest.input import (
clear_text,
type_text,
)
from phone_agent.xctest.screenshot import get_screenshot
# Names re-exported as the public API of this package, grouped by the
# submodule they are imported from above.
__all__ = [
# Screenshot capture (phone_agent.xctest.screenshot)
"get_screenshot",
# Text input (phone_agent.xctest.input)
"type_text",
"clear_text",
# Device control (phone_agent.xctest.device)
"get_current_app",
"tap",
"swipe",
"back",
"home",
"double_tap",
"long_press",
"launch_app",
# Connection management (phone_agent.xctest.connection)
"XCTestConnection",
"DeviceInfo",
"ConnectionType",
"quick_connect",
"list_devices",
]

View File

@@ -0,0 +1,382 @@
"""iOS device connection management via idevice tools and WebDriverAgent."""
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
class ConnectionType(Enum):
    """Transport over which an iOS device is attached."""

    # Device reached over a USB connection.
    USB = "usb"
    # Device reached over the network.
    NETWORK = "network"
@dataclass
class DeviceInfo:
    """Record describing one iOS device found during device listing.

    Only ``device_id``, ``status`` and ``connection_type`` are required; the
    remaining fields default to ``None`` when the detail lookup yields nothing.
    """

    # Unique device identifier (UDID).
    device_id: str
    # Free-form connection status string.
    status: str
    # How the device is attached (USB or network).
    connection_type: ConnectionType
    # Optional details; each stays None when unknown.
    model: str | None = None
    ios_version: str | None = None
    device_name: str | None = None
class XCTestConnection:
    """
    Manage connections to iOS devices via libimobiledevice and WebDriverAgent.

    Device discovery and pairing shell out to the libimobiledevice command
    line tools; WebDriverAgent (WDA) is queried over HTTP at ``wda_url``.

    Requires:
        - libimobiledevice (idevice_id, ideviceinfo)
        - WebDriverAgent running on the iOS device
        - ios-deploy (optional, for app installation)

    Example:
        >>> conn = XCTestConnection()
        >>> # List connected devices
        >>> devices = conn.list_devices()
        >>> # Get device info
        >>> info = conn.get_device_info()
        >>> # Check if WDA is running
        >>> is_ready = conn.is_wda_ready()
    """

    def __init__(self, wda_url: str = "http://localhost:8100"):
        """
        Initialize iOS connection manager.

        Args:
            wda_url: WebDriverAgent URL (default: http://localhost:8100).
                For network devices, use http://<device-ip>:8100
        """
        # Trailing slashes are dropped so endpoints can be appended with "/".
        self.wda_url = wda_url.rstrip("/")

    @staticmethod
    def _split_device_line(line: str) -> tuple[str, str | None]:
        """
        Split one line of ``idevice_id`` output into UDID and transport tag.

        When both USB and network listing are requested, idevice_id appends
        " (USB)" or " (Network)" to every UDID; that suffix must not be
        treated as part of the UDID.

        Args:
            line: A raw output line.

        Returns:
            Tuple of (udid, tag) where tag is the lower-cased suffix without
            parentheses, or None when the line has no suffix. A blank line
            yields ("", None).
        """
        entry = line.strip()
        if not entry:
            return "", None
        udid, _, suffix = entry.partition(" ")
        tag = suffix.strip().strip("()").lower()
        return udid, (tag or None)

    def list_devices(self) -> list[DeviceInfo]:
        """
        List all connected iOS devices.

        Returns:
            List of DeviceInfo objects; empty when the tool is missing or
            any error occurs (the error is printed).

        Note:
            Requires libimobiledevice to be installed.
            Install on macOS: brew install libimobiledevice
        """
        try:
            # Get list of device UDIDs (USB and network).
            result = subprocess.run(
                ["idevice_id", "-ln"],
                capture_output=True,
                text=True,
                timeout=5,
            )

            devices = []
            for line in result.stdout.splitlines():
                udid, transport = self._split_device_line(line)
                if not udid:
                    continue

                if transport == "network":
                    conn_type = ConnectionType.NETWORK
                elif transport == "usb":
                    conn_type = ConnectionType.USB
                else:
                    # Untagged output: fall back to the length/format heuristic.
                    conn_type = (
                        ConnectionType.NETWORK
                        if "-" in udid and len(udid) > 40
                        else ConnectionType.USB
                    )

                # Get detailed device info
                device_info = self._get_device_details(udid)

                devices.append(
                    DeviceInfo(
                        device_id=udid,
                        status="connected",
                        connection_type=conn_type,
                        model=device_info.get("model"),
                        ios_version=device_info.get("ios_version"),
                        device_name=device_info.get("name"),
                    )
                )

            return devices

        except FileNotFoundError:
            print(
                "Error: idevice_id not found. Install libimobiledevice: brew install libimobiledevice"
            )
            return []
        except Exception as e:
            print(f"Error listing devices: {e}")
            return []

    def _get_device_details(self, udid: str) -> dict[str, str]:
        """
        Get detailed information about a specific device.

        Runs ``ideviceinfo -u <udid>`` and extracts ProductType,
        ProductVersion and DeviceName from its "key: value" output.

        Args:
            udid: Device UDID.

        Returns:
            Dictionary with any of the keys "model", "ios_version", "name";
            empty on any failure.
        """
        try:
            result = subprocess.run(
                ["ideviceinfo", "-u", udid],
                capture_output=True,
                text=True,
                timeout=5,
            )

            # Maps ideviceinfo keys to the keys used in the returned dict.
            wanted = {
                "ProductType": "model",
                "ProductVersion": "ios_version",
                "DeviceName": "name",
            }
            info = {}
            for line in result.stdout.split("\n"):
                if ": " not in line:
                    continue
                key, value = line.split(": ", 1)
                target = wanted.get(key.strip())
                if target is not None:
                    info[target] = value.strip()

            return info

        except Exception:
            return {}

    def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
        """
        Get detailed information about a device.

        Args:
            device_id: Device UDID. If None, uses first available device.

        Returns:
            DeviceInfo or None if not found.
        """
        devices = self.list_devices()
        if not devices:
            return None

        if device_id is None:
            return devices[0]

        return next((d for d in devices if d.device_id == device_id), None)

    def is_connected(self, device_id: str | None = None) -> bool:
        """
        Check if a device is connected.

        Args:
            device_id: Device UDID to check. If None, checks if any device is connected.

        Returns:
            True if connected, False otherwise.
        """
        devices = self.list_devices()
        if device_id is None:
            return bool(devices)
        return any(d.device_id == device_id for d in devices)

    def is_wda_ready(self, timeout: int = 2) -> bool:
        """
        Check if WebDriverAgent is running and accessible.

        Args:
            timeout: Request timeout in seconds.

        Returns:
            True if ``GET <wda_url>/status`` answers with HTTP 200,
            False otherwise (including any request error).
        """
        try:
            import requests

            response = requests.get(
                f"{self.wda_url}/status", timeout=timeout, verify=False
            )
            return response.status_code == 200
        except ImportError:
            print(
                "Error: requests library not found. Install it: pip install requests"
            )
            return False
        except Exception:
            return False

    def start_wda_session(self) -> tuple[bool, str]:
        """
        Start a new WebDriverAgent session.

        Returns:
            Tuple of (success, session_id or error_message). When the
            response carries no session id, "session_started" is returned
            in its place.
        """
        try:
            import requests

            response = requests.post(
                f"{self.wda_url}/session",
                json={"capabilities": {}},
                timeout=30,
                verify=False,
            )

            if response.status_code in (200, 201):
                data = response.json()
                # The id may sit at the top level or nested under "value".
                session_id = data.get("sessionId") or data.get("value", {}).get(
                    "sessionId"
                )
                return True, session_id or "session_started"
            else:
                return False, f"Failed to start session: {response.text}"

        except ImportError:
            return (
                False,
                "requests library not found. Install it: pip install requests",
            )
        except Exception as e:
            return False, f"Error starting WDA session: {e}"

    def get_wda_status(self) -> dict | None:
        """
        Get WebDriverAgent status information.

        Returns:
            Parsed JSON of ``GET <wda_url>/status`` on HTTP 200, or None if
            not available.
        """
        try:
            import requests

            response = requests.get(f"{self.wda_url}/status", timeout=5, verify=False)

            if response.status_code == 200:
                return response.json()
            return None

        except Exception:
            return None

    def pair_device(self, device_id: str | None = None) -> tuple[bool, str]:
        """
        Pair with an iOS device (required for some operations).

        Args:
            device_id: Device UDID. If None, the ``-u`` option is omitted.

        Returns:
            Tuple of (success, message).
        """
        try:
            cmd = ["idevicepair"]
            if device_id:
                cmd.extend(["-u", device_id])
            cmd.append("pair")

            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

            # The verdict is searched for in stdout and stderr together.
            output = result.stdout + result.stderr

            if "SUCCESS" in output or "already paired" in output.lower():
                return True, "Device paired successfully"
            else:
                return False, output.strip()

        except FileNotFoundError:
            return (
                False,
                "idevicepair not found. Install libimobiledevice: brew install libimobiledevice",
            )
        except Exception as e:
            return False, f"Error pairing device: {e}"

    def get_device_name(self, device_id: str | None = None) -> str | None:
        """
        Get the device name.

        Args:
            device_id: Device UDID. If None, the ``-u`` option is omitted.

        Returns:
            Device name string or None if not found.
        """
        try:
            cmd = ["ideviceinfo"]
            if device_id:
                cmd.extend(["-u", device_id])
            cmd.extend(["-k", "DeviceName"])

            result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)

            return result.stdout.strip() or None

        except Exception as e:
            print(f"Error getting device name: {e}")
            return None

    def restart_wda(self) -> tuple[bool, str]:
        """
        Restart WebDriverAgent (requires manual restart on device).

        Returns:
            Tuple of (success, message).

        Note:
            This method only checks if WDA needs restart.
            Actual restart requires re-running WDA on the device via Xcode or other means.
        """
        if self.is_wda_ready():
            return True, "WDA is already running"
        else:
            return (
                False,
                "WDA is not running. Please start it manually on the device.",
            )
def quick_connect(wda_url: str = "http://localhost:8100") -> tuple[bool, str]:
    """
    Verify in one call that a device is attached and WDA answers.

    Args:
        wda_url: WebDriverAgent URL.

    Returns:
        Tuple of (success, message). The first failing check decides the
        message; later checks are not run.
    """
    conn = XCTestConnection(wda_url=wda_url)

    # Checks run in order; each pairs a probe with its failure message.
    checks = (
        (conn.is_connected, "No iOS device connected"),
        (conn.is_wda_ready, "WebDriverAgent is not running"),
    )
    for probe, failure in checks:
        if not probe():
            return False, failure

    return True, "iOS device connected and WDA ready"
def list_devices() -> list[DeviceInfo]:
    """
    List connected iOS devices using a default-configured connection.

    Returns:
        List of DeviceInfo objects.
    """
    return XCTestConnection().list_devices()

View File

@@ -0,0 +1,458 @@
"""Device control utilities for iOS automation via WebDriverAgent."""
import subprocess
import time
from typing import Optional
from phone_agent.config.apps_ios import APP_PACKAGES_IOS as APP_PACKAGES
# Divisor applied to incoming x/y coordinates before they are sent to WDA.
# NOTE(review): presumably converts screenshot pixels to WDA points — confirm
# for devices whose scale is not 3.
SCALE_FACTOR = 3 # 3 for most modern iPhone
def _get_wda_session_url(wda_url: str, session_id: str | None, endpoint: str) -> str:
"""
Get the correct WDA URL for a session endpoint.
Args:
wda_url: Base WDA URL.
session_id: Optional session ID.
endpoint: The endpoint path.
Returns:
Full URL for the endpoint.
"""
base = wda_url.rstrip("/")
if session_id:
return f"{base}/session/{session_id}/{endpoint}"
else:
# Try to use WDA endpoints without session when possible
return f"{base}/{endpoint}"
def get_current_app(
    wda_url: str = "http://localhost:8100", session_id: str | None = None
) -> str:
    """
    Resolve the foreground app to its name in APP_PACKAGES.

    Queries ``<wda_url>/wda/activeAppInfo`` and maps the reported bundle ID
    back to an app name.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (accepted but not used here).

    Returns:
        The app name if recognized, otherwise "System Home". Request
        failures are printed and also yield "System Home".
    """
    fallback = "System Home"
    try:
        import requests

        reply = requests.get(
            f"{wda_url.rstrip('/')}/wda/activeAppInfo", timeout=5, verify=False
        )
        if reply.status_code != 200:
            return fallback

        # Expected payload: {"value": {"bundleId": "...", ...}, "sessionId": "..."}
        active_bundle = reply.json().get("value", {}).get("bundleId", "")
        if not active_bundle:
            return fallback

        # Reverse lookup: first app whose bundle ID matches the active one.
        return next(
            (
                name
                for name, bundle in APP_PACKAGES.items()
                if bundle == active_bundle
            ),
            fallback,
        )

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error getting current app: {e}")

    return fallback
def tap(
    x: int,
    y: int,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Tap at the specified coordinates using WebDriver W3C Actions API.

    Args:
        x: X coordinate; divided by SCALE_FACTOR before being sent.
        y: Y coordinate; divided by SCALE_FACTOR before being sent.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after tap.

    Note:
        Errors are printed rather than raised; the post-tap delay is
        skipped when the request fails.
    """
    try:
        import requests

        url = _get_wda_session_url(wda_url, session_id, "actions")

        # W3C WebDriver Actions API for tap/click
        actions = {
            "actions": [
                {
                    "type": "pointer",
                    "id": "finger1",
                    "parameters": {"pointerType": "touch"},
                    "actions": [
                        {
                            "type": "pointerMove",
                            "duration": 0,
                            "x": x / SCALE_FACTOR,
                            "y": y / SCALE_FACTOR,
                        },
                        {"type": "pointerDown", "button": 0},
                        # Action durations are integer milliseconds; the
                        # other gestures in this module use the same unit.
                        {"type": "pause", "duration": 100},
                        {"type": "pointerUp", "button": 0},
                    ],
                }
            ]
        }

        requests.post(url, json=actions, timeout=15, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error tapping: {e}")
def double_tap(
    x: int,
    y: int,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Perform two quick taps at one point via the W3C Actions API.

    Args:
        x: X coordinate; divided by SCALE_FACTOR before being sent.
        y: Y coordinate; divided by SCALE_FACTOR before being sent.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after double tap.

    Note:
        Errors are printed rather than raised.
    """
    try:
        import requests

        endpoint_url = _get_wda_session_url(wda_url, session_id, "actions")

        # Building blocks of the gesture, reused for both taps.
        move = {
            "type": "pointerMove",
            "duration": 0,
            "x": x / SCALE_FACTOR,
            "y": y / SCALE_FACTOR,
        }
        press = {"type": "pointerDown", "button": 0}
        hold = {"type": "pause", "duration": 100}
        release = {"type": "pointerUp", "button": 0}

        finger = {
            "type": "pointer",
            "id": "finger1",
            "parameters": {"pointerType": "touch"},
            # tap, short gap, tap again
            "actions": [move, press, hold, release, hold, press, hold, release],
        }
        body = {"actions": [finger]}

        requests.post(endpoint_url, json=body, timeout=10, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error double tapping: {e}")
def long_press(
    x: int,
    y: int,
    duration: float = 3.0,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Press and hold at one point via the W3C Actions API.

    Args:
        x: X coordinate; divided by SCALE_FACTOR before being sent.
        y: Y coordinate; divided by SCALE_FACTOR before being sent.
        duration: Duration of press in seconds.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after long press.

    Note:
        Errors are printed rather than raised.
    """
    try:
        import requests

        endpoint_url = _get_wda_session_url(wda_url, session_id, "actions")

        # The pause action takes milliseconds; truncate to a whole number.
        hold_ms = int(duration * 1000)

        steps = [
            {
                "type": "pointerMove",
                "duration": 0,
                "x": x / SCALE_FACTOR,
                "y": y / SCALE_FACTOR,
            },
            {"type": "pointerDown", "button": 0},
            {"type": "pause", "duration": hold_ms},
            {"type": "pointerUp", "button": 0},
        ]
        body = {
            "actions": [
                {
                    "type": "pointer",
                    "id": "finger1",
                    "parameters": {"pointerType": "touch"},
                    "actions": steps,
                }
            ]
        }

        # Give the request the press duration plus ten seconds of slack.
        request_timeout = int(duration + 10)
        requests.post(endpoint_url, json=body, timeout=request_timeout, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error long pressing: {e}")
def swipe(
    start_x: int,
    start_y: int,
    end_x: int,
    end_y: int,
    duration: float | None = None,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Drag from one point to another via WDA's dragfromtoforduration endpoint.

    Args:
        start_x: Starting X coordinate.
        start_y: Starting Y coordinate.
        end_x: Ending X coordinate.
        end_y: Ending Y coordinate.
        duration: Duration of swipe in seconds. When None it is derived
            from the squared distance and clamped to 0.3-2.0 seconds.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after swipe.

    Note:
        All four coordinates are divided by SCALE_FACTOR before being sent.
        Errors are printed rather than raised.
    """
    try:
        import requests

        if duration is None:
            # Longer swipes take longer, bounded to a 0.3-2.0 second window.
            squared_distance = (start_x - end_x) ** 2 + (start_y - end_y) ** 2
            duration = max(0.3, min(squared_distance / 1000000, 2.0))

        endpoint_url = _get_wda_session_url(
            wda_url, session_id, "wda/dragfromtoforduration"
        )

        body = {
            "fromX": start_x / SCALE_FACTOR,
            "fromY": start_y / SCALE_FACTOR,
            "toX": end_x / SCALE_FACTOR,
            "toY": end_y / SCALE_FACTOR,
            "duration": duration,
        }

        # Give the request the swipe duration plus ten seconds of slack.
        request_timeout = int(duration + 10)
        requests.post(endpoint_url, json=body, timeout=request_timeout, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error swiping: {e}")
def back(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Go back by dragging rightwards from the left screen edge.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after navigation.

    Note:
        iOS doesn't have a universal back button, so an edge swipe stands in
        for it. The drag uses fixed coordinates: (0, 640) to (400, 640)
        over 0.3 seconds. Errors are printed rather than raised.
    """
    try:
        import requests

        endpoint_url = _get_wda_session_url(
            wda_url, session_id, "wda/dragfromtoforduration"
        )

        # Horizontal drag starting at the left edge.
        edge_swipe = {
            "fromX": 0,
            "fromY": 640,
            "toX": 400,
            "toY": 640,
            "duration": 0.3,
        }

        requests.post(endpoint_url, json=edge_swipe, timeout=10, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error performing back gesture: {e}")
def home(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Return to the home screen via WDA's ``wda/homescreen`` endpoint.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (accepted but not used here;
            the request is always session-less).
        delay: Delay in seconds after pressing home.

    Note:
        Errors are printed rather than raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        requests.post(f"{base}/wda/homescreen", timeout=10, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error pressing home: {e}")
def launch_app(
    app_name: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> bool:
    """
    Start an app identified by its name in APP_PACKAGES.

    Args:
        app_name: The app name (must be in APP_PACKAGES).
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after launching.

    Returns:
        True if WDA answered the launch request with HTTP 200/201; False if
        the app name is unknown, the request failed, or another status was
        returned.
    """
    if app_name not in APP_PACKAGES:
        return False

    try:
        import requests

        launch_url = _get_wda_session_url(wda_url, session_id, "wda/apps/launch")
        body = {"bundleId": APP_PACKAGES[app_name]}

        reply = requests.post(launch_url, json=body, timeout=10, verify=False)

        # The delay is applied whatever the response status was.
        time.sleep(delay)
        launched = reply.status_code in (200, 201)
        return launched

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
        return False
    except Exception as e:
        print(f"Error launching app: {e}")
        return False
def get_screen_size(
    wda_url: str = "http://localhost:8100", session_id: str | None = None
) -> tuple[int, int]:
    """
    Ask WDA for the window size.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Returns:
        Tuple of (width, height). Falls back to (375, 812) when the request
        fails or a dimension is missing from the response.
    """
    # Default iPhone screen size (iPhone X and later)
    default_width, default_height = 375, 812

    try:
        import requests

        size_url = _get_wda_session_url(wda_url, session_id, "window/size")
        reply = requests.get(size_url, timeout=5, verify=False)

        if reply.status_code == 200:
            dims = reply.json().get("value", {})
            return (
                dims.get("width", default_width),
                dims.get("height", default_height),
            )

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error getting screen size: {e}")

    return default_width, default_height
def press_button(
    button_name: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Press a physical button through WDA's ``wda/pressButton`` endpoint.

    Args:
        button_name: Button name (e.g., "home", "volumeUp", "volumeDown").
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (accepted but not used here;
            the request is always session-less).
        delay: Delay in seconds after pressing.

    Note:
        Errors are printed rather than raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        body = {"name": button_name}
        requests.post(f"{base}/wda/pressButton", json=body, timeout=10, verify=False)

        time.sleep(delay)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error pressing button: {e}")

299
phone_agent/xctest/input.py Normal file
View File

@@ -0,0 +1,299 @@
"""Input utilities for iOS device text input via WebDriverAgent."""
import time
def _get_wda_session_url(wda_url: str, session_id: str | None, endpoint: str) -> str:
"""
Get the correct WDA URL for a session endpoint.
Args:
wda_url: Base WDA URL.
session_id: Optional session ID.
endpoint: The endpoint path.
Returns:
Full URL for the endpoint.
"""
base = wda_url.rstrip("/")
if session_id:
return f"{base}/session/{session_id}/{endpoint}"
else:
# Try to use WDA endpoints without session when possible
return f"{base}/{endpoint}"
def type_text(
    text: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    frequency: int = 60,
) -> None:
    """
    Send text to whatever input field currently has focus.

    The text is split into single characters and posted to ``wda/keys``.

    Args:
        text: The text to type.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        frequency: Typing frequency (keys per minute), forwarded to WDA.
            Default is 60.

    Note:
        The input field must be focused before calling this function.
        Use tap() to focus on the input field first. A non-200/201 response
        only prints a warning; errors are printed rather than raised.
    """
    try:
        import requests

        keys_url = _get_wda_session_url(wda_url, session_id, "wda/keys")
        body = {"value": list(text), "frequency": frequency}

        reply = requests.post(keys_url, json=body, timeout=30, verify=False)

        accepted = reply.status_code in (200, 201)
        if not accepted:
            print(f"Warning: Text input may have failed. Status: {reply.status_code}")

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error typing text: {e}")
def clear_text(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Empty the currently focused input field.

    Looks up the active element and asks WDA to clear it; when no active
    element can be resolved, falls back to sending backspaces.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Note:
        The input field must be focused before calling this function.
        Errors are printed rather than raised.
    """
    try:
        import requests

        active_url = _get_wda_session_url(wda_url, session_id, "element/active")
        reply = requests.get(active_url, timeout=10, verify=False)

        if reply.status_code == 200:
            payload = reply.json()
            # The element id may be stored under either of two keys.
            element_id = payload.get("value", {}).get("ELEMENT") or payload.get(
                "value", {}
            ).get("element-6066-11e4-a52e-4f735466cecf")

            if element_id:
                requests.post(
                    _get_wda_session_url(
                        wda_url, session_id, f"element/{element_id}/clear"
                    ),
                    timeout=10,
                    verify=False,
                )
                return

        # No usable active element: delete character by character instead.
        _clear_with_backspace(wda_url, session_id)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error clearing text: {e}")
def _clear_with_backspace(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    max_backspaces: int = 100,
) -> None:
    """
    Delete text by posting a run of backspace characters to ``wda/keys``.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        max_backspaces: Maximum number of backspaces to send.

    Note:
        Any error, including a missing requests library, is printed rather
        than raised.
    """
    try:
        import requests

        keys_url = _get_wda_session_url(wda_url, session_id, "wda/keys")

        # One request carrying the backspace character repeated N times.
        backspace_char = "\u0008"
        body = {"value": [backspace_char] * max_backspaces}

        requests.post(keys_url, json=body, timeout=10, verify=False)

    except Exception as e:
        print(f"Error clearing with backspace: {e}")
def send_keys(
    keys: list[str],
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Post a list of keys to WDA's ``wda/keys`` endpoint.

    Args:
        keys: List of keys to send.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Example:
        >>> send_keys(["H", "e", "l", "l", "o"])
        >>> send_keys(["\\n"])  # Send enter key

    Note:
        Errors are printed rather than raised.
    """
    try:
        import requests

        body = {"value": keys}
        requests.post(
            _get_wda_session_url(wda_url, session_id, "wda/keys"),
            json=body,
            timeout=10,
            verify=False,
        )

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error sending keys: {e}")
def press_enter(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 0.5,
) -> None:
    """
    Send a newline key, then wait.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after pressing enter.
    """
    newline_key = ["\n"]
    send_keys(newline_key, wda_url, session_id)
    # The wait happens even if send_keys printed an error.
    time.sleep(delay)
def hide_keyboard(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Dismiss the on-screen keyboard via ``wda/keyboard/dismiss``.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (accepted but not used here;
            the request is always session-less).

    Note:
        Errors are printed rather than raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        requests.post(f"{base}/wda/keyboard/dismiss", timeout=10, verify=False)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error hiding keyboard: {e}")
def is_keyboard_shown(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> bool:
    """
    Report whether WDA says the on-screen keyboard is visible.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Returns:
        The "value" field of the ``wda/keyboard/shown`` response on HTTP
        200; False on any other status or on error.
    """
    try:
        import requests

        shown_url = _get_wda_session_url(wda_url, session_id, "wda/keyboard/shown")
        reply = requests.get(shown_url, timeout=5, verify=False)

        if reply.status_code != 200:
            return False
        return reply.json().get("value", False)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception:
        # Best-effort probe: any failure counts as "not shown".
        pass

    return False
def set_pasteboard(
    text: str,
    wda_url: str = "http://localhost:8100",
) -> None:
    """
    Write plain text to the device pasteboard (clipboard).

    Args:
        text: Text to set in pasteboard.
        wda_url: WebDriverAgent URL.

    Note:
        This can be useful for inputting large amounts of text.
        After setting pasteboard, you can simulate paste gesture.
        Errors are printed rather than raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        body = {"content": text, "contentType": "plaintext"}
        requests.post(f"{base}/wda/setPasteboard", json=body, timeout=10, verify=False)

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error setting pasteboard: {e}")
def get_pasteboard(
    wda_url: str = "http://localhost:8100",
) -> str | None:
    """
    Read the device pasteboard (clipboard) via ``wda/getPasteboard``.

    Args:
        wda_url: WebDriverAgent URL.

    Returns:
        The "value" field of the response on HTTP 200, or None if the
        request failed or returned another status.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        reply = requests.post(f"{base}/wda/getPasteboard", timeout=10, verify=False)

        if reply.status_code != 200:
            return None
        return reply.json().get("value")

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error getting pasteboard: {e}")

    return None

View File

@@ -0,0 +1,230 @@
"""Screenshot utilities for capturing iOS device screen."""
import base64
import os
import subprocess
import tempfile
import uuid
from dataclasses import dataclass
from io import BytesIO
from PIL import Image
@dataclass
class Screenshot:
    """A captured screen image together with its pixel dimensions."""

    # Image bytes encoded as base64 text.
    base64_data: str
    # Image width and height in pixels.
    width: int
    height: int
    # Flag for captures that failed due to sensitive content; off by default.
    is_sensitive: bool = False
def get_screenshot(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    device_id: str | None = None,
    timeout: int = 10,
) -> Screenshot:
    """
    Capture the device screen, trying each capture method in turn.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        device_id: Optional device UDID (for idevicescreenshot fallback).
        timeout: Timeout in seconds for screenshot operations.

    Returns:
        Screenshot object containing base64 data and dimensions.

    Note:
        Order of attempts: WebDriverAgent, then idevicescreenshot, then a
        generated black image. Later methods run only if earlier ones
        returned nothing.
    """
    # `or` short-circuits, so each fallback is evaluated only when needed.
    return (
        _get_screenshot_wda(wda_url, session_id, timeout)
        or _get_screenshot_idevice(device_id, timeout)
        or _create_fallback_screenshot(is_sensitive=False)
    )
def _get_screenshot_wda(
wda_url: str, session_id: str | None, timeout: int
) -> Screenshot | None:
"""
Capture screenshot using WebDriverAgent.
Args:
wda_url: WebDriverAgent URL.
session_id: Optional WDA session ID.
timeout: Timeout in seconds.
Returns:
Screenshot object or None if failed.
"""
try:
import requests
url = f"{wda_url.rstrip('/')}/screenshot"
response = requests.get(url, timeout=timeout, verify=False)
if response.status_code == 200:
data = response.json()
base64_data = data.get("value", "")
if base64_data:
# Decode to get dimensions
img_data = base64.b64decode(base64_data)
img = Image.open(BytesIO(img_data))
width, height = img.size
return Screenshot(
base64_data=base64_data,
width=width,
height=height,
is_sensitive=False,
)
except ImportError:
print("Note: requests library not installed. Install: pip install requests")
except Exception as e:
print(f"WDA screenshot failed: {e}")
return None
def _get_screenshot_idevice(
device_id: str | None, timeout: int
) -> Screenshot | None:
"""
Capture screenshot using idevicescreenshot (libimobiledevice).
Args:
device_id: Optional device UDID.
timeout: Timeout in seconds.
Returns:
Screenshot object or None if failed.
"""
try:
temp_path = os.path.join(
tempfile.gettempdir(), f"ios_screenshot_{uuid.uuid4()}.png"
)
cmd = ["idevicescreenshot"]
if device_id:
cmd.extend(["-u", device_id])
cmd.append(temp_path)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout
)
if result.returncode == 0 and os.path.exists(temp_path):
# Read and encode image
img = Image.open(temp_path)
width, height = img.size
buffered = BytesIO()
img.save(buffered, format="PNG")
base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Cleanup
os.remove(temp_path)
return Screenshot(
base64_data=base64_data, width=width, height=height, is_sensitive=False
)
except FileNotFoundError:
print(
"Note: idevicescreenshot not found. Install: brew install libimobiledevice"
)
except Exception as e:
print(f"idevicescreenshot failed: {e}")
return None
def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot:
    """
    Build an all-black placeholder used when no real capture is available.

    Args:
        is_sensitive: Whether the failure was due to sensitive content.

    Returns:
        Screenshot object wrapping a black 1179x2556 PNG.
    """
    # Default iPhone screen size (iPhone 14 Pro)
    size = (1179, 2556)

    sink = BytesIO()
    Image.new("RGB", size, color="black").save(sink, format="PNG")
    encoded = base64.b64encode(sink.getvalue()).decode("utf-8")

    width, height = size
    return Screenshot(
        base64_data=encoded,
        width=width,
        height=height,
        is_sensitive=is_sensitive,
    )
def save_screenshot(
    screenshot: Screenshot,
    file_path: str,
) -> bool:
    """
    Decode a screenshot and write it to disk.

    Args:
        screenshot: Screenshot object.
        file_path: Path to save the screenshot.

    Returns:
        True if successful, False otherwise (the error is printed).
    """
    try:
        raw = base64.b64decode(screenshot.base64_data)
        # The image is decoded and re-saved rather than written as raw bytes.
        Image.open(BytesIO(raw)).save(file_path)
    except Exception as e:
        print(f"Error saving screenshot: {e}")
        return False
    return True
def get_screenshot_png(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    device_id: str | None = None,
) -> bytes | None:
    """
    Capture a screenshot and return its decoded bytes.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        device_id: Optional device UDID.

    Returns:
        The base64-decoded screenshot data, or None if decoding failed.
    """
    shot = get_screenshot(wda_url, session_id, device_id)
    try:
        png_bytes = base64.b64decode(shot.base64_data)
    except Exception:
        return None
    return png_bytes