Merge pull request #143 from gekowa/ios-support-3

feat: Added iOS support
This commit is contained in:
yongbin-buaa
2025-12-19 18:15:50 +08:00
committed by GitHub
21 changed files with 2924 additions and 4 deletions

View File

@@ -1,11 +1,12 @@
"""
Phone Agent - An AI-powered phone automation framework.
This package provides tools for automating Android phone interactions
This package provides tools for automating Android and iOS phone interactions
using AI models for visual understanding and decision making.
"""
from phone_agent.agent import PhoneAgent
from phone_agent.agent_ios import IOSPhoneAgent
__version__ = "0.1.0"
__all__ = ["PhoneAgent"]
__all__ = ["PhoneAgent", "IOSPhoneAgent"]

View File

@@ -0,0 +1,280 @@
"""Action handler for iOS automation using WebDriverAgent."""
import time
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.xctest import (
back,
double_tap,
home,
launch_app,
long_press,
swipe,
tap,
)
from phone_agent.xctest.input import clear_text, hide_keyboard, type_text
@dataclass
class ActionResult:
"""Result of an action execution."""
success: bool
should_finish: bool
message: str | None = None
requires_confirmation: bool = False
class IOSActionHandler:
    """
    Handles execution of actions from AI model output for iOS devices.

    An action is a dictionary whose ``"_metadata"`` key is either ``"finish"``
    (stop the task) or ``"do"`` (perform the gesture named by ``"action"``).
    Gestures are forwarded to the WebDriverAgent helpers imported from
    ``phone_agent.xctest``.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        confirmation_callback: Optional callback for sensitive action confirmation.
            Should return True to proceed, False to cancel.
        takeover_callback: Optional callback for takeover requests (login, captcha).
    """

    def __init__(
        self,
        wda_url: str = "http://localhost:8100",
        session_id: str | None = None,
        confirmation_callback: Callable[[str], bool] | None = None,
        takeover_callback: Callable[[str], None] | None = None,
    ):
        self.wda_url = wda_url
        self.session_id = session_id
        # Fall back to console-based prompts when no callbacks are supplied.
        self.confirmation_callback = confirmation_callback or self._default_confirmation
        self.takeover_callback = takeover_callback or self._default_takeover

    def execute(
        self, action: dict[str, Any], screen_width: int, screen_height: int
    ) -> ActionResult:
        """
        Execute an action from the AI model.

        Args:
            action: The action dictionary from the model.
            screen_width: Current screen width in pixels.
            screen_height: Current screen height in pixels.

        Returns:
            ActionResult indicating success and whether to finish. Exceptions
            raised by a handler are caught and reported as a failed,
            non-finishing result.
        """
        action_type = action.get("_metadata")

        if action_type == "finish":
            return ActionResult(
                success=True, should_finish=True, message=action.get("message")
            )

        if action_type != "do":
            return ActionResult(
                success=False,
                should_finish=True,
                message=f"Unknown action type: {action_type}",
            )

        action_name = action.get("action")
        handler_method = self._get_handler(action_name)

        if handler_method is None:
            return ActionResult(
                success=False,
                should_finish=False,
                message=f"Unknown action: {action_name}",
            )

        try:
            return handler_method(action, screen_width, screen_height)
        except Exception as e:
            # Best-effort: a failing gesture must not abort the agent loop.
            return ActionResult(
                success=False, should_finish=False, message=f"Action failed: {e}"
            )

    def _get_handler(self, action_name: str | None) -> Callable | None:
        """Get the handler method for an action, or None if it is unknown."""
        handlers = {
            "Launch": self._handle_launch,
            "Tap": self._handle_tap,
            "Type": self._handle_type,
            "Type_Name": self._handle_type,
            "Swipe": self._handle_swipe,
            "Back": self._handle_back,
            "Home": self._handle_home,
            "Double Tap": self._handle_double_tap,
            "Long Press": self._handle_long_press,
            "Wait": self._handle_wait,
            "Take_over": self._handle_takeover,
            "Note": self._handle_note,
            "Call_API": self._handle_call_api,
            "Interact": self._handle_interact,
        }
        return handlers.get(action_name)

    def _convert_relative_to_absolute(
        self, element: list[int], screen_width: int, screen_height: int
    ) -> tuple[int, int]:
        """Convert relative coordinates (0-1000) to absolute pixels."""
        x = int(element[0] / 1000 * screen_width)
        y = int(element[1] / 1000 * screen_height)
        return x, y

    def _handle_launch(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle app launch action."""
        app_name = action.get("app")
        if not app_name:
            return ActionResult(False, False, "No app name specified")

        success = launch_app(
            app_name, wda_url=self.wda_url, session_id=self.session_id
        )
        if success:
            return ActionResult(True, False)
        return ActionResult(False, False, f"App not found: {app_name}")

    def _handle_tap(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle tap action, asking for confirmation when a message is attached."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        print(f"Physically tap on ({x}, {y})")

        # A "message" key marks the tap as a sensitive operation.
        if "message" in action:
            if not self.confirmation_callback(action["message"]):
                return ActionResult(
                    success=False,
                    should_finish=True,
                    message="User cancelled sensitive operation",
                )

        tap(x, y, wda_url=self.wda_url, session_id=self.session_id)
        return ActionResult(True, False)

    def _handle_type(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle text input action."""
        text = action.get("text", "")

        # Clear existing text and type new text
        clear_text(wda_url=self.wda_url, session_id=self.session_id)
        time.sleep(0.5)

        type_text(text, wda_url=self.wda_url, session_id=self.session_id)
        time.sleep(0.5)

        # Hide keyboard after typing
        hide_keyboard(wda_url=self.wda_url, session_id=self.session_id)
        time.sleep(0.5)

        return ActionResult(True, False)

    def _handle_swipe(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle swipe action."""
        start = action.get("start")
        end = action.get("end")
        if not start or not end:
            return ActionResult(False, False, "Missing swipe coordinates")

        start_x, start_y = self._convert_relative_to_absolute(start, width, height)
        end_x, end_y = self._convert_relative_to_absolute(end, width, height)
        print(f"Physically scroll from ({start_x}, {start_y}) to ({end_x}, {end_y})")

        swipe(
            start_x,
            start_y,
            end_x,
            end_y,
            wda_url=self.wda_url,
            session_id=self.session_id,
        )
        return ActionResult(True, False)

    def _handle_back(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle back gesture (swipe from left edge)."""
        back(wda_url=self.wda_url, session_id=self.session_id)
        return ActionResult(True, False)

    def _handle_home(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle home button action."""
        home(wda_url=self.wda_url, session_id=self.session_id)
        return ActionResult(True, False)

    def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle double tap action."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        double_tap(x, y, wda_url=self.wda_url, session_id=self.session_id)
        return ActionResult(True, False)

    def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle long press action (fixed 3 second press)."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        long_press(
            x,
            y,
            duration=3.0,
            wda_url=self.wda_url,
            session_id=self.session_id,
        )
        return ActionResult(True, False)

    def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle wait action by sleeping for the requested duration."""
        duration = self._parse_wait_seconds(action.get("duration", "1 seconds"))
        time.sleep(duration)
        return ActionResult(True, False)

    @staticmethod
    def _parse_wait_seconds(raw: Any, default: float = 1.0) -> float:
        """
        Parse a wait duration such as ``"2 seconds"`` into seconds.

        Args:
            raw: Duration from the model; a string like ``"2 seconds"`` or a
                bare number.
            default: Value used when ``raw`` cannot be parsed.

        Returns:
            A non-negative number of seconds.
        """
        try:
            # str() lets numeric durations (e.g. 2 or 1.5) through as well.
            seconds = float(str(raw).replace("seconds", "").strip())
        except ValueError:
            return default
        if seconds != seconds:  # NaN is not a usable sleep duration
            return default
        # time.sleep() rejects negative values, so clamp them to zero.
        return max(seconds, 0.0)

    def _handle_takeover(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle takeover request (login, captcha, etc.)."""
        message = action.get("message", "User intervention required")
        self.takeover_callback(message)
        return ActionResult(True, False)

    def _handle_note(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle note action (placeholder for content recording)."""
        # This action is typically used for recording page content
        # Implementation depends on specific requirements
        return ActionResult(True, False)

    def _handle_call_api(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle API call action (placeholder for summarization)."""
        # This action is typically used for content summarization
        # Implementation depends on specific requirements
        return ActionResult(True, False)

    def _handle_interact(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle interaction request (user choice needed)."""
        # This action signals that user input is needed
        return ActionResult(True, False, message="User interaction required")

    @staticmethod
    def _default_confirmation(message: str) -> bool:
        """Default confirmation callback using console input."""
        response = input(f"Sensitive operation: {message}\nConfirm? (Y/N): ")
        # Tolerate surrounding whitespace such as "y " or " Y".
        return response.strip().upper() == "Y"

    @staticmethod
    def _default_takeover(message: str) -> None:
        """Default takeover callback using console input."""
        input(f"{message}\nPress Enter after completing manual operation...")

277
phone_agent/agent_ios.py Normal file
View File

@@ -0,0 +1,277 @@
"""iOS PhoneAgent class for orchestrating iOS phone automation."""
import json
import traceback
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.actions.handler import do, finish, parse_action
from phone_agent.actions.handler_ios import IOSActionHandler
from phone_agent.config import get_messages, get_system_prompt
from phone_agent.model import ModelClient, ModelConfig
from phone_agent.model.client import MessageBuilder
from phone_agent.xctest import XCTestConnection, get_current_app, get_screenshot
@dataclass
class IOSAgentConfig:
"""Configuration for the iOS PhoneAgent."""
max_steps: int = 100
wda_url: str = "http://localhost:8100"
session_id: str | None = None
device_id: str | None = None # iOS device UDID
lang: str = "cn"
system_prompt: str | None = None
verbose: bool = True
def __post_init__(self):
if self.system_prompt is None:
self.system_prompt = get_system_prompt(self.lang)
@dataclass
class StepResult:
"""Result of a single agent step."""
success: bool
finished: bool
action: dict[str, Any] | None
thinking: str
message: str | None = None
class IOSPhoneAgent:
    """
    AI-powered agent for automating iOS phone interactions.

    The agent uses a vision-language model to understand screen content
    and decide on actions to complete user tasks via WebDriverAgent.

    Args:
        model_config: Configuration for the AI model.
        agent_config: Configuration for the iOS agent behavior.
        confirmation_callback: Optional callback for sensitive action confirmation.
        takeover_callback: Optional callback for takeover requests.

    Example:
        >>> from phone_agent.agent_ios import IOSPhoneAgent, IOSAgentConfig
        >>> from phone_agent.model import ModelConfig
        >>>
        >>> model_config = ModelConfig(base_url="http://localhost:8000/v1")
        >>> agent_config = IOSAgentConfig(wda_url="http://localhost:8100")
        >>> agent = IOSPhoneAgent(model_config, agent_config)
        >>> agent.run("Open Safari and search for Apple")
    """

    def __init__(
        self,
        model_config: ModelConfig | None = None,
        agent_config: IOSAgentConfig | None = None,
        confirmation_callback: Callable[[str], bool] | None = None,
        takeover_callback: Callable[[str], None] | None = None,
    ):
        self.model_config = model_config or ModelConfig()
        self.agent_config = agent_config or IOSAgentConfig()

        self.model_client = ModelClient(self.model_config)

        # Initialize WDA connection and create session if needed
        self.wda_connection = XCTestConnection(wda_url=self.agent_config.wda_url)

        # Auto-create session if not provided
        if self.agent_config.session_id is None:
            success, session_id = self.wda_connection.start_wda_session()
            if success and session_id != "session_started":
                self.agent_config.session_id = session_id
                if self.agent_config.verbose:
                    print(f"✅ Created WDA session: {session_id}")
            elif self.agent_config.verbose:
                if success:
                    print("⚠️ Using default WDA session (no explicit session ID)")
                else:
                    # On failure the second tuple element carries the error
                    # text; surface it instead of silently dropping it.
                    print(
                        f"⚠️ Could not create WDA session ({session_id}); "
                        "using default WDA session (no explicit session ID)"
                    )

        self.action_handler = IOSActionHandler(
            wda_url=self.agent_config.wda_url,
            session_id=self.agent_config.session_id,
            confirmation_callback=confirmation_callback,
            takeover_callback=takeover_callback,
        )

        self._context: list[dict[str, Any]] = []
        self._step_count = 0

    def run(self, task: str) -> str:
        """
        Run the agent to complete a task.

        Args:
            task: Natural language description of the task.

        Returns:
            Final message from the agent, ``"Task completed"`` when the final
            step carries no message, or ``"Max steps reached"``.
        """
        self._context = []
        self._step_count = 0

        # First step with user prompt
        result = self._execute_step(task, is_first=True)

        if result.finished:
            return result.message or "Task completed"

        # Continue until finished or max steps reached
        while self._step_count < self.agent_config.max_steps:
            result = self._execute_step(is_first=False)

            if result.finished:
                return result.message or "Task completed"

        return "Max steps reached"

    def step(self, task: str | None = None) -> StepResult:
        """
        Execute a single step of the agent.

        Useful for manual control or debugging.

        Args:
            task: Task description (only needed for first step).

        Returns:
            StepResult with step details.

        Raises:
            ValueError: If the context is empty and no task is given.
        """
        is_first = len(self._context) == 0

        if is_first and not task:
            raise ValueError("Task is required for the first step")

        return self._execute_step(task, is_first)

    def reset(self) -> None:
        """Reset the agent state for a new task."""
        self._context = []
        self._step_count = 0

    def _execute_step(
        self, user_prompt: str | None = None, is_first: bool = False
    ) -> StepResult:
        """
        Execute a single step of the agent loop.

        Captures the screen, asks the model for the next action, executes it
        and records the exchange in the conversation context.

        Args:
            user_prompt: Task text; only used when ``is_first`` is True.
            is_first: Whether this is the first step of a task.

        Returns:
            StepResult describing the step.
        """
        self._step_count += 1

        # Capture current screen state
        screenshot = get_screenshot(
            wda_url=self.agent_config.wda_url,
            session_id=self.agent_config.session_id,
            device_id=self.agent_config.device_id,
        )
        current_app = get_current_app(
            wda_url=self.agent_config.wda_url, session_id=self.agent_config.session_id
        )

        # Build messages: the first step also carries the system prompt and task.
        if is_first:
            self._context.append(
                MessageBuilder.create_system_message(self.agent_config.system_prompt)
            )

        screen_info = MessageBuilder.build_screen_info(current_app)
        if is_first:
            text_content = f"{user_prompt}\n\n{screen_info}"
        else:
            text_content = f"** Screen Info **\n\n{screen_info}"

        self._context.append(
            MessageBuilder.create_user_message(
                text=text_content, image_base64=screenshot.base64_data
            )
        )

        # Get model response
        try:
            response = self.model_client.request(self._context)
        except Exception as e:
            if self.agent_config.verbose:
                traceback.print_exc()
            return StepResult(
                success=False,
                finished=True,
                action=None,
                thinking="",
                message=f"Model error: {e}",
            )

        # Parse action from response
        try:
            action = parse_action(response.action)
        except ValueError:
            if self.agent_config.verbose:
                traceback.print_exc()
            # Unparseable output is treated as the model's final answer.
            action = finish(message=response.action)

        if self.agent_config.verbose:
            # Print thinking process
            msgs = get_messages(self.agent_config.lang)
            print("\n" + "=" * 50)
            print(f"💭 {msgs['thinking']}:")
            print("-" * 50)
            print(response.thinking)
            print("-" * 50)
            print(f"🎯 {msgs['action']}:")
            print(json.dumps(action, ensure_ascii=False, indent=2))
            print("=" * 50 + "\n")

        # Remove image from context to save space
        self._context[-1] = MessageBuilder.remove_images_from_message(self._context[-1])

        # Execute action
        try:
            result = self.action_handler.execute(
                action, screenshot.width, screenshot.height
            )
        except Exception as e:
            if self.agent_config.verbose:
                traceback.print_exc()
            # Turn an unexpected failure into a finishing result with the error text.
            result = self.action_handler.execute(
                finish(message=str(e)), screenshot.width, screenshot.height
            )

        # Add assistant response to context
        self._context.append(
            MessageBuilder.create_assistant_message(
                f"<think>{response.thinking}</think><answer>{response.action}</answer>"
            )
        )

        # Check if finished
        finished = action.get("_metadata") == "finish" or result.should_finish

        if finished and self.agent_config.verbose:
            msgs = get_messages(self.agent_config.lang)
            print("\n" + "🎉 " + "=" * 48)
            print(
                f"{msgs['task_completed']}: {result.message or action.get('message', msgs['done'])}"
            )
            print("=" * 50 + "\n")

        return StepResult(
            success=result.success,
            finished=finished,
            action=action,
            thinking=response.thinking,
            message=result.message or action.get("message"),
        )

    @property
    def context(self) -> list[dict[str, Any]]:
        """Get a shallow copy of the current conversation context."""
        return self._context.copy()

    @property
    def step_count(self) -> int:
        """Get the current step count."""
        return self._step_count

View File

@@ -1,6 +1,7 @@
"""Configuration module for Phone Agent."""
from phone_agent.config.apps import APP_PACKAGES
from phone_agent.config.apps_ios import APP_PACKAGES_IOS
from phone_agent.config.i18n import get_message, get_messages
from phone_agent.config.prompts_en import SYSTEM_PROMPT as SYSTEM_PROMPT_EN
from phone_agent.config.prompts_zh import SYSTEM_PROMPT as SYSTEM_PROMPT_ZH
@@ -35,6 +36,7 @@ SYSTEM_PROMPT = SYSTEM_PROMPT_ZH
__all__ = [
"APP_PACKAGES",
"APP_PACKAGES_IOS",
"SYSTEM_PROMPT",
"SYSTEM_PROMPT_ZH",
"SYSTEM_PROMPT_EN",

View File

@@ -224,4 +224,4 @@ def list_supported_apps() -> list[str]:
Returns:
List of app names.
"""
return list(APP_PACKAGES.keys())
return list(APP_PACKAGES.keys())

View File

@@ -0,0 +1,339 @@
"""App name to iOS bundle ID mapping for supported applications.
Based on iOS app bundle ID conventions and common iOS applications.
Bundle IDs are in the format: com.company.appName
"""
# Maps an app's display name to its iOS bundle ID. Several names may share a
# bundle ID (aliases); reverse lookups return the first matching name in
# insertion order.
APP_PACKAGES_IOS: dict[str, str] = {
    # Tencent Apps (腾讯系)
    "微信": "com.tencent.xin",
    "企业微信": "com.tencent.ww",
    "微信读书": "com.tencent.weread",
    "微信听书": "com.tencent.wehear",
    "QQ": "com.tencent.mqq",
    "QQ音乐": "com.tencent.QQMusic",
    "QQ阅读": "com.tencent.qqreaderiphone",
    "QQ邮箱": "com.tencent.qqmail",
    "QQ浏览器": "com.tencent.mttlite",
    "TIM": "com.tencent.tim",
    "微视": "com.tencent.microvision",
    "腾讯新闻": "com.tencent.info",
    "腾讯视频": "com.tencent.live4iphone",
    "腾讯动漫": "com.tencent.ied.app.comic",
    "腾讯微云": "com.tencent.weiyun",
    "腾讯体育": "com.tencent.sportskbs",
    "腾讯文档": "com.tencent.txdocs",
    "腾讯翻译君": "com.tencent.qqtranslator",
    "腾讯课堂": "com.tencent.edu",
    "腾讯地图": "com.tencent.sosomap",
    "小鹅拼拼": "com.tencent.dwdcoco",
    "全民k歌": "com.tencent.QQKSong",
    # Alibaba Apps (阿里系)
    "支付宝": "com.alipay.iphoneclient",
    "钉钉": "com.laiwang.DingTalk",
    "闲鱼": "com.taobao.fleamarket",
    "淘宝": "com.taobao.taobao4iphone",
    "斗鱼": "tv.douyu.live",
    "天猫": "com.taobao.tmall",
    "口碑": "com.taobao.kbmeishi",
    "饿了么": "me.ele.ios.eleme",
    "高德地图": "com.autonavi.amap",
    "UC浏览器": "com.ucweb.iphone.lowversion",
    "一淘": "com.taobao.etaocoupon",
    "飞猪": "com.taobao.travel",
    "虾米音乐": "com.xiami.spark",
    "淘票票": "com.taobao.movie.MoviePhoneClient",
    "优酷": "com.youku.YouKu",
    "菜鸟裹裹": "com.cainiao.cnwireless",
    "土豆视频": "com.tudou.tudouiphone",
    # ByteDance Apps (字节系)
    "抖音": "com.ss.iphone.ugc.Aweme",
    "抖音极速版": "com.ss.iphone.ugc.aweme.lite",
    "抖音火山版": "com.ss.iphone.ugc.Live",
    "Tiktok": "com.zhiliaoapp.musically",
    "飞书": "com.bytedance.ee.lark",
    "今日头条": "com.ss.iphone.article.News",
    "西瓜视频": "com.ss.iphone.article.Video",
    "皮皮虾": "com.bd.iphone.super",
    # Meituan Apps (美团系)
    "美团": "com.meituan.imeituan",
    "美团外卖": "com.meituan.itakeaway",
    "大众点评": "com.dianping.dpscope",
    "美团优选": "com.meituan.iyouxuan",
    "美团优选团长": "com.meituan.igrocery.gh",
    "美团骑手": "com.meituan.banma.homebrew",
    "美团开店宝": "com.meituan.imerchantbiz",
    "美团拍店": "com.meituan.pai",
    "美团众包": "com.meituan.banma.crowdsource",
    "美团买菜": "com.baobaoaichi.imaicai",
    # JD Apps (京东系)
    "京东": "com.360buy.jdmobile",
    "京东读书": "com.jd.reader",
    # NetEase Apps (网易系)
    "网易新闻": "com.netease.news",
    "网易云音乐": "com.netease.cloudmusic",
    "网易邮箱大师": "com.netease.macmail",
    "网易严选": "com.netease.yanxuan",
    "网易公开课": "com.netease.videoHD",
    "网易有道词典": "youdaoPro",
    "有道云笔记": "com.youdao.note.YoudaoNoteMac",
    # Baidu Apps (百度系)
    "百度": "com.baidu.BaiduMobile",
    "百度网盘": "com.baidu.netdisk",
    "百度贴吧": "com.baidu.tieba",
    "百度地图": "com.baidu.map",
    "百度阅读": "com.baidu.yuedu",
    "百度翻译": "com.baidu.translate",
    "百度文库": "com.baidu.Wenku",
    "百度视频": "com.baidu.videoiphone",
    "百度输入法": "com.baidu.inputMethod",
    # Kuaishou Apps (快手系)
    "快手": "com.jiangjia.gif",
    "快手极速版": "com.kuaishou.nebula",
    # Other Popular Apps
    "哔哩哔哩": "tv.danmaku.bilianime",
    "芒果TV": "com.hunantv.imgotv",
    "苏宁易购": "SuningEMall",
    "微博": "com.sina.weibo",
    "微博极速版": "com.sina.weibolite",
    "微博国际": "com.weibo.international",
    "墨客": "com.moke.moke.iphone",
    "豆瓣": "com.douban.frodo",
    "知乎": "com.zhihu.ios",
    "小红书": "com.xingin.discover",
    "喜马拉雅": "com.gemd.iting",
    "得到": "com.luojilab.LuoJiFM-IOS",
    "得物": "com.siwuai.duapp",
    "起点读书": "m.qidian.QDReaderAppStore",
    "番茄小说": "com.dragon.read",
    "书旗小说": "com.shuqicenter.reader",
    "拼多多": "com.xunmeng.pinduoduo",
    "多点": "com.dmall.dmall",
    "便利蜂": "com.bianlifeng.customer.ios",
    "亿通行": "com.ruubypay.yitongxing",
    "云闪付": "com.unionpay.chsp",
    "大都会Metro": "com.DDH.SHSubway",
    "爱奇艺视频": "com.qiyi.iphone",
    "搜狐视频": "com.sohu.iPhoneVideo",
    "搜狐新闻": "com.sohu.newspaper",
    "搜狗浏览器": "com.sogou.SogouExplorerMobile",
    "虎牙": "com.yy.kiwi",
    "比心": "com.yitan.bixin",
    "转转": "com.wuba.zhuanzhuan",
    "YY": "yyvoice",
    "绿洲": "com.sina.oasis",
    "陌陌": "com.wemomo.momoappdemo1",
    "什么值得买": "com.smzdm.client.ios",
    # The app's real name is 美图秀秀 (Meitu); listed first so reverse lookups
    # return the correct spelling.
    "美图秀秀": "com.meitu.mtxx",
    # Misspelled legacy key kept as an alias for backward compatibility.
    "美团秀秀": "com.meitu.mtxx",
    "唯品会": "com.vipshop.iphone",
    "唱吧": "com.changba.ktv",
    "酷狗音乐": "com.kugou.kugou1002",
    "CSDN": "net.csdn.CsdnPlus",
    "多抓鱼": "com.duozhuyu.dejavu",
    "自如": "com.ziroom.ZiroomProject",
    "携程": "ctrip.com",
    "去哪儿旅行": "com.qunar.iphoneclient8",
    "Xmind": "net.xmind.brownieapp",
    "印象笔记": "com.yinxiang.iPhone",
    "欧陆词典": "eusoft.eudic.pro",
    "115": "com.115.personal",
    "名片全能王": "com.intsig.camcard.lite",
    "中国银行": "com.boc.BOCMBCI",
    "58同城": "com.taofang.iphone",
    # International Apps
    "Google Chrome": "com.google.chrome.ios",
    "Gmail": "com.google.Gmail",
    "Facebook": "com.facebook.Facebook",
    "Firefox": "org.mozilla.ios.Firefox",
    "Messenger": "com.facebook.Messenger",
    "Instagram": "com.burbn.instagram",
    "Starbucks": "com.starbucks.mystarbucks",
    "Luckin Coffee": "com.bjlc.luckycoffee",
    "Line": "jp.naver.line",
    "Linkedin": "com.linkedin.LinkedIn",
    "Dcard": "com.dcard.app.Dcard",
    "Youtube": "com.google.ios.youtube",
    "Spotify": "com.spotify.client",
    "Netflix": "com.netflix.Netflix",
    "Twitter": "com.atebits.Tweetie2",
    "WhatsApp": "net.whatsapp.WhatsApp",
    # Apple Native Apps (Apple 原生应用)
    "Safari": "com.apple.mobilesafari",
    "App Store": "com.apple.AppStore",
    "设置": "com.apple.Preferences",
    "相机": "com.apple.camera",
    "照片": "com.apple.mobileslideshow",
    "时钟": "com.apple.mobiletimer",
    "闹钟": "com.apple.mobiletimer",
    "备忘录": "com.apple.mobilenotes",
    "提醒事项": "com.apple.reminders",
    "快捷指令": "com.apple.shortcuts",
    "天气": "com.apple.weather",
    "日历": "com.apple.mobilecal",
    "地图": "com.apple.Maps",
    "电话": "com.apple.mobilephone",
    "通讯录": "com.apple.MobileAddressBook",
    "信息": "com.apple.MobileSMS",
    "Facetime": "com.apple.facetime",
    "FaceTime": "com.apple.facetime",
    "计算器": "com.apple.calculator",
    "家庭": "com.apple.Home",
    "健康": "com.apple.Health",
    "钱包": "com.apple.Passbook",
    "股市": "com.apple.stocks",
    "图书": "com.apple.iBooks",
    "新闻": "com.apple.news",
    "视频": "com.apple.tv",
    "文件": "com.apple.DocumentsApp",
    "邮件": "com.apple.mobilemail",
    "查找": "com.apple.findmy",
    "翻译": "com.apple.Translate",
    "音乐": "com.apple.Music",
    "播客": "com.apple.podcasts",
    "库乐队": "com.apple.mobilegarageband",
    "语音备忘录": "com.apple.VoiceMemos",
    "iMovie": "com.apple.iMovie",
    "Watch": "com.apple.Bridge",
    "Apple Store": "com.apple.store.Jolly",
    "TestFlight": "com.apple.TestFlight",
    "Keynote": "com.apple.Keynote",
    "Keynote 讲演": "com.apple.Keynote",
}
def get_bundle_id(app_name: str) -> str | None:
    """
    Look up the iOS bundle ID registered for an app display name.

    Args:
        app_name: The display name of the app.

    Returns:
        The matching bundle ID from APP_PACKAGES_IOS, or None when the name
        is not in the table.
    """
    try:
        return APP_PACKAGES_IOS[app_name]
    except KeyError:
        return None
def get_app_name(bundle_id: str) -> str | None:
    """
    Reverse-lookup the display name for an iOS bundle ID.

    Args:
        bundle_id: The iOS bundle ID.

    Returns:
        The first display name in APP_PACKAGES_IOS mapped to ``bundle_id``,
        or None if no entry matches.
    """
    matching_names = (
        display_name
        for display_name, candidate in APP_PACKAGES_IOS.items()
        if candidate == bundle_id
    )
    return next(matching_names, None)
def list_supported_apps() -> list[str]:
    """
    Return the display names of every app in APP_PACKAGES_IOS.

    Returns:
        A new list of app names, in table order.
    """
    return [display_name for display_name in APP_PACKAGES_IOS]
def check_app_installed(app_name: str, wda_url: str = "http://localhost:8100") -> bool:
    """
    Check whether a supported app is known to the iTunes lookup API.

    Despite the name, this does NOT inspect the device: it resolves the app's
    bundle ID and asks the iTunes lookup API whether such an app exists.
    For an actual installation check on device, you would need to use WDA's
    app listing capabilities or URL scheme checking.

    Args:
        app_name: The display name of the app.
        wda_url: WebDriverAgent URL. Currently unused; kept for interface
            compatibility.

    Returns:
        True if the lookup returns at least one result. False when the app
        name is unknown, the request fails, or ``requests`` is unavailable.
    """
    bundle_id = get_bundle_id(app_name)
    if not bundle_id:
        return False

    try:
        import requests

        # Query iTunes API for app info. Passing the ID via ``params`` lets
        # requests URL-encode it instead of splicing it into the URL raw.
        response = requests.get(
            "https://itunes.apple.com/lookup",
            params={"bundleId": bundle_id},
            timeout=10,
        )

        if response.status_code == 200:
            data = response.json()
            return data.get("resultCount", 0) > 0

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        # Best-effort check: network and parsing errors are reported, not raised.
        print(f"Error checking app installation: {e}")

    return False
def get_app_info_from_itunes(bundle_id: str) -> dict | None:
    """
    Get app information from iTunes API using bundle ID.

    Args:
        bundle_id: The iOS bundle ID.

    Returns:
        The first entry of the lookup's ``results`` list, or None if the
        lookup has no results, the request fails, or ``requests`` is
        unavailable.
    """
    try:
        import requests

        # ``params`` makes requests URL-encode the bundle ID.
        response = requests.get(
            "https://itunes.apple.com/lookup",
            params={"bundleId": bundle_id},
            timeout=10,
        )

        if response.status_code == 200:
            data = response.json()
            results = data.get("results", [])
            if results:
                return results[0]

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        # Best-effort lookup: errors are reported on stdout, not raised.
        print(f"Error fetching app info: {e}")

    return None
def get_app_info_by_id(app_store_id: str) -> dict | None:
    """
    Get app information from iTunes API using App Store ID.

    Args:
        app_store_id: The numeric App Store ID (e.g., "414478124" for WeChat).

    Returns:
        The first entry of the lookup's ``results`` list, or None if the
        lookup has no results, the request fails, or ``requests`` is
        unavailable.
    """
    try:
        import requests

        # ``params`` makes requests URL-encode the ID.
        response = requests.get(
            "https://itunes.apple.com/lookup",
            params={"id": app_store_id},
            timeout=10,
        )

        if response.status_code == 200:
            data = response.json()
            results = data.get("results", [])
            if results:
                return results[0]

    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        # Best-effort lookup: errors are reported on stdout, not raised.
        print(f"Error fetching app info by ID: {e}")

    return None

View File

@@ -0,0 +1,47 @@
"""XCTest utilities for iOS device interaction via WebDriverAgent/XCUITest."""
from phone_agent.xctest.connection import (
    ConnectionType,
    DeviceInfo,
    XCTestConnection,
    list_devices,
    quick_connect,
)
from phone_agent.xctest.device import (
    back,
    double_tap,
    get_current_app,
    home,
    launch_app,
    long_press,
    swipe,
    tap,
)
from phone_agent.xctest.input import (
    clear_text,
    hide_keyboard,
    type_text,
)
from phone_agent.xctest.screenshot import get_screenshot

# Public API of the package. hide_keyboard is re-exported next to the other
# input helpers because the iOS action handler uses all three together.
__all__ = [
    # Screenshot
    "get_screenshot",
    # Input
    "type_text",
    "clear_text",
    "hide_keyboard",
    # Device control
    "get_current_app",
    "tap",
    "swipe",
    "back",
    "home",
    "double_tap",
    "long_press",
    "launch_app",
    # Connection management
    "XCTestConnection",
    "DeviceInfo",
    "ConnectionType",
    "quick_connect",
    "list_devices",
]

View File

@@ -0,0 +1,382 @@
"""iOS device connection management via idevice tools and WebDriverAgent."""
import subprocess
import time
from dataclasses import dataclass
from enum import Enum
class ConnectionType(Enum):
    """Transport over which an iOS device is reached."""

    USB = "usb"
    NETWORK = "network"
@dataclass
class DeviceInfo:
    """Description of one connected iOS device."""

    device_id: str  # UDID
    # Connection state label for the device.
    status: str
    # Transport used to reach the device.
    connection_type: ConnectionType
    # Optional details; None when they are not provided.
    model: str | None = None
    ios_version: str | None = None
    device_name: str | None = None
class XCTestConnection:
"""
Manages connections to iOS devices via libimobiledevice and WebDriverAgent.
Requires:
- libimobiledevice (idevice_id, ideviceinfo)
- WebDriverAgent running on the iOS device
- ios-deploy (optional, for app installation)
Example:
>>> conn = XCTestConnection()
>>> # List connected devices
>>> devices = conn.list_devices()
>>> # Get device info
>>> info = conn.get_device_info()
>>> # Check if WDA is running
>>> is_ready = conn.is_wda_ready()
"""
def __init__(self, wda_url: str = "http://localhost:8100"):
"""
Initialize iOS connection manager.
Args:
wda_url: WebDriverAgent URL (default: http://localhost:8100).
For network devices, use http://<device-ip>:8100
"""
self.wda_url = wda_url.rstrip("/")
def list_devices(self) -> list[DeviceInfo]:
"""
List all connected iOS devices.
Returns:
List of DeviceInfo objects.
Note:
Requires libimobiledevice to be installed.
Install on macOS: brew install libimobiledevice
"""
try:
# Get list of device UDIDs
result = subprocess.run(
["idevice_id", "-ln"],
capture_output=True,
text=True,
timeout=5,
)
devices = []
for line in result.stdout.strip().split("\n"):
udid = line.strip()
if not udid:
continue
# Determine connection type (network devices have specific format)
conn_type = (
ConnectionType.NETWORK
if "-" in udid and len(udid) > 40
else ConnectionType.USB
)
# Get detailed device info
device_info = self._get_device_details(udid)
devices.append(
DeviceInfo(
device_id=udid,
status="connected",
connection_type=conn_type,
model=device_info.get("model"),
ios_version=device_info.get("ios_version"),
device_name=device_info.get("name"),
)
)
return devices
except FileNotFoundError:
print(
"Error: idevice_id not found. Install libimobiledevice: brew install libimobiledevice"
)
return []
except Exception as e:
print(f"Error listing devices: {e}")
return []
def _get_device_details(self, udid: str) -> dict[str, str]:
"""
Get detailed information about a specific device.
Args:
udid: Device UDID.
Returns:
Dictionary with device details.
"""
try:
result = subprocess.run(
["ideviceinfo", "-u", udid],
capture_output=True,
text=True,
timeout=5,
)
info = {}
for line in result.stdout.split("\n"):
if ": " in line:
key, value = line.split(": ", 1)
key = key.strip()
value = value.strip()
if key == "ProductType":
info["model"] = value
elif key == "ProductVersion":
info["ios_version"] = value
elif key == "DeviceName":
info["name"] = value
return info
except Exception:
return {}
def get_device_info(self, device_id: str | None = None) -> DeviceInfo | None:
"""
Get detailed information about a device.
Args:
device_id: Device UDID. If None, uses first available device.
Returns:
DeviceInfo or None if not found.
"""
devices = self.list_devices()
if not devices:
return None
if device_id is None:
return devices[0]
for device in devices:
if device.device_id == device_id:
return device
return None
def is_connected(self, device_id: str | None = None) -> bool:
"""
Check if a device is connected.
Args:
device_id: Device UDID to check. If None, checks if any device is connected.
Returns:
True if connected, False otherwise.
"""
devices = self.list_devices()
if not devices:
return False
if device_id is None:
return len(devices) > 0
return any(d.device_id == device_id for d in devices)
def is_wda_ready(self, timeout: int = 2) -> bool:
"""
Check if WebDriverAgent is running and accessible.
Args:
timeout: Request timeout in seconds.
Returns:
True if WDA is ready, False otherwise.
"""
try:
import requests
response = requests.get(
f"{self.wda_url}/status", timeout=timeout, verify=False
)
return response.status_code == 200
except ImportError:
print(
"Error: requests library not found. Install it: pip install requests"
)
return False
except Exception:
return False
def start_wda_session(self) -> tuple[bool, str]:
"""
Start a new WebDriverAgent session.
Returns:
Tuple of (success, session_id or error_message).
"""
try:
import requests
response = requests.post(
f"{self.wda_url}/session",
json={"capabilities": {}},
timeout=30,
verify=False,
)
if response.status_code in (200, 201):
data = response.json()
session_id = data.get("sessionId") or data.get("value", {}).get(
"sessionId"
)
return True, session_id or "session_started"
else:
return False, f"Failed to start session: {response.text}"
except ImportError:
return (
False,
"requests library not found. Install it: pip install requests",
)
except Exception as e:
return False, f"Error starting WDA session: {e}"
def get_wda_status(self) -> dict | None:
    """
    Fetch the JSON body of the WebDriverAgent ``/status`` endpoint.

    Returns:
        The decoded JSON on HTTP 200, otherwise None. Any exception
        (including a missing ``requests`` package) also yields None.
    """
    try:
        import requests

        reply = requests.get(f"{self.wda_url}/status", timeout=5, verify=False)
        return reply.json() if reply.status_code == 200 else None
    except Exception:
        return None
def pair_device(self, device_id: str | None = None) -> tuple[bool, str]:
"""
Pair with an iOS device (required for some operations).
Args:
device_id: Device UDID. If None, uses first available device.
Returns:
Tuple of (success, message).
"""
try:
cmd = ["idevicepair"]
if device_id:
cmd.extend(["-u", device_id])
cmd.append("pair")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
output = result.stdout + result.stderr
if "SUCCESS" in output or "already paired" in output.lower():
return True, "Device paired successfully"
else:
return False, output.strip()
except FileNotFoundError:
return (
False,
"idevicepair not found. Install libimobiledevice: brew install libimobiledevice",
)
except Exception as e:
return False, f"Error pairing device: {e}"
def get_device_name(self, device_id: str | None = None) -> str | None:
"""
Get the device name.
Args:
device_id: Device UDID. If None, uses first available device.
Returns:
Device name string or None if not found.
"""
try:
cmd = ["ideviceinfo"]
if device_id:
cmd.extend(["-u", device_id])
cmd.extend(["-k", "DeviceName"])
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
return result.stdout.strip() or None
except Exception as e:
print(f"Error getting device name: {e}")
return None
def restart_wda(self) -> tuple[bool, str]:
    """
    Report whether WebDriverAgent needs a (manual) restart.

    This does not restart anything: it only calls ``self.is_wda_ready()``
    and turns the answer into a message.

    Returns:
        ``(True, "WDA is already running")`` when WDA is ready, otherwise
        ``(False, <instruction to start it manually>)``.
    """
    if not self.is_wda_ready():
        return (
            False,
            "WDA is not running. Please start it manually on the device.",
        )
    return True, "WDA is already running"
def quick_connect(wda_url: str = "http://localhost:8100") -> tuple[bool, str]:
    """
    Check, in order, that a device is attached and that WDA answers.

    Args:
        wda_url: WebDriverAgent URL.

    Returns:
        Tuple of (success, message); the message names the first check
        that failed, or confirms that both passed.
    """
    connection = XCTestConnection(wda_url=wda_url)

    if connection.is_connected():
        # Only probe WDA once a device is known to be present.
        if connection.is_wda_ready():
            return True, "iOS device connected and WDA ready"
        return False, "WebDriverAgent is not running"
    return False, "No iOS device connected"
def list_devices() -> list[DeviceInfo]:
    """
    List connected iOS devices using a default ``XCTestConnection``.

    Returns:
        Whatever ``XCTestConnection().list_devices()`` returns.
    """
    return XCTestConnection().list_devices()

View File

@@ -0,0 +1,458 @@
"""Device control utilities for iOS automation via WebDriverAgent."""
import subprocess
import time
from typing import Optional
from phone_agent.config.apps_ios import APP_PACKAGES_IOS as APP_PACKAGES
# Divisor applied to incoming x/y coordinates before they are sent to WDA
# (see tap/double_tap/long_press/swipe below).
# NOTE(review): presumably the screenshot-pixel to UI-point ratio; devices
# with a different display scale would need another value — confirm.
SCALE_FACTOR = 3 # 3 for most modern iPhone
def _get_wda_session_url(wda_url: str, session_id: str | None, endpoint: str) -> str:
"""
Get the correct WDA URL for a session endpoint.
Args:
wda_url: Base WDA URL.
session_id: Optional session ID.
endpoint: The endpoint path.
Returns:
Full URL for the endpoint.
"""
base = wda_url.rstrip("/")
if session_id:
return f"{base}/session/{session_id}/{endpoint}"
else:
# Try to use WDA endpoints without session when possible
return f"{base}/{endpoint}"
def get_current_app(
    wda_url: str = "http://localhost:8100", session_id: str | None = None
) -> str:
    """
    Resolve the name of the currently active app.

    Queries ``/wda/activeAppInfo`` and maps the reported bundle ID back to
    an app name through ``APP_PACKAGES``.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (not used by this request).

    Returns:
        The app name if the bundle ID is known, otherwise "System Home"
        (also returned on any error).
    """
    try:
        import requests

        reply = requests.get(
            f"{wda_url.rstrip('/')}/wda/activeAppInfo", timeout=5, verify=False
        )
        if reply.status_code == 200:
            # Expected shape: {"value": {"bundleId": "com.apple.AppStore", ...}, ...}
            active_bundle = reply.json().get("value", {}).get("bundleId", "")
            if active_bundle:
                # Reverse lookup: first app name whose bundle ID matches.
                known_name = next(
                    (
                        label
                        for label, bundle in APP_PACKAGES.items()
                        if bundle == active_bundle
                    ),
                    None,
                )
                if known_name is not None:
                    return known_name
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error getting current app: {e}")
    return "System Home"
def tap(
    x: int,
    y: int,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Tap at the specified coordinates using WebDriver W3C Actions API.

    Args:
        x: X coordinate (divided by ``SCALE_FACTOR`` before sending).
        y: Y coordinate (divided by ``SCALE_FACTOR`` before sending).
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after tap.

    Note:
        Errors are printed, not raised; the function always returns None.
    """
    try:
        import requests

        url = _get_wda_session_url(wda_url, session_id, "actions")
        # W3C WebDriver Actions API for tap/click.
        actions = {
            "actions": [
                {
                    "type": "pointer",
                    "id": "finger1",
                    "parameters": {"pointerType": "touch"},
                    "actions": [
                        {
                            "type": "pointerMove",
                            "duration": 0,
                            "x": x / SCALE_FACTOR,
                            "y": y / SCALE_FACTOR,
                        },
                        {"type": "pointerDown", "button": 0},
                        # Pause durations are integer milliseconds in the
                        # W3C actions model; 100 ms matches double_tap().
                        {"type": "pause", "duration": 100},
                        {"type": "pointerUp", "button": 0},
                    ],
                }
            ]
        }
        requests.post(url, json=actions, timeout=15, verify=False)
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error tapping: {e}")
def double_tap(
    x: int,
    y: int,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Double tap at the given coordinates through the W3C Actions endpoint.

    Args:
        x: X coordinate (divided by ``SCALE_FACTOR`` before sending).
        y: Y coordinate (divided by ``SCALE_FACTOR`` before sending).
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after double tap.

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        endpoint_url = _get_wda_session_url(wda_url, session_id, "actions")

        # Building blocks of the gesture; the same press/pause/release
        # triple is issued twice with a pause in between.
        move_to_target = {
            "type": "pointerMove",
            "duration": 0,
            "x": x / SCALE_FACTOR,
            "y": y / SCALE_FACTOR,
        }
        press = {"type": "pointerDown", "button": 0}
        release = {"type": "pointerUp", "button": 0}
        short_pause = {"type": "pause", "duration": 100}

        gesture = {
            "actions": [
                {
                    "type": "pointer",
                    "id": "finger1",
                    "parameters": {"pointerType": "touch"},
                    "actions": [
                        move_to_target,
                        press,
                        short_pause,
                        release,
                        short_pause,
                        press,
                        short_pause,
                        release,
                    ],
                }
            ]
        }
        requests.post(endpoint_url, json=gesture, timeout=10, verify=False)
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error double tapping: {e}")
def long_press(
    x: int,
    y: int,
    duration: float = 3.0,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Press and hold at the given coordinates through the W3C Actions endpoint.

    Args:
        x: X coordinate (divided by ``SCALE_FACTOR`` before sending).
        y: Y coordinate (divided by ``SCALE_FACTOR`` before sending).
        duration: Duration of press in seconds.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after long press.

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        endpoint_url = _get_wda_session_url(wda_url, session_id, "actions")

        # The hold is expressed as a pause, in milliseconds, between
        # pointerDown and pointerUp.
        hold_ms = int(duration * 1000)
        finger_steps = [
            {
                "type": "pointerMove",
                "duration": 0,
                "x": x / SCALE_FACTOR,
                "y": y / SCALE_FACTOR,
            },
            {"type": "pointerDown", "button": 0},
            {"type": "pause", "duration": hold_ms},
            {"type": "pointerUp", "button": 0},
        ]
        gesture = {
            "actions": [
                {
                    "type": "pointer",
                    "id": "finger1",
                    "parameters": {"pointerType": "touch"},
                    "actions": finger_steps,
                }
            ]
        }
        # Leave the request 10 s of headroom beyond the hold itself.
        requests.post(
            endpoint_url, json=gesture, timeout=int(duration + 10), verify=False
        )
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error long pressing: {e}")
def swipe(
    start_x: int,
    start_y: int,
    end_x: int,
    end_y: int,
    duration: float | None = None,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Drag from one point to another via WDA's ``dragfromtoforduration``.

    Args:
        start_x: Starting X coordinate.
        start_y: Starting Y coordinate.
        end_x: Ending X coordinate.
        end_y: Ending Y coordinate.
        duration: Duration in seconds. If None, it is derived from the
            squared distance (divided by 1,000,000) and clamped to
            0.3-2.0 seconds.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after swipe.

    Note:
        All four coordinates are divided by ``SCALE_FACTOR`` before being
        sent. Errors are printed, not raised.
    """
    try:
        import requests

        if duration is None:
            delta_x = start_x - end_x
            delta_y = start_y - end_y
            duration = (delta_x**2 + delta_y**2) / 1000000
            duration = max(0.3, min(duration, 2.0))

        drag_url = _get_wda_session_url(
            wda_url, session_id, "wda/dragfromtoforduration"
        )
        body = {
            "fromX": start_x / SCALE_FACTOR,
            "fromY": start_y / SCALE_FACTOR,
            "toX": end_x / SCALE_FACTOR,
            "toY": end_y / SCALE_FACTOR,
            "duration": duration,
        }
        # Leave the request 10 s of headroom beyond the gesture itself.
        requests.post(drag_url, json=body, timeout=int(duration + 10), verify=False)
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error swiping: {e}")
def back(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Navigate back by dragging from the left screen edge.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after navigation.

    Note:
        iOS has no universal back button, so a left-edge drag is used
        instead. The drag runs from (0, 640) to (400, 640) over 0.3 s;
        these values are hard-coded and, unlike swipe(), are sent without
        ``SCALE_FACTOR`` scaling. Errors are printed, not raised.
    """
    try:
        import requests

        edge_x, target_x, row_y = 0, 400, 640
        drag_url = _get_wda_session_url(
            wda_url, session_id, "wda/dragfromtoforduration"
        )
        body = {
            "fromX": edge_x,
            "fromY": row_y,
            "toX": target_x,
            "toY": row_y,
            "duration": 0.3,
        }
        requests.post(drag_url, json=body, timeout=10, verify=False)
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error performing back gesture: {e}")
def home(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Go to the home screen via WDA's ``/wda/homescreen`` endpoint.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (not used; the endpoint is
            addressed without a session).
        delay: Delay in seconds after pressing home.

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        requests.post(f"{base}/wda/homescreen", timeout=10, verify=False)
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error pressing home: {e}")
def launch_app(
    app_name: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> bool:
    """
    Launch an app by its name in ``APP_PACKAGES``.

    Args:
        app_name: The app name (must be a key of APP_PACKAGES).
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after the launch request.

    Returns:
        True if WDA answered the launch request with HTTP 200/201.
        False if the app name is unknown or the request failed.
    """
    if app_name not in APP_PACKAGES:
        return False

    try:
        import requests

        launch_url = _get_wda_session_url(wda_url, session_id, "wda/apps/launch")
        reply = requests.post(
            launch_url,
            json={"bundleId": APP_PACKAGES[app_name]},
            timeout=10,
            verify=False,
        )
        # The delay is applied regardless of the response status.
        time.sleep(delay)
        return reply.status_code in (200, 201)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
        return False
    except Exception as e:
        print(f"Error launching app: {e}")
        return False
def get_screen_size(
    wda_url: str = "http://localhost:8100", session_id: str | None = None
) -> tuple[int, int]:
    """
    Query WDA's ``window/size`` endpoint for the screen dimensions.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Returns:
        Tuple of (width, height). Falls back to (375, 812) when the request
        fails or does not return HTTP 200; a missing width or height in the
        response falls back to 375 / 812 individually.
    """
    try:
        import requests

        reply = requests.get(
            _get_wda_session_url(wda_url, session_id, "window/size"),
            timeout=5,
            verify=False,
        )
        if reply.status_code == 200:
            dims = reply.json().get("value", {})
            return dims.get("width", 375), dims.get("height", 812)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error getting screen size: {e}")

    # Default iPhone screen size (iPhone X and later)
    return 375, 812
def press_button(
    button_name: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 1.0,
) -> None:
    """
    Press a physical button via WDA's ``/wda/pressButton`` endpoint.

    Args:
        button_name: Button name (e.g., "home", "volumeUp", "volumeDown").
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (not used; the endpoint is
            addressed without a session).
        delay: Delay in seconds after pressing.

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        requests.post(
            f"{base}/wda/pressButton",
            json={"name": button_name},
            timeout=10,
            verify=False,
        )
        time.sleep(delay)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error pressing button: {e}")

299
phone_agent/xctest/input.py Normal file
View File

@@ -0,0 +1,299 @@
"""Input utilities for iOS device text input via WebDriverAgent."""
import time
def _get_wda_session_url(wda_url: str, session_id: str | None, endpoint: str) -> str:
"""
Get the correct WDA URL for a session endpoint.
Args:
wda_url: Base WDA URL.
session_id: Optional session ID.
endpoint: The endpoint path.
Returns:
Full URL for the endpoint.
"""
base = wda_url.rstrip("/")
if session_id:
return f"{base}/session/{session_id}/{endpoint}"
else:
# Try to use WDA endpoints without session when possible
return f"{base}/{endpoint}"
def type_text(
    text: str,
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    frequency: int = 60,
) -> None:
    """
    Send text to the currently focused input field via ``wda/keys``.

    Args:
        text: The text to type; it is sent as a list of single characters.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        frequency: Typing frequency forwarded to WDA. Default is 60.

    Note:
        The input field must be focused before calling this function.
        Use tap() to focus on the input field first. A non-200/201 response
        only produces a printed warning; errors are printed, not raised.
    """
    try:
        import requests

        keys_url = _get_wda_session_url(wda_url, session_id, "wda/keys")
        body = {"value": list(text), "frequency": frequency}
        response = requests.post(keys_url, json=body, timeout=30, verify=False)

        succeeded = response.status_code in (200, 201)
        if not succeeded:
            print(f"Warning: Text input may have failed. Status: {response.status_code}")
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error typing text: {e}")
def clear_text(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Clear text in the currently focused input field.

    First asks WDA for the active element and clears it. If no active
    element can be resolved, or the clear request is not answered with
    HTTP 200/201, falls back to sending backspace keys.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Note:
        The input field must be focused before calling this function.
        Errors are printed, not raised.
    """
    try:
        import requests

        # First, try to get the active element.
        url = _get_wda_session_url(wda_url, session_id, "element/active")
        response = requests.get(url, timeout=10, verify=False)

        if response.status_code == 200:
            value = response.json().get("value")
            element_id = None
            # Guard against a null / non-object "value" so the fallback
            # below is still reached instead of raising.
            if isinstance(value, dict):
                # Legacy key first, then the W3C element identifier key.
                element_id = value.get("ELEMENT") or value.get(
                    "element-6066-11e4-a52e-4f735466cecf"
                )

            if element_id:
                clear_url = _get_wda_session_url(
                    wda_url, session_id, f"element/{element_id}/clear"
                )
                cleared = requests.post(clear_url, timeout=10, verify=False)
                if cleared.status_code in (200, 201):
                    return
                # A rejected clear falls through to the backspace fallback.

        # Fallback: send backspace commands.
        _clear_with_backspace(wda_url, session_id)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error clearing text: {e}")
def _clear_with_backspace(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    max_backspaces: int = 100,
) -> None:
    """
    Clear text by sending a run of backspace characters to ``wda/keys``.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        max_backspaces: Number of backspace characters sent in one request.

    Note:
        Any error (including a missing ``requests`` package) is printed,
        not raised.
    """
    try:
        import requests

        keys_url = _get_wda_session_url(wda_url, session_id, "wda/keys")
        # U+0008 is the backspace control character.
        erase_keys = ["\u0008" for _ in range(max_backspaces)]
        requests.post(
            keys_url,
            json={"value": erase_keys},
            timeout=10,
            verify=False,
        )
    except Exception as e:
        print(f"Error clearing with backspace: {e}")
def send_keys(
    keys: list[str],
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Post a sequence of keys to WDA's ``wda/keys`` endpoint.

    Args:
        keys: List of keys to send.
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Example:
        >>> send_keys(["H", "e", "l", "l", "o"])
        >>> send_keys(["\\n"])  # Send enter key

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        requests.post(
            _get_wda_session_url(wda_url, session_id, "wda/keys"),
            json={"value": keys},
            timeout=10,
            verify=False,
        )
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error sending keys: {e}")
def press_enter(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    delay: float = 0.5,
) -> None:
    """
    Press Enter/Return by sending a newline through send_keys().

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        delay: Delay in seconds after pressing enter.
    """
    newline_key = ["\n"]
    send_keys(newline_key, wda_url, session_id)
    time.sleep(delay)
def hide_keyboard(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> None:
    """
    Dismiss the on-screen keyboard via ``/wda/keyboard/dismiss``.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (not used; the endpoint is
            addressed without a session).

    Note:
        Errors are printed, not raised.
    """
    try:
        import requests

        base = wda_url.rstrip("/")
        requests.post(f"{base}/wda/keyboard/dismiss", timeout=10, verify=False)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error hiding keyboard: {e}")
def is_keyboard_shown(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
) -> bool:
    """
    Ask WDA (``wda/keyboard/shown``) whether the keyboard is visible.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.

    Returns:
        The ``value`` field of an HTTP 200 response (False if absent);
        False for any other status or on any error.
    """
    try:
        import requests

        reply = requests.get(
            _get_wda_session_url(wda_url, session_id, "wda/keyboard/shown"),
            timeout=5,
            verify=False,
        )
        if reply.status_code == 200:
            return reply.json().get("value", False)
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception:
        # Best-effort probe: treat any failure as "not shown".
        pass
    return False
def set_pasteboard(
    text: str,
    wda_url: str = "http://localhost:8100",
) -> None:
    """
    Set the device pasteboard (clipboard) content.

    Args:
        text: Text to set in pasteboard.
        wda_url: WebDriverAgent URL.

    Note:
        This can be useful for inputting large amounts of text.
        After setting pasteboard, you can simulate paste gesture.
        Errors are printed, not raised.
    """
    import base64

    try:
        import requests

        url = f"{wda_url.rstrip('/')}/wda/setPasteboard"
        # WDA decodes "content" as base64, so the UTF-8 text is encoded
        # here rather than being sent verbatim.
        encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
        requests.post(
            url,
            json={"content": encoded, "contentType": "plaintext"},
            timeout=10,
            verify=False,
        )
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error setting pasteboard: {e}")
def get_pasteboard(
    wda_url: str = "http://localhost:8100",
) -> str | None:
    """
    Get the device pasteboard (clipboard) content.

    Args:
        wda_url: WebDriverAgent URL.

    Returns:
        Pasteboard text, or None if the request failed. WDA reports the
        content base64-encoded; it is decoded as UTF-8 here. If the value
        cannot be decoded that way it is returned unchanged.
    """
    import base64

    try:
        import requests

        url = f"{wda_url.rstrip('/')}/wda/getPasteboard"
        response = requests.post(url, timeout=10, verify=False)
        if response.status_code == 200:
            raw = response.json().get("value")
            if not isinstance(raw, str):
                return raw
            try:
                # Non-strict decoding tolerates line breaks in the payload.
                return base64.b64decode(raw).decode("utf-8")
            except ValueError:
                # Not valid base64 / UTF-8: hand back the value as received.
                return raw
    except ImportError:
        print("Error: requests library required. Install: pip install requests")
    except Exception as e:
        print(f"Error getting pasteboard: {e}")
    return None

View File

@@ -0,0 +1,230 @@
"""Screenshot utilities for capturing iOS device screen."""
import base64
import os
import subprocess
import tempfile
import uuid
from dataclasses import dataclass
from io import BytesIO
from PIL import Image
@dataclass
class Screenshot:
    """A captured screen image together with its pixel dimensions."""

    # PNG image data, base64-encoded as text.
    base64_data: str
    # Image width in pixels.
    width: int
    # Image height in pixels.
    height: int
    # Flag carried with the capture; every producer in this module sets it
    # to False.
    is_sensitive: bool = False
def get_screenshot(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    device_id: str | None = None,
    timeout: int = 10,
) -> Screenshot:
    """
    Capture a screenshot from the connected iOS device.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        device_id: Optional device UDID (for idevicescreenshot fallback).
        timeout: Timeout in seconds for screenshot operations.

    Returns:
        Screenshot object containing base64 data and dimensions.

    Note:
        Sources are tried in order and the first that yields a screenshot
        wins: WebDriverAgent, then idevicescreenshot, then a black
        placeholder image.
    """
    return (
        _get_screenshot_wda(wda_url, session_id, timeout)
        or _get_screenshot_idevice(device_id, timeout)
        or _create_fallback_screenshot(is_sensitive=False)
    )
def _get_screenshot_wda(
    wda_url: str, session_id: str | None, timeout: int
) -> Screenshot | None:
    """
    Capture a screenshot through WebDriverAgent's ``/screenshot`` endpoint.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID (not used by this request).
        timeout: Timeout in seconds.

    Returns:
        Screenshot object, or None if the request failed, did not return
        HTTP 200, or carried no image data.
    """
    try:
        import requests

        reply = requests.get(
            f"{wda_url.rstrip('/')}/screenshot", timeout=timeout, verify=False
        )
        if reply.status_code == 200:
            encoded = reply.json().get("value", "")
            if encoded:
                # The image is decoded only to read its dimensions; the
                # base64 text is passed through unchanged.
                width, height = Image.open(BytesIO(base64.b64decode(encoded))).size
                return Screenshot(
                    base64_data=encoded,
                    width=width,
                    height=height,
                    is_sensitive=False,
                )
    except ImportError:
        print("Note: requests library not installed. Install: pip install requests")
    except Exception as e:
        print(f"WDA screenshot failed: {e}")
    return None
def _get_screenshot_idevice(
device_id: str | None, timeout: int
) -> Screenshot | None:
"""
Capture screenshot using idevicescreenshot (libimobiledevice).
Args:
device_id: Optional device UDID.
timeout: Timeout in seconds.
Returns:
Screenshot object or None if failed.
"""
try:
temp_path = os.path.join(
tempfile.gettempdir(), f"ios_screenshot_{uuid.uuid4()}.png"
)
cmd = ["idevicescreenshot"]
if device_id:
cmd.extend(["-u", device_id])
cmd.append(temp_path)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout
)
if result.returncode == 0 and os.path.exists(temp_path):
# Read and encode image
img = Image.open(temp_path)
width, height = img.size
buffered = BytesIO()
img.save(buffered, format="PNG")
base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
# Cleanup
os.remove(temp_path)
return Screenshot(
base64_data=base64_data, width=width, height=height, is_sensitive=False
)
except FileNotFoundError:
print(
"Note: idevicescreenshot not found. Install: brew install libimobiledevice"
)
except Exception as e:
print(f"idevicescreenshot failed: {e}")
return None
def _create_fallback_screenshot(is_sensitive: bool) -> Screenshot:
    """
    Build an all-black placeholder used when no real screenshot is available.

    Args:
        is_sensitive: Value stored in the result's ``is_sensitive`` field.

    Returns:
        Screenshot object holding a black 1179x2556 PNG.
    """
    # Default iPhone screen size (iPhone 14 Pro)
    placeholder_size = (1179, 2556)

    png_buffer = BytesIO()
    Image.new("RGB", placeholder_size, color="black").save(png_buffer, format="PNG")
    encoded = base64.b64encode(png_buffer.getvalue()).decode("utf-8")

    fallback_width, fallback_height = placeholder_size
    return Screenshot(
        base64_data=encoded,
        width=fallback_width,
        height=fallback_height,
        is_sensitive=is_sensitive,
    )
def save_screenshot(
screenshot: Screenshot,
file_path: str,
) -> bool:
"""
Save a screenshot to a file.
Args:
screenshot: Screenshot object.
file_path: Path to save the screenshot.
Returns:
True if successful, False otherwise.
"""
try:
img_data = base64.b64decode(screenshot.base64_data)
img = Image.open(BytesIO(img_data))
img.save(file_path)
return True
except Exception as e:
print(f"Error saving screenshot: {e}")
return False
def get_screenshot_png(
    wda_url: str = "http://localhost:8100",
    session_id: str | None = None,
    device_id: str | None = None,
) -> bytes | None:
    """
    Capture a screenshot and return its decoded bytes.

    Args:
        wda_url: WebDriverAgent URL.
        session_id: Optional WDA session ID.
        device_id: Optional device UDID.

    Returns:
        The base64-decoded screenshot data, or None if decoding fails.
    """
    captured = get_screenshot(wda_url, session_id, device_id)
    try:
        png_bytes = base64.b64decode(captured.base64_data)
    except Exception:
        return None
    return png_bytes