主要功能: - 纯视觉元素定位 + DOM辅助的混合方案 - 解决 mouse.click() 与 Vue 页面交互问题 - 使用 elementFromPoint + JS click/focus 实现可靠点击 - 智能元素定位: 根据描述生成CSS选择器获取精确坐标 - 区域扫描作为后备定位方案 - 完整的测试报告生成 (HTML+JSON) - 截图记录每个操作步骤 技术改进: - controller.py: 改进 click_at 使用 JavaScript 交互 - executor.py: 添加 _find_element_by_description 智能定位 - planner.py: 增强 prompt 传入视口尺寸 - main.py: 获取实际视口大小传给 planner
This commit is contained in:
5
src/agent/__init__.py
Normal file
5
src/agent/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# Agent module - AI decision making
|
||||
from .planner import TestPlanner
|
||||
from .executor import ActionExecutor
|
||||
|
||||
__all__ = ["TestPlanner", "ActionExecutor"]
|
||||
361
src/agent/executor.py
Normal file
361
src/agent/executor.py
Normal file
@@ -0,0 +1,361 @@
|
||||
"""
|
||||
Action Executor - Executes AI-planned actions on browser
|
||||
"""
|
||||
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ActionExecutor:
    """Execute AI-planned actions against a browser.

    Elements are located in three stages: a DOM lookup driven by CSS
    selectors derived from the target description, the coordinates supplied
    by the planner, and finally an AI-guided region scan.  Every executed
    action is appended to ``action_log``.

    The ``browser`` object is expected to offer ``click_at``, ``wait``,
    ``scroll``, ``type_text``, ``screenshot_base64`` and a ``page`` attribute
    (``evaluate``, ``viewport_size``, ``keyboard``), as used below.  The
    ``analyzer`` object must expose ``model.analyze(image, prompt)``.
    """

    # Viewport assumed when the page does not report one.
    _DEFAULT_VIEWPORT = (1920, 1080)
    # Horizontal distance (pixels) between probe clicks in a region scan.
    _SCAN_STEP_PX = 50
    # String spellings the model may use for a passed verification.
    _TRUE_STRINGS = ("true", "yes", "1", "pass", "passed")

    # Runs in the page.  The query is passed as an argument instead of being
    # spliced into the script, and text / visibility filters are implemented
    # in JavaScript because ":contains()" and ":visible" are not valid CSS
    # for document.querySelector.
    _FIND_ELEMENT_JS = """
    ({selector, text, visibleOnly}) => {
        const isVisible = (el) => {
            const r = el.getBoundingClientRect();
            const s = window.getComputedStyle(el);
            return r.width > 0 && r.height > 0 &&
                s.visibility !== 'hidden' && s.display !== 'none';
        };
        for (const el of document.querySelectorAll(selector)) {
            if (visibleOnly && !isVisible(el)) continue;
            if (text && !(el.textContent || '').includes(text)) continue;
            const r = el.getBoundingClientRect();
            return {
                found: true,
                x: Math.round(r.left + r.width / 2),
                y: Math.round(r.top + r.height / 2),
                tagName: el.tagName
            };
        }
        return { found: false };
    }
    """

    def __init__(self, browser, analyzer):
        # Browser controller used for every interaction.
        self.browser = browser
        # Vision analyzer; only ``analyzer.model.analyze`` is used here.
        self.analyzer = analyzer
        # One result dict per executed action, in execution order.
        self.action_log: List[Dict[str, Any]] = []

    def execute_action(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single planned action and record the outcome.

        Args:
            action: Step dict; ``action["action"]`` selects the handler
                (click / type / scroll / wait / verify).

        Returns:
            A result dict with ``action`` and ``success`` plus, depending on
            the outcome, ``screenshot``, ``warning``, ``error`` or the
            ``verify_*`` fields.  Unknown action types get a ``warning`` and
            keep ``success`` False.  Handler exceptions are caught and
            reported through ``error``.
        """
        # The planner may emit a null or non-string action; normalise it
        # here so a malformed step cannot crash the executor.
        action_type = str(action.get("action") or "").strip().lower()
        result: Dict[str, Any] = {"action": action, "success": False}

        handlers = {
            "click": self._do_click,
            "type": self._do_type,
            "scroll": self._do_scroll,
            "wait": self._do_wait,
        }

        try:
            if action_type == "verify":
                # verify decides success itself from the model's answer.
                self._do_verify(action, result)
            elif action_type in handlers:
                handlers[action_type](action)
                result["success"] = True
            else:
                logger.warning(f"未知操作类型: {action_type}")
                result["warning"] = f"未知操作类型: {action_type}"
        except Exception as e:
            logger.error(f"操作执行失败: {action_type} - {e}")
            result["error"] = str(e)
            result["success"] = False
        else:
            # The post-action screenshot is best effort: a failure here must
            # not change the outcome of the action itself.
            try:
                result["screenshot"] = self.browser.screenshot_base64()
            except Exception as e:
                logger.debug("截图失败: %s", e)

        self.action_log.append(result)
        return result

    def _viewport_size(self) -> Tuple[int, int]:
        """Return ``(width, height)`` of the page viewport, or the default."""
        viewport = self.browser.page.viewport_size
        if viewport:
            return viewport["width"], viewport["height"]
        return self._DEFAULT_VIEWPORT

    @staticmethod
    def _action_coordinates(action: Dict[str, Any]) -> Optional[Tuple[int, int]]:
        """Return the planner's ``(x, y)`` as ints, or None if absent/invalid."""
        if "x" not in action or "y" not in action:
            return None
        try:
            return int(float(action["x"])), int(float(action["y"]))
        except (TypeError, ValueError):
            logger.warning(f"AI 坐标无效: ({action.get('x')!r}, {action.get('y')!r})")
            return None

    def _do_click(self, action: Dict[str, Any]) -> None:
        """Click the described target, trying progressively coarser locators.

        Order: DOM lookup, planner coordinates, AI region scan.  A click is
        accepted immediately when the target is a button or when an input
        gained focus; otherwise the next strategy is tried.

        Raises:
            RuntimeError: if no strategy produced a click at all, so the
                caller can report the step as failed.
        """
        target = str(action.get("target") or "")
        # Buttons never take input focus, so the focus check cannot confirm
        # them.  Applied to every strategy so a button is not clicked twice.
        is_button = "按钮" in target or "button" in target.lower()
        clicked = False

        # 1) Prefer the DOM element matching the description.
        element_info = self._find_element_by_description(target)
        if element_info and element_info.get("found"):
            x, y = element_info["x"], element_info["y"]
            logger.info(f"通过 DOM 定位: ({x}, {y}) - {target}")
            self.browser.click_at(x, y)
            self.browser.wait(300)
            clicked = True

            if is_button or self._check_input_focused():
                logger.info(f"点击成功: ({x}, {y})")
                return

        # 2) Fall back to the coordinates supplied by the planner.
        coords = self._action_coordinates(action)
        if coords is not None:
            x, y = coords
            logger.info(f"尝试 AI 坐标: ({x}, {y}) - {target}")
            self.browser.click_at(x, y)
            self.browser.wait(300)
            clicked = True

            if is_button or self._check_input_focused():
                return

        # 3) Last resort: ask the model for a region and probe inside it.
        logger.warning("精确定位失败,尝试区域扫描...")
        region = self._get_element_region(target)
        if region:
            scanned = self._scan_region_for_element(region, target)
            if scanned:
                self.browser.click_at(scanned[0], scanned[1])
                self.browser.wait(300)
                return

        logger.warning(f"无法精确定位: {target}")
        if not clicked:
            # Nothing was clicked; do not let the step count as a success.
            raise RuntimeError(f"无法定位元素: {target}")

    @staticmethod
    def _selector_queries(target: str) -> List[Dict[str, Any]]:
        """Map a target description to DOM queries, most specific first.

        Each query is a dict with ``selector`` (plain CSS), ``text``
        (required substring of the element text, "" for none) and
        ``visibleOnly``.
        """
        lowered = target.lower()

        def query(selector: str, text: str = "", visible_only: bool = False) -> Dict[str, Any]:
            return {"selector": selector, "text": text, "visibleOnly": visible_only}

        if "用户名" in target or "username" in lowered:
            return [
                query("input[placeholder*='用户名']"),
                query("input[placeholder*='账号']"),
                query("input[type='text']"),
            ]
        if "密码" in target or "password" in lowered:
            return [
                query("input[placeholder*='密码']"),
                query("input[type='password']"),
            ]
        if "登录" in target and "按钮" in target:
            return [
                query("button[aria-label='login']"),
                query("button[type='submit']"),
                # Replaces the jQuery-only "button:contains('登录')".
                query("button", text="登录"),
            ]
        if "输入框" in target:
            # Replaces the jQuery-only "input:visible".
            return [query("input", visible_only=True)]
        if "按钮" in target:
            # Replaces the jQuery-only "button:visible".
            return [query("button", visible_only=True)]
        return []

    def _find_element_by_description(self, target: str) -> Optional[dict]:
        """Return the centre of the DOM element matching ``target``.

        Returns:
            ``{"found": True, "x": ..., "y": ..., "tagName": ...}`` for the
            first query that matches, or None when the description maps to
            no query or nothing matched.
        """
        for query in self._selector_queries(target):
            selector = query["selector"]
            try:
                result = self.browser.page.evaluate(self._FIND_ELEMENT_JS, query)
            except Exception as e:
                # A failing query must not abort the remaining candidates.
                logger.debug("选择器查询失败 '%s': %s", selector, e)
                continue

            if (isinstance(result, dict) and result.get("found")
                    and "x" in result and "y" in result):
                logger.info(f"找到元素 '{selector}': ({result['x']}, {result['y']})")
                return result

        return None

    def _get_element_region(self, target: str) -> Optional[dict]:
        """Ask the model for the screen region that contains ``target``.

        Returns:
            The parsed JSON object from the model's reply (percent bounds of
            the region), or None if no JSON object could be parsed.
        """
        img = self.browser.screenshot_base64()
        width, height = self._viewport_size()

        prompt = f"""在 {width}x{height} 像素的截图中,描述 "{target}" 所在的区域位置。

返回 JSON:
{{
"horizontal": "left" 或 "center" 或 "right",
"vertical": "top" 或 "middle" 或 "bottom",
"x_percent_start": 0-100 (区域左边界百分比),
"x_percent_end": 0-100 (区域右边界百分比),
"y_percent_start": 0-100 (区域上边界百分比),
"y_percent_end": 0-100 (区域下边界百分比)
}}

例如右侧表单区域: {{"horizontal": "right", "vertical": "middle", "x_percent_start": 60, "x_percent_end": 95, "y_percent_start": 30, "y_percent_end": 70}}

只返回 JSON。"""

        response = self.analyzer.model.analyze(img, prompt)

        try:
            match = re.search(r'\{[\s\S]*\}', response)
            if match:
                region = json.loads(match.group())
                if isinstance(region, dict):
                    logger.info(f"AI 返回区域: {region}")
                    return region
        except (json.JSONDecodeError, TypeError) as e:
            # TypeError covers a non-string reply from the model.
            logger.debug("区域解析失败: %s", e)

        return None

    @staticmethod
    def _percent(region: dict, key: str, default: float) -> float:
        """Read ``region[key]`` as a percentage clamped to 0..100.

        The model sometimes returns numbers as strings or null; anything
        that is not numeric falls back to ``default``.
        """
        try:
            value = float(region.get(key, default))
        except (TypeError, ValueError):
            value = float(default)
        return min(100.0, max(0.0, value))

    def _scan_region_for_element(self, region: dict, target: str) -> Tuple[int, int]:
        """Pick a click point inside ``region`` for ``target``.

        For input targets the region's vertical centre is probed with real
        clicks from right to left until an input gains focus; if none does,
        the region centre is returned.  Button targets get a point 70% down
        the region; anything else gets the region centre.

        Returns:
            ``(x, y)`` viewport coordinates.
        """
        width, height = self._viewport_size()

        # The model's region tends to be off: widen it by 5 percentage
        # points to the left and 25 to the right before scanning.
        x_percent_start = max(0.0, self._percent(region, "x_percent_start", 50) - 5)
        x_percent_end = min(100.0, self._percent(region, "x_percent_end", 100) + 25)

        x_start = int(width * x_percent_start / 100)
        x_end = int(width * x_percent_end / 100)
        y_start = int(height * self._percent(region, "y_percent_start", 20) / 100)
        y_end = int(height * self._percent(region, "y_percent_end", 80) / 100)

        logger.info(f"扩展扫描区域: x[{x_start}-{x_end}], y[{y_start}-{y_end}]")

        center = ((x_start + x_end) // 2, (y_start + y_end) // 2)
        lowered = target.lower()

        if "输入框" in target or "input" in lowered:
            y_center = center[1]
            step = self._SCAN_STEP_PX
            # Scan right-to-left: the original heuristic assumes the form
            # sits on the right-hand side of the page.
            for x in range(x_end - step, x_start, -step):
                logger.info(f"尝试点击: ({x}, {y_center})")
                self.browser.click_at(x, y_center)
                self.browser.wait(200)

                if self._check_input_focused():
                    logger.info(f"找到可交互元素: ({x}, {y_center})")
                    return (x, y_center)

            # No probe focused an input: fall back to the region centre.
            return center

        if "按钮" in target or "button" in lowered:
            # Buttons are assumed to sit in the lower part of the region.
            return (center[0], y_start + int((y_end - y_start) * 0.7))

        return center

    def _check_input_focused(self) -> bool:
        """Return True if the page's active element is an INPUT or TEXTAREA."""
        try:
            result = self.browser.page.evaluate("""
            () => {
                const el = document.activeElement;
                return el && (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA');
            }
            """)
        except Exception as e:
            logger.debug("焦点检查失败: %s", e)
            return False
        # The script yields null when there is no active element.
        return bool(result)

    def _locate_element(self, target: str, hint: str = "") -> Optional[Tuple[int, int]]:
        """Ask the model for the centre of ``target`` (kept as a fallback).

        Returns:
            ``(x, y)`` parsed from the model's reply, or None.
        """
        img = self.browser.screenshot_base64()
        width, height = self._viewport_size()

        prompt = f"""在 {width}x{height} 像素的截图中,找到以下元素的精确中心坐标:
"{target}"

{hint}

返回 JSON: {{"x": 数字, "y": 数字, "found": true}}
只返回 JSON。"""

        response = self.analyzer.model.analyze(img, prompt)
        return self._extract_coordinates({"raw_response": response})

    def _verify_click_success(self, target: str, before: str, after: str) -> bool:
        """Placeholder click verification; currently always returns True."""
        return True

    def _extract_coordinates(self, response: Dict) -> Optional[Tuple[int, int]]:
        """Extract ``(x, y)`` from ``response["raw_response"]``.

        Tries, in order: JSON-style ``"x": n ... "y": n``, loose
        ``x: n ... y: n``, and ``(n, n)``.  Returns None if nothing matches.
        """
        raw = response.get("raw_response") or ""
        if not isinstance(raw, str):
            raw = str(raw)

        patterns = [
            r'"x"\s*:\s*(\d+).*?"y"\s*:\s*(\d+)',
            r'x[:\s]+(\d+).*?y[:\s]+(\d+)',
            r'\((\d+)\s*,\s*(\d+)\)',
        ]

        for pattern in patterns:
            match = re.search(pattern, raw, re.DOTALL | re.IGNORECASE)
            if match:
                return (int(match.group(1)), int(match.group(2)))

        return None

    def _do_type(self, action: Dict[str, Any]) -> None:
        """Type ``action["text"]`` (or ``action["value"]``) into the page.

        With a ``selector`` the text goes through ``browser.type_text``;
        otherwise the currently focused element receives it via the
        keyboard after a select-all.

        Raises:
            ValueError: if neither ``text`` nor ``value`` carries content.
            RuntimeError: if the browser page is not initialised.
        """
        text = action.get("text")
        if text is None or text == "":
            text = action.get("value")
        if text is None or text == "":
            raise ValueError("输入操作缺少文本内容")
        # The model may return numbers (e.g. a PIN) instead of strings.
        text = str(text)

        logger.info(f"执行输入: '{text}'")

        if "selector" in action:
            self.browser.type_text(action["selector"], text)
            return

        if not self.browser.page:
            raise RuntimeError("浏览器页面未初始化")

        # Select any existing content so the typed text replaces it.
        self.browser.page.keyboard.press("Control+a")
        self.browser.wait(50)
        # Per-character delay imitates real typing.
        self.browser.page.keyboard.type(text, delay=50)
        self.browser.wait(100)
        logger.info(f"输入完成: '{text}'")

    def _do_scroll(self, action: Dict[str, Any]) -> None:
        """Scroll vertically by ``action["y"]`` pixels (default 500)."""
        y = action.get("y")
        if y is None:
            y = 500
        self.browser.scroll(0, int(y))

    def _do_wait(self, action: Dict[str, Any]) -> None:
        """Wait ``action["ms"]`` (or ``action["duration"]``) ms, default 1000."""
        ms = action.get("ms")
        if ms is None:
            ms = action.get("duration")
        if ms is None:
            ms = 1000
        self.browser.wait(int(ms))

    @classmethod
    def _as_bool(cls, value: Any) -> bool:
        """Interpret a model-supplied flag; strings like "false" are False."""
        if isinstance(value, str):
            return value.strip().lower() in cls._TRUE_STRINGS
        return bool(value)

    def _do_verify(self, action: Dict[str, Any], result: Dict[str, Any]) -> None:
        """Ask the model whether ``action["target"]`` holds on the page.

        Writes ``success``, ``verify_passed`` and ``verify_reason`` into
        ``result``; on an unparseable reply sets ``success`` False and an
        ``error`` message instead.
        """
        target = action.get("target", "")

        img = self.browser.screenshot_base64()
        prompt = f"""请验证以下条件是否满足:
"{target}"

仔细分析当前页面截图,返回 JSON:
{{"passed": true/false, "reason": "说明"}}

只返回 JSON。"""

        response = self.analyzer.model.analyze(img, prompt)

        try:
            match = re.search(r'\{.*\}', response, re.DOTALL)
            if not match:
                result["success"] = False
                result["error"] = "无法解析验证结果"
                return

            verify_result = json.loads(match.group())
            # Normalise to a real bool: the raw value may be the string
            # "false", which would otherwise be truthy.
            passed = self._as_bool(verify_result.get("passed", False))
            reason = verify_result.get("reason", "")

            result["success"] = passed
            result["verify_passed"] = passed
            result["verify_reason"] = reason

            if not passed:
                logger.warning(f"验证失败: {reason}")
        except json.JSONDecodeError as e:
            result["success"] = False
            result["error"] = f"JSON 解析失败: {e}"
||||
109
src/agent/planner.py
Normal file
109
src/agent/planner.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""
|
||||
Test Planner - AI-driven test planning and decision making
|
||||
"""
|
||||
from typing import List, Dict, Any, Optional
|
||||
import json
|
||||
import re
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestPlanner:
    """Plan test steps for a goal by querying a vision model.

    The ``analyzer`` object must expose ``model.analyze(image, prompt)``
    returning the model's text reply.
    """

    # The name starts with "Test"; stop pytest from trying to collect this
    # production class when it is imported into a test module.
    __test__ = False

    def __init__(self, analyzer):
        # Vision analyzer; only ``analyzer.model.analyze`` is used here.
        self.analyzer = analyzer
        # Planning history; not written by any method visible in this class.
        self.history: List[Dict[str, Any]] = []

    def plan_test(self, goal: str, image_base64: str,
                  viewport_width: int = 1920, viewport_height: int = 1080) -> List[Dict[str, Any]]:
        """Generate test steps, with pixel coordinates, for ``goal``.

        Args:
            goal: Description of what the test should achieve.
            image_base64: Screenshot of the current page.
            viewport_width: Viewport width in pixels.
            viewport_height: Viewport height in pixels.

        Returns:
            The validated list of step dicts.  If the reply cannot be
            parsed, a single-element list holding ``raw`` and ``error``.
        """
        prompt = f"""你是一个精准的 Web 自动化测试助手,专门负责视觉定位和鼠标操作。

**重要信息**:
- 当前截图尺寸: {viewport_width} x {viewport_height} 像素
- 坐标系: 左上角为 (0, 0),右下角为 ({viewport_width}, {viewport_height})
- 所有坐标必须是基于此尺寸的精确像素值

**目标**: {goal}

**任务**: 分析截图,生成精确的鼠标和键盘操作步骤。

**关键要求**:
1. click 操作: 必须提供精确的 x, y 像素坐标(元素中心点)
2. type 操作: 确保之前已 click 对应的输入框
3. 坐标精度: 精确到像素级别

**返回格式** (只返回 JSON,无其他内容):
```json
[
{{"step": 1, "action": "click", "target": "元素描述", "x": 数字, "y": 数字}},
{{"step": 2, "action": "type", "value": "输入内容"}},
{{"step": 3, "action": "verify", "target": "验证条件"}}
]
```

**action 类型**:
- click: 鼠标点击 (必须有 x, y)
- type: 键盘输入 (必须有 value)
- scroll: 滚动页面 (y 为滚动距离)
- wait: 等待 (ms 为毫秒数)
- verify: 验证页面状态

请分析 {viewport_width}x{viewport_height} 像素的截图,返回测试步骤:"""

        response = self.analyzer.model.analyze(image_base64, prompt)
        steps = self._parse_steps(response)

        logger.info(f"计划生成 {len(steps)} 个步骤 (视口: {viewport_width}x{viewport_height})")
        for step in steps:
            if step.get("action") == "click" and "x" in step and "y" in step:
                logger.info(f"  步骤 {step.get('step')}: click ({step['x']}, {step['y']}) - {step.get('target', '')}")
            else:
                logger.info(f"  步骤 {step.get('step')}: {step.get('action')} - {step.get('target', step.get('value', ''))}")

        return steps

    def _parse_steps(self, response: str) -> List[Dict[str, Any]]:
        """Parse the model's reply into a list of step dicts.

        Each ``[`` in the reply is tried as the start of a JSON array and
        the first array containing at least one object is used.  Unlike a
        greedy first-``[``-to-last-``]`` match, this survives bracketed
        prose before or after the array.

        Returns:
            The validated steps, or ``[{"raw": response, "error": ...}]``
            when no usable array is found.
        """
        if isinstance(response, str):
            text = response
        else:
            text = "" if response is None else str(response)

        decoder = json.JSONDecoder()
        last_error = None
        saw_empty_array = False

        for opening in re.finditer(r'\[', text):
            try:
                candidate, _ = decoder.raw_decode(text, opening.start())
            except json.JSONDecodeError as e:
                last_error = e
                continue

            if not isinstance(candidate, list):
                continue
            if any(isinstance(item, dict) for item in candidate):
                return self._validate_steps(candidate)
            if not candidate:
                # Remember "[]" but keep looking for a real step array.
                saw_empty_array = True

        if saw_empty_array:
            return []

        if last_error is not None:
            logger.warning(f"JSON 解析失败: {last_error}")

        logger.warning(f"无法解析响应: {text[:200]}")
        return [{"raw": response, "error": "解析失败"}]

    def _validate_steps(self, steps: List[Dict]) -> List[Dict]:
        """Check step dicts and flag incomplete ones.

        Entries that are not dicts are dropped.  A click without both
        coordinates is kept and marked ``needs_coordinates``.  A type step
        without ``value`` or ``text`` is kept but logged.  Step dicts are
        modified in place.
        """
        validated = []
        for step in steps:
            if not isinstance(step, dict):
                logger.warning(f"忽略无效步骤: {step!r}")
                continue

            # The model may emit null / non-string actions.
            action = str(step.get("action") or "").lower()

            if action == "click" and ("x" not in step or "y" not in step):
                logger.warning(f"click 操作缺少坐标: {step}")
                # Keep the step; downstream may still locate it another way.
                step["needs_coordinates"] = True

            # The executor accepts either "value" or "text" for type steps.
            if action == "type" and not (step.get("value") or step.get("text")):
                logger.warning(f"type 操作缺少 value: {step}")

            validated.append(step)

        return validated
|
||||
Reference in New Issue
Block a user