From a67ad26a52ab4463a65aac7655baa51a34683224 Mon Sep 17 00:00:00 2001 From: empty Date: Sun, 28 Dec 2025 15:34:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0AI=E9=A9=B1=E5=8A=A8?= =?UTF-8?q?=E7=9A=84Web=E8=87=AA=E5=8A=A8=E5=8C=96=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要功能: - 纯视觉元素定位 + DOM辅助的混合方案 - 解决 mouse.click() 与 Vue 页面交互问题 - 使用 elementFromPoint + JS click/focus 实现可靠点击 - 智能元素定位: 根据描述生成CSS选择器获取精确坐标 - 区域扫描作为后备定位方案 - 完整的测试报告生成 (HTML+JSON) - 截图记录每个操作步骤 技术改进: - controller.py: 改进 click_at 使用 JavaScript 交互 - executor.py: 添加 _find_element_by_description 智能定位 - planner.py: 增强 prompt 传入视口尺寸 - main.py: 获取实际视口大小传给 planner --- .env.example | 19 ++ .github/workflows/test.yml | 43 ++++ .gitignore | 23 +++ README.md | 153 ++++++++++++++ config/config.yaml | 23 +++ example.py | 22 ++ requirements.txt | 17 ++ src/__init__.py | 9 + src/agent/__init__.py | 5 + src/agent/executor.py | 361 +++++++++++++++++++++++++++++++++ src/agent/planner.py | 109 ++++++++++ src/browser/__init__.py | 5 + src/browser/controller.py | 116 +++++++++++ src/browser/screenshot.py | 53 +++++ src/main.py | 166 +++++++++++++++ src/reporter/__init__.py | 4 + src/reporter/generator.py | 207 +++++++++++++++++++ src/utils/__init__.py | 1 + src/utils/logging_config.py | 82 ++++++++ src/utils/visual_regression.py | 176 ++++++++++++++++ src/vision/__init__.py | 5 + src/vision/analyzer.py | 66 ++++++ src/vision/models.py | 273 +++++++++++++++++++++++++ tests/test_cases.py | 199 ++++++++++++++++++ 24 files changed, 2137 insertions(+) create mode 100644 .env.example create mode 100644 .github/workflows/test.yml create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config/config.yaml create mode 100644 example.py create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/agent/__init__.py create mode 100644 src/agent/executor.py create mode 
100644 src/agent/planner.py create mode 100644 src/browser/__init__.py create mode 100644 src/browser/controller.py create mode 100644 src/browser/screenshot.py create mode 100644 src/main.py create mode 100644 src/reporter/__init__.py create mode 100644 src/reporter/generator.py create mode 100644 src/utils/__init__.py create mode 100644 src/utils/logging_config.py create mode 100644 src/utils/visual_regression.py create mode 100644 src/vision/__init__.py create mode 100644 src/vision/analyzer.py create mode 100644 src/vision/models.py create mode 100644 tests/test_cases.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..0f8ab80 --- /dev/null +++ b/.env.example @@ -0,0 +1,19 @@ +# AI Web Tester 环境配置 + +# Claude/Anthropic 配置 +ANTHROPIC_API_KEY=your_anthropic_api_key_here +ANTHROPIC_BASE_URL=https://api.anthropic.com +ANTHROPIC_MODEL=claude-sonnet-4-20250514 + +# OpenAI 配置 +OPENAI_API_KEY=your_openai_api_key_here +OPENAI_BASE_URL=https://api.openai.com/v1 +OPENAI_MODEL=gpt-4o + +# API 调用配置 +API_TIMEOUT=60 +API_MAX_RETRIES=3 + +# 日志配置 +LOG_LEVEL=INFO +LOG_FILE= diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..60d5264 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,43 @@ +name: AI Web Tester CI + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Install Playwright browsers + run: playwright install chromium + + - name: Run tests + env: + ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} + ANTHROPIC_BASE_URL: ${{ secrets.ANTHROPIC_BASE_URL }} + run: | + python tests/test_cases.py --headless + + - name: Upload test reports 
+ uses: actions/upload-artifact@v4 + if: always() + with: + name: test-reports + path: reports/ + retention-days: 30 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..308e802 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Python +__pycache__/ +*.py[cod] +*.so +.Python +*.egg-info/ +dist/ +build/ + +# 环境配置(包含敏感信息) +.env + +# IDE +.idea/ +.vscode/ +*.swp + +# 测试报告 +reports/ + +# Playwright +playwright-report/ +test-results/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..e358517 --- /dev/null +++ b/README.md @@ -0,0 +1,153 @@ +# AI Web Tester + +基于 AI 视觉模型的智能 Web 自动化测试框架。使用自然语言描述测试目标,AI 会自动分析页面并执行操作。 + +## ✨ 特性 + +- 🤖 **AI 驱动** - 使用 Claude/GPT-4V 视觉模型理解页面内容 +- 📝 **自然语言** - 用自然语言描述测试目标,无需编写选择器 +- 📊 **自动报告** - 生成嵌入截图的 HTML 报告 + JSON 结果 +- 🔧 **可配置** - 支持多种 AI 模型和 API 代理 +- 🔄 **自动重试** - 指数退避重试机制 +- 👁️ **视觉回归** - 基线对比检测 UI 变化 +- ⚡ **并行执行** - 多线程运行测试用例 +- 🚀 **CI/CD** - GitHub Actions 集成 + +## 🚀 快速开始 + +### 1. 安装依赖 + +```bash +pip install -r requirements.txt +playwright install chromium +``` + +### 2. 配置环境变量 + +```bash +cp .env.example .env +``` + +编辑 `.env` 文件: + +```bash +ANTHROPIC_API_KEY=your_api_key_here +ANTHROPIC_BASE_URL=https://api.anthropic.com # 可选,用于代理 +ANTHROPIC_MODEL=claude-sonnet-4-20250514 # 可选 +API_TIMEOUT=60 # API 超时(秒) +API_MAX_RETRIES=3 # 最大重试次数 +LOG_LEVEL=INFO # 日志级别 +``` + +> ⚠️ **注意**:`BASE_URL` 不要包含 `/v1` 后缀,SDK 会自动添加。 + +### 3. 
运行测试 + +```bash +python example.py +``` + +## 📖 使用方法 + +### 基础用法 + +```python +from src import WebTester + +with WebTester(model="claude") as tester: + tester.goto("https://example.com") + result = tester.test("点击 'More information' 链接") + print(f"完成: {result['steps']} 步骤") +``` + +### 断言验证 + +```python +with WebTester() as tester: + tester.goto("https://example.com") + result = tester.verify("页面包含 'Example Domain' 文字") + print(f"验证: {'✅' if result['passed'] else '❌'} {result['reason']}") +``` + +### 视觉回归测试 + +```python +with WebTester() as tester: + tester.goto("https://example.com") + + # 首次运行:保存基线 + tester.save_baseline("homepage") + + # 后续运行:对比基线 + result = tester.compare_visual("homepage", threshold=0.01) + if result["match"]: + print("✅ 视觉匹配") + else: + print(f"❌ 差异: {result['diff_percent']*100:.1f}%") + print(f" 差异图: {result['diff_image']}") +``` + +### 批量测试 + +```bash +# 串行执行 +python tests/test_cases.py + +# 并行执行(3 个线程) +python tests/test_cases.py --parallel --workers 3 + +# 无头模式 +python tests/test_cases.py --headless +``` + +## 🔧 配置项 + +| 环境变量 | 默认值 | 说明 | +|----------|--------|------| +| `ANTHROPIC_API_KEY` | - | Claude API 密钥(必填) | +| `ANTHROPIC_BASE_URL` | 官方地址 | API 代理地址 | +| `ANTHROPIC_MODEL` | `claude-sonnet-4-20250514` | 模型名称 | +| `API_TIMEOUT` | `60` | API 超时(秒) | +| `API_MAX_RETRIES` | `3` | 最大重试次数 | +| `LOG_LEVEL` | `INFO` | 日志级别 | +| `LOG_FILE` | - | 日志文件路径 | + +## 📁 项目结构 + +``` +ai-web-tester/ +├── src/ +│ ├── main.py # WebTester 主类 +│ ├── vision/ # AI 视觉模型 +│ ├── browser/ # Playwright 浏览器控制 +│ ├── agent/ # 测试规划和执行 +│ ├── reporter/ # HTML/JSON 报告生成 +│ └── utils/ # 工具模块 +│ ├── logging_config.py # 日志配置 +│ └── visual_regression.py # 视觉回归 +├── tests/ +│ └── test_cases.py # 测试用例模板 +├── .github/workflows/ +│ └── test.yml # CI/CD 配置 +├── baselines/ # 视觉基线截图 +├── reports/ # 测试报告(HTML + JSON) +├── .env.example # 环境变量模板 +└── requirements.txt +``` + +## 📋 测试报告 + +每次测试生成: +- **HTML 报告** - 包含步骤详情和嵌入截图 +- **JSON 结果** - 结构化数据,便于分析 + +## 🚀 CI/CD + +项目包含 GitHub 
Actions 配置。设置以下 Secrets 后自动运行测试: + +- `ANTHROPIC_API_KEY` +- `ANTHROPIC_BASE_URL`(可选) + +## 📄 License + +MIT diff --git a/config/config.yaml b/config/config.yaml new file mode 100644 index 0000000..a800f1b --- /dev/null +++ b/config/config.yaml @@ -0,0 +1,23 @@ +# AI Web Tester Configuration + +# Default AI model (claude, openai, local) +default_model: claude + +# Browser settings +browser: + headless: false + timeout: 30000 + viewport: + width: 1920 + height: 1080 + +# Screenshot settings +screenshot: + format: png + quality: 90 + save_dir: ./reports/screenshots + +# Report settings +report: + output_dir: ./reports + format: html diff --git a/example.py b/example.py new file mode 100644 index 0000000..49f92d1 --- /dev/null +++ b/example.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +""" +Example: AI Web Tester Demo +""" +import sys +sys.path.insert(0, ".") + +from src import WebTester + + +def main(): + # Example 1: Basic usage with context manager + with WebTester(model="claude") as tester: + tester.goto("http://47.99.105.253:8084") + result = tester.test("找到登录信息输入框,填入 账号admin 密码password,进行登录") + print(f"Test completed: {result['steps']} steps") + print(f"Report: {result['report']}") + result = tester.test("在登录后的首页,查看所有可以点击的功能,建立功能清单") + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..96f87bc --- /dev/null +++ b/requirements.txt @@ -0,0 +1,17 @@ +# Browser automation +playwright>=1.40.0 + +# AI models +anthropic>=0.18.0 +openai>=1.12.0 + +# Image processing +Pillow>=10.0.0 + +# Report generation +Jinja2>=3.1.0 + +# Utilities +pydantic>=2.0.0 +python-dotenv>=1.0.0 +rich>=13.0.0 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..95c7911 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,9 @@ +# AI Web Tester +from .main import WebTester +from .utils.logging_config import init_default_logging + +# 初始化日志 +init_default_logging() + +__all__ = ["WebTester"] +__version__ 
= "0.1.0" diff --git a/src/agent/__init__.py b/src/agent/__init__.py new file mode 100644 index 0000000..457943c --- /dev/null +++ b/src/agent/__init__.py @@ -0,0 +1,5 @@ +# Agent module - AI decision making +from .planner import TestPlanner +from .executor import ActionExecutor + +__all__ = ["TestPlanner", "ActionExecutor"] diff --git a/src/agent/executor.py b/src/agent/executor.py new file mode 100644 index 0000000..faa52ae --- /dev/null +++ b/src/agent/executor.py @@ -0,0 +1,361 @@ +""" +Action Executor - Executes AI-planned actions on browser +""" +from typing import Dict, Any, List +import json +import re +import logging + +logger = logging.getLogger(__name__) + + +class ActionExecutor: + """Executes actions on browser based on AI decisions""" + + def __init__(self, browser, analyzer): + self.browser = browser + self.analyzer = analyzer + self.action_log: List[Dict[str, Any]] = [] + + def execute_action(self, action: Dict[str, Any]) -> Dict[str, Any]: + """Execute a single action""" + action_type = action.get("action", "").lower() + result = {"action": action, "success": False} + + try: + if action_type == "click": + self._do_click(action) + elif action_type == "type": + self._do_type(action) + elif action_type == "scroll": + self._do_scroll(action) + elif action_type == "wait": + self._do_wait(action) + elif action_type == "verify": + self._do_verify(action, result) + else: + # 未知操作类型,记录警告但不标记失败 + logger.warning(f"未知操作类型: {action_type}") + result["warning"] = f"未知操作类型: {action_type}" + + # 只有已知操作类型才标记成功 + if action_type in ("click", "type", "scroll", "wait"): + result["success"] = True + + # 保存执行后的截图 + try: + result["screenshot"] = self.browser.screenshot_base64() + except Exception: + pass + + except Exception as e: + logger.error(f"操作执行失败: {action_type} - {e}") + result["error"] = str(e) + result["success"] = False + + self.action_log.append(result) + return result + + def _do_click(self, action: Dict[str, Any]) -> None: + """Execute click action with smart 
element detection""" + target = action.get("target", "") + + # 优先尝试通过 AI 描述找到对应的 DOM 元素 + element_info = self._find_element_by_description(target) + if element_info and element_info.get("found"): + x, y = element_info["x"], element_info["y"] + logger.info(f"通过 DOM 定位: ({x}, {y}) - {target}") + self.browser.click_at(x, y) + self.browser.wait(300) + + if self._check_input_focused() or "按钮" in target or "button" in target.lower(): + logger.info(f"点击成功: ({x}, {y})") + return + + # 如果 AI 提供了坐标,尝试直接使用(作为后备) + if "x" in action and "y" in action: + x, y = int(action["x"]), int(action["y"]) + logger.info(f"尝试 AI 坐标: ({x}, {y}) - {target}") + self.browser.click_at(x, y) + self.browser.wait(300) + + if self._check_input_focused(): + return + + # 最后尝试区域扫描 + logger.warning(f"精确定位失败,尝试区域扫描...") + region = self._get_element_region(target) + if region: + coords = self._scan_region_for_element(region, target) + if coords: + self.browser.click_at(coords[0], coords[1]) + self.browser.wait(300) + return + + logger.warning(f"无法精确定位: {target}") + + def _find_element_by_description(self, target: str) -> dict: + """根据描述找到 DOM 元素的精确坐标""" + # 根据描述生成选择器查询 + selectors = [] + + if "用户名" in target or "username" in target.lower(): + selectors.extend([ + "input[placeholder*='用户名']", + "input[placeholder*='账号']", + "input[type='text']" + ]) + elif "密码" in target or "password" in target.lower(): + selectors.extend([ + "input[placeholder*='密码']", + "input[type='password']" + ]) + elif "登录" in target and "按钮" in target: + selectors.extend([ + "button[aria-label='login']", + "button[type='submit']", + "button:contains('登录')" + ]) + elif "输入框" in target: + selectors.append("input:visible") + elif "按钮" in target: + selectors.append("button:visible") + + if not selectors: + return None + + # 尝试每个选择器找到元素中心 + for selector in selectors: + try: + result = self.browser.page.evaluate(f''' + () => {{ + const el = document.querySelector("{selector}"); + if (el) {{ + const r = el.getBoundingClientRect(); + return 
{{ + found: true, + x: Math.round(r.left + r.width / 2), + y: Math.round(r.top + r.height / 2), + tagName: el.tagName + }}; + }} + return {{ found: false }}; + }} + ''') + if result.get("found"): + logger.info(f"找到元素 '{selector}': ({result['x']}, {result['y']})") + return result + except: + continue + + return None + + def _get_element_region(self, target: str) -> dict: + """让 AI 返回元素所在的区域""" + img = self.browser.screenshot_base64() + viewport = self.browser.page.viewport_size + width = viewport["width"] if viewport else 1920 + height = viewport["height"] if viewport else 1080 + + prompt = f"""在 {width}x{height} 像素的截图中,描述 "{target}" 所在的区域位置。 + +返回 JSON: +{{ + "horizontal": "left" 或 "center" 或 "right", + "vertical": "top" 或 "middle" 或 "bottom", + "x_percent_start": 0-100 (区域左边界百分比), + "x_percent_end": 0-100 (区域右边界百分比), + "y_percent_start": 0-100 (区域上边界百分比), + "y_percent_end": 0-100 (区域下边界百分比) +}} + +例如右侧表单区域: {{"horizontal": "right", "vertical": "middle", "x_percent_start": 60, "x_percent_end": 95, "y_percent_start": 30, "y_percent_end": 70}} + +只返回 JSON。""" + + response = self.analyzer.model.analyze(img, prompt) + + try: + match = re.search(r'\{[\s\S]*\}', response) + if match: + region = json.loads(match.group()) + logger.info(f"AI 返回区域: {region}") + return region + except: + pass + + return None + + def _scan_region_for_element(self, region: dict, target: str) -> tuple: + """在指定区域内扫描寻找可交互元素""" + viewport = self.browser.page.viewport_size + width = viewport["width"] if viewport else 1920 + height = viewport["height"] if viewport else 1080 + + # 扩展区域边界(AI 返回的区域可能偏差,向右扩展) + x_percent_start = region.get("x_percent_start", 50) + x_percent_end = region.get("x_percent_end", 100) + + # 向右扩展 20% 来补偿偏差 + x_percent_start = max(0, x_percent_start - 5) + x_percent_end = min(100, x_percent_end + 25) + + x_start = int(width * x_percent_start / 100) + x_end = int(width * x_percent_end / 100) + y_start = int(height * region.get("y_percent_start", 20) / 100) + y_end = int(height * 
region.get("y_percent_end", 80) / 100) + + logger.info(f"扩展扫描区域: x[{x_start}-{x_end}], y[{y_start}-{y_end}]") + + # 对输入框进行网格扫描 + if "输入框" in target or "input" in target.lower(): + # 在区域内尝试多个点 + y_center = (y_start + y_end) // 2 + + # 从右向左扫描(因为表单在右侧) + step = 50 # 每 50 像素尝试一次 + for x in range(x_end - 50, x_start, -step): + logger.info(f"尝试点击: ({x}, {y_center})") + self.browser.click_at(x, y_center) + self.browser.wait(200) + + if self._check_input_focused(): + logger.info(f"找到可交互元素: ({x}, {y_center})") + return (x, y_center) + + # 如果横向扫描失败,返回区域中心 + return ((x_start + x_end) // 2, y_center) + + elif "按钮" in target or "button" in target.lower(): + # 按钮在区域偏下位置 + center_x = (x_start + x_end) // 2 + center_y = y_start + int((y_end - y_start) * 0.7) + return (center_x, center_y) + + else: + # 默认返回区域中心 + return ((x_start + x_end) // 2, (y_start + y_end) // 2) + + def _check_input_focused(self) -> bool: + """检查是否有输入框获得焦点""" + try: + # 使用 JavaScript 检查活动元素 + result = self.browser.page.evaluate(""" + () => { + const el = document.activeElement; + return el && (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA'); + } + """) + return result + except: + return False + + def _locate_element(self, target: str, hint: str = "") -> tuple: + """使用 AI 定位元素(保留作为备用方法)""" + img = self.browser.screenshot_base64() + + viewport = self.browser.page.viewport_size + width = viewport["width"] if viewport else 1920 + height = viewport["height"] if viewport else 1080 + + prompt = f"""在 {width}x{height} 像素的截图中,找到以下元素的精确中心坐标: +"{target}" + +{hint} + +返回 JSON: {{"x": 数字, "y": 数字, "found": true}} +只返回 JSON。""" + + response = self.analyzer.model.analyze(img, prompt) + coords = self._extract_coordinates({"raw_response": response}) + return coords + + def _verify_click_success(self, target: str, before: str, after: str) -> bool: + """验证点击是否成功""" + return True # 简化处理 + + def _extract_coordinates(self, response: Dict) -> tuple: + """从 AI 响应中提取坐标""" + raw = response.get("raw_response", "") + + # 尝试多种格式匹配 + 
patterns = [ + r'"x"\s*:\s*(\d+).*?"y"\s*:\s*(\d+)', + r'x[:\s]+(\d+).*?y[:\s]+(\d+)', + r'\((\d+)\s*,\s*(\d+)\)', + ] + + for pattern in patterns: + match = re.search(pattern, raw, re.DOTALL | re.IGNORECASE) + if match: + return (int(match.group(1)), int(match.group(2))) + + return None + + def _do_type(self, action: Dict[str, Any]) -> None: + """Execute type action""" + text = action.get("text", action.get("value", "")) + if not text: + raise ValueError("输入操作缺少文本内容") + + logger.info(f"执行输入: '{text}'") + + if "selector" in action: + self.browser.type_text(action["selector"], text) + else: + # 直接键盘输入 + if self.browser.page: + # 先清空可能的现有内容 + self.browser.page.keyboard.press("Control+a") + self.browser.wait(50) + # 逐字符输入,模拟真实打字 + self.browser.page.keyboard.type(text, delay=50) + self.browser.wait(100) + logger.info(f"输入完成: '{text}'") + else: + raise RuntimeError("浏览器页面未初始化") + + def _do_scroll(self, action: Dict[str, Any]) -> None: + """Execute scroll action""" + y = action.get("y", 500) + self.browser.scroll(0, int(y)) + + def _do_wait(self, action: Dict[str, Any]) -> None: + """Execute wait action""" + ms = action.get("ms", action.get("duration", 1000)) + self.browser.wait(int(ms)) + + def _do_verify(self, action: Dict[str, Any], result: Dict[str, Any]) -> None: + """Execute verify action - 使用 AI 验证页面状态""" + target = action.get("target", "") + + img = self.browser.screenshot_base64() + prompt = f"""请验证以下条件是否满足: +"{target}" + +仔细分析当前页面截图,返回 JSON: +{{"passed": true/false, "reason": "说明"}} + +只返回 JSON。""" + + response = self.analyzer.model.analyze(img, prompt) + + try: + match = re.search(r'\{.*\}', response, re.DOTALL) + if match: + verify_result = json.loads(match.group()) + passed = verify_result.get("passed", False) + reason = verify_result.get("reason", "") + + result["success"] = passed + result["verify_passed"] = passed + result["verify_reason"] = reason + + if not passed: + logger.warning(f"验证失败: {reason}") + else: + result["success"] = False + result["error"] 
= "无法解析验证结果" + except json.JSONDecodeError as e: + result["success"] = False + result["error"] = f"JSON 解析失败: {e}" diff --git a/src/agent/planner.py b/src/agent/planner.py new file mode 100644 index 0000000..72a447f --- /dev/null +++ b/src/agent/planner.py @@ -0,0 +1,109 @@ +""" +Test Planner - AI-driven test planning and decision making +""" +from typing import List, Dict, Any, Optional +import json +import re +import logging + +logger = logging.getLogger(__name__) + + +class TestPlanner: + """Plans and coordinates test execution using AI""" + + def __init__(self, analyzer): + self.analyzer = analyzer + self.history: List[Dict[str, Any]] = [] + + def plan_test(self, goal: str, image_base64: str, + viewport_width: int = 1920, viewport_height: int = 1080) -> List[Dict[str, Any]]: + """ + Generate test steps for a given goal with precise coordinates + + Args: + goal: 测试目标描述 + image_base64: 页面截图 + viewport_width: 视口宽度(像素) + viewport_height: 视口高度(像素) + """ + prompt = f"""你是一个精准的 Web 自动化测试助手,专门负责视觉定位和鼠标操作。 + +**重要信息**: +- 当前截图尺寸: {viewport_width} x {viewport_height} 像素 +- 坐标系: 左上角为 (0, 0),右下角为 ({viewport_width}, {viewport_height}) +- 所有坐标必须是基于此尺寸的精确像素值 + +**目标**: {goal} + +**任务**: 分析截图,生成精确的鼠标和键盘操作步骤。 + +**关键要求**: +1. click 操作: 必须提供精确的 x, y 像素坐标(元素中心点) +2. type 操作: 确保之前已 click 对应的输入框 +3. 
坐标精度: 精确到像素级别 + +**返回格式** (只返回 JSON,无其他内容): +```json +[ + {{"step": 1, "action": "click", "target": "元素描述", "x": 数字, "y": 数字}}, + {{"step": 2, "action": "type", "value": "输入内容"}}, + {{"step": 3, "action": "verify", "target": "验证条件"}} +] +``` + +**action 类型**: +- click: 鼠标点击 (必须有 x, y) +- type: 键盘输入 (必须有 value) +- scroll: 滚动页面 (y 为滚动距离) +- wait: 等待 (ms 为毫秒数) +- verify: 验证页面状态 + +请分析 {viewport_width}x{viewport_height} 像素的截图,返回测试步骤:""" + + response = self.analyzer.model.analyze(image_base64, prompt) + steps = self._parse_steps(response) + + logger.info(f"计划生成 {len(steps)} 个步骤 (视口: {viewport_width}x{viewport_height})") + for step in steps: + if step.get("action") == "click" and "x" in step and "y" in step: + logger.info(f" 步骤 {step.get('step')}: click ({step['x']}, {step['y']}) - {step.get('target', '')}") + else: + logger.info(f" 步骤 {step.get('step')}: {step.get('action')} - {step.get('target', step.get('value', ''))}") + + return steps + + def _parse_steps(self, response: str) -> List[Dict[str, Any]]: + """Parse AI response into structured steps""" + try: + # 尝试提取 JSON 数组 + match = re.search(r'\[[\s\S]*\]', response) + if match: + steps = json.loads(match.group()) + # 验证步骤格式 + return self._validate_steps(steps) + except json.JSONDecodeError as e: + logger.warning(f"JSON 解析失败: {e}") + + logger.warning(f"无法解析响应: {response[:200]}") + return [{"raw": response, "error": "解析失败"}] + + def _validate_steps(self, steps: List[Dict]) -> List[Dict]: + """验证和修正步骤格式""" + validated = [] + for step in steps: + action = step.get("action", "").lower() + + # click 操作必须有坐标 + if action == "click" and ("x" not in step or "y" not in step): + logger.warning(f"click 操作缺少坐标: {step}") + # 保留步骤但标记需要坐标 + step["needs_coordinates"] = True + + # type 操作必须有 value + if action == "type" and not step.get("value"): + logger.warning(f"type 操作缺少 value: {step}") + + validated.append(step) + + return validated diff --git a/src/browser/__init__.py b/src/browser/__init__.py new file mode 100644 index 
0000000..f35e820 --- /dev/null +++ b/src/browser/__init__.py @@ -0,0 +1,5 @@ +# Browser control module +from .controller import BrowserController +from .screenshot import ScreenshotManager + +__all__ = ["BrowserController", "ScreenshotManager"] diff --git a/src/browser/controller.py b/src/browser/controller.py new file mode 100644 index 0000000..da68015 --- /dev/null +++ b/src/browser/controller.py @@ -0,0 +1,116 @@ +""" +Browser Controller - Playwright wrapper for browser automation +""" +from typing import Optional, Dict, Any, List +from playwright.sync_api import sync_playwright, Browser, Page, BrowserContext +import base64 + + +class BrowserController: + """Controls browser operations using Playwright""" + + def __init__(self, headless: bool = False, timeout: int = 30000): + self.headless = headless + self.timeout = timeout + self._playwright = None + self._browser: Optional[Browser] = None + self._context: Optional[BrowserContext] = None + self._page: Optional[Page] = None + + def start(self) -> None: + """Start browser instance""" + self._playwright = sync_playwright().start() + self._browser = self._playwright.chromium.launch(headless=self.headless) + self._context = self._browser.new_context( + viewport={"width": 1920, "height": 1080} + ) + self._page = self._context.new_page() + self._page.set_default_timeout(self.timeout) + + @property + def page(self) -> Optional[Page]: + return self._page + + def goto(self, url: str) -> None: + """Navigate to URL""" + if self._page: + self._page.goto(url, wait_until="networkidle") + + def click(self, selector: str) -> None: + """Click element by selector""" + if self._page: + self._page.click(selector) + + def click_at(self, x: int, y: int) -> None: + """Click at specific coordinates using JavaScript for better compatibility""" + if self._page: + # 使用 elementFromPoint 找到坐标处的元素,然后触发点击 + self._page.evaluate(f""" + (coords) => {{ + const el = document.elementFromPoint(coords.x, coords.y); + if (el) {{ + el.click(); + if 
(el.tagName === 'INPUT' || el.tagName === 'TEXTAREA') {{ + el.focus(); + }} + }} + }} + """, {"x": x, "y": y}) + + def type_text(self, selector: str, text: str) -> None: + """Type text into element""" + if self._page: + self._page.fill(selector, text) + + def press_key(self, key: str) -> None: + """Press keyboard key""" + if self._page: + self._page.keyboard.press(key) + + def scroll(self, x: int = 0, y: int = 500) -> None: + """Scroll page""" + if self._page: + self._page.mouse.wheel(x, y) + + def wait(self, ms: int) -> None: + """Wait for specified milliseconds""" + if self._page: + self._page.wait_for_timeout(ms) + + def screenshot(self, full_page: bool = False) -> bytes: + """Take screenshot and return as bytes""" + if self._page: + return self._page.screenshot(full_page=full_page) + return b"" + + def screenshot_base64(self, full_page: bool = False) -> str: + """Take screenshot and return as base64 string""" + img_bytes = self.screenshot(full_page) + return base64.b64encode(img_bytes).decode("utf-8") + + def get_page_info(self) -> Dict[str, Any]: + """Get current page information""" + if not self._page: + return {} + return { + "url": self._page.url, + "title": self._page.title(), + } + + def close(self) -> None: + """Close browser and cleanup""" + if self._page: + self._page.close() + if self._context: + self._context.close() + if self._browser: + self._browser.close() + if self._playwright: + self._playwright.stop() + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() diff --git a/src/browser/screenshot.py b/src/browser/screenshot.py new file mode 100644 index 0000000..9a7ba9f --- /dev/null +++ b/src/browser/screenshot.py @@ -0,0 +1,53 @@ +""" +Screenshot Manager - Handles screenshot capture and storage +""" +from pathlib import Path +from datetime import datetime +from typing import Optional +from PIL import Image +import io + + +class ScreenshotManager: + """Manages screenshot capture, 
storage and comparison""" + + def __init__(self, save_dir: str = "./reports/screenshots"): + self.save_dir = Path(save_dir) + self.save_dir.mkdir(parents=True, exist_ok=True) + + def save(self, image_bytes: bytes, name: Optional[str] = None) -> Path: + """Save screenshot to file""" + if name is None: + name = datetime.now().strftime("%Y%m%d_%H%M%S") + + filepath = self.save_dir / f"{name}.png" + filepath.write_bytes(image_bytes) + return filepath + + def load(self, filepath: str) -> bytes: + """Load screenshot from file""" + return Path(filepath).read_bytes() + + def resize(self, image_bytes: bytes, max_size: int = 1024) -> bytes: + """Resize image for API calls (reduce token usage)""" + img = Image.open(io.BytesIO(image_bytes)) + + # Calculate new size maintaining aspect ratio + ratio = min(max_size / img.width, max_size / img.height) + if ratio < 1: + new_size = (int(img.width * ratio), int(img.height * ratio)) + img = img.resize(new_size, Image.Resampling.LANCZOS) + + # Save to bytes + buffer = io.BytesIO() + img.save(buffer, format="PNG", optimize=True) + return buffer.getvalue() + + def crop(self, image_bytes: bytes, box: tuple) -> bytes: + """Crop image to specified region (left, top, right, bottom)""" + img = Image.open(io.BytesIO(image_bytes)) + cropped = img.crop(box) + + buffer = io.BytesIO() + cropped.save(buffer, format="PNG") + return buffer.getvalue() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..5779ed0 --- /dev/null +++ b/src/main.py @@ -0,0 +1,166 @@ +""" +AI Web Tester - Main entry point +""" +from typing import Optional, Dict, Any +from .browser import BrowserController, ScreenshotManager +from .vision import PageAnalyzer +from .agent import TestPlanner, ActionExecutor +from .reporter import ReportGenerator + + +class WebTester: + """Main class for AI-powered web testing""" + + def __init__(self, model: str = "claude", headless: bool = False): + self.browser = BrowserController(headless=headless) + self.screenshots = 
ScreenshotManager() + self.analyzer = PageAnalyzer(model=model) + self.reporter = ReportGenerator() + self._started = False + + def start(self) -> "WebTester": + """Start browser""" + self.browser.start() + self._started = True + return self + + def stop(self) -> None: + """Stop browser""" + self.browser.close() + self._started = False + + def goto(self, url: str) -> "WebTester": + """Navigate to URL""" + self.browser.goto(url) + return self + + def test(self, goal: str) -> dict: + """Run AI-driven test with natural language goal""" + planner = TestPlanner(self.analyzer) + executor = ActionExecutor(self.browser, self.analyzer) + + # Get current page screenshot + img = self.browser.screenshot_base64() + + # Get viewport size for accurate coordinates + viewport_size = self.browser.page.viewport_size + viewport_width = viewport_size["width"] if viewport_size else 1920 + viewport_height = viewport_size["height"] if viewport_size else 1080 + + # Plan test steps with viewport info + steps = planner.plan_test(goal, img, viewport_width, viewport_height) + + # Execute each step + for step in steps: + executor.execute_action(step) + self.browser.wait(500) + + # Generate report + report = self.reporter.generate(goal[:30], executor.action_log) + + return { + "goal": goal, + "steps": len(steps), + "results": executor.action_log, + "report": str(report), + } + + def verify(self, condition: str) -> Dict[str, Any]: + """ + 使用 AI 验证页面是否满足指定条件 + + Args: + condition: 自然语言描述的验证条件 + 例如: "页面包含 '登录成功' 文字" + "用户名显示为 admin" + + Returns: + dict: { + "passed": bool, # 验证是否通过 + "condition": str, # 原始条件 + "reason": str, # AI 分析结果说明 + } + """ + img = self.browser.screenshot_base64() + + prompt = f"""验证以下条件是否满足: +"{condition}" + +请仔细分析页面截图,然后以 JSON 格式回答: +{{ + "passed": true 或 false, + "reason": "详细说明验证结果的原因" +}} + +只返回 JSON,不要其他内容。""" + + response = self.analyzer.model.analyze(img, prompt) + + # 解析 AI 响应 + import json + import re + + try: + # 尝试提取 JSON + match = re.search(r'\{.*\}', response, 
re.DOTALL) + if match: + result = json.loads(match.group()) + return { + "passed": result.get("passed", False), + "condition": condition, + "reason": result.get("reason", "无法解析 AI 响应"), + } + except json.JSONDecodeError: + pass + + # 解析失败,返回原始响应 + return { + "passed": False, + "condition": condition, + "reason": f"AI 响应解析失败: {response[:200]}", + } + + def save_baseline(self, name: str) -> str: + """ + 保存当前页面截图作为视觉基线 + + Args: + name: 基线名称 + + Returns: + 保存的文件路径 + """ + from .utils.visual_regression import VisualRegression + + vr = VisualRegression() + img = self.browser.screenshot_base64() + path = vr.save_baseline(name, img) + return str(path) + + def compare_visual(self, name: str, threshold: float = 0.01) -> Dict[str, Any]: + """ + 与视觉基线对比 + + Args: + name: 基线名称 + threshold: 差异阈值(0-1),默认 1% + + Returns: + { + "match": bool, # 是否匹配 + "diff_percent": float, # 差异百分比 + "diff_image": str, # 差异图路径(如果有差异) + } + """ + from .utils.visual_regression import VisualRegression + + vr = VisualRegression() + img = self.browser.screenshot_base64() + return vr.compare(name, img, threshold) + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() diff --git a/src/reporter/__init__.py b/src/reporter/__init__.py new file mode 100644 index 0000000..944f769 --- /dev/null +++ b/src/reporter/__init__.py @@ -0,0 +1,4 @@ +# Reporter module - Test report generation +from .generator import ReportGenerator + +__all__ = ["ReportGenerator"] diff --git a/src/reporter/generator.py b/src/reporter/generator.py new file mode 100644 index 0000000..3c89dcd --- /dev/null +++ b/src/reporter/generator.py @@ -0,0 +1,207 @@ +""" +Report Generator - Generate HTML test reports +""" +from pathlib import Path +from datetime import datetime +from typing import List, Dict, Any +import json +import logging + +logger = logging.getLogger(__name__) + + +class ReportGenerator: + """Generates HTML test reports with embedded screenshots""" + + def 
__init__(self, output_dir: str = "./reports"): + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def generate(self, test_name: str, actions: List[Dict], + screenshots: List[str] = None) -> Path: + """Generate HTML report""" + html = self._build_html(test_name, actions, screenshots or []) + + filename = f"{test_name}_{datetime.now():%Y%m%d_%H%M%S}.html" + filepath = self.output_dir / filename + filepath.write_text(html, encoding="utf-8") + + # 同时保存 JSON 结果 + self._save_json_result(test_name, actions, filepath) + + return filepath + + def _save_json_result(self, test_name: str, actions: List[Dict], + report_path: Path) -> None: + """Save test results as JSON for persistence""" + result = { + "test_name": test_name, + "timestamp": datetime.now().isoformat(), + "report_path": str(report_path), + "total_actions": len(actions), + "passed": sum(1 for a in actions if a.get("success")), + "failed": sum(1 for a in actions if not a.get("success")), + "actions": [ + { + "action": a.get("action", {}), + "success": a.get("success", False), + "error": a.get("error"), + } + for a in actions + ] + } + + json_path = report_path.with_suffix(".json") + json_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + logger.info(f"测试结果已保存: {json_path}") + + def _build_html(self, test_name: str, actions: List[Dict], + screenshots: List[str]) -> str: + """Build HTML content with embedded screenshots""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + total = len(actions) + passed = sum(1 for a in actions if a.get("success")) + + # Build actions HTML + actions_html = "" + for i, action in enumerate(actions, 1): + action_data = action.get("action", {}) + action_type = action_data.get("action", "unknown") + target = action_data.get("target", "") + success = action.get("success", False) + error = action.get("error", "") + screenshot = action.get("screenshot", "") + + status_class = "success" if success else 
"failed" + status_icon = "✅" if success else "❌" + + screenshot_html = "" + if screenshot: + screenshot_html = f''' +
+ Step {i} screenshot +
''' + + error_html = f'
错误: {error}
' if error else "" + + actions_html += f''' +
+
+ 步骤 {i} + {action_type} + {status_icon} +
+
+ {f'目标: {target}' if target else ''} +
+ {error_html} + {screenshot_html} +
''' + + return f''' + + + + + {test_name} - 测试报告 + + + +
+
+

📋 {test_name}

+

生成时间: {timestamp}

+
+ +
+
+
{total}
+
总步骤
+
+
+
{passed}
+
通过
+
+
+
{total - passed}
+
失败
+
+
+ +

执行步骤

+ {actions_html} +
+ +''' diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..feddb93 --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1 @@ +# Utils module diff --git a/src/utils/logging_config.py b/src/utils/logging_config.py new file mode 100644 index 0000000..eb48002 --- /dev/null +++ b/src/utils/logging_config.py @@ -0,0 +1,82 @@ +""" +Logging Configuration - 结构化日志配置 +""" +import logging +import sys +from typing import Optional +from pathlib import Path + + +def setup_logging( + level: str = "INFO", + log_file: Optional[str] = None, + format_style: str = "detailed" +) -> logging.Logger: + """ + 配置项目日志 + + Args: + level: 日志级别 (DEBUG, INFO, WARNING, ERROR) + log_file: 日志文件路径(可选) + format_style: 格式风格 ("simple" 或 "detailed") + + Returns: + 配置好的 logger + """ + # 日志格式 + if format_style == "simple": + log_format = "%(levelname)s: %(message)s" + else: + log_format = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s" + + date_format = "%Y-%m-%d %H:%M:%S" + + # 创建根 logger + root_logger = logging.getLogger("ai_web_tester") + root_logger.setLevel(getattr(logging, level.upper(), logging.INFO)) + + # 清除现有 handlers + root_logger.handlers.clear() + + # 控制台输出 + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(logging.Formatter(log_format, date_format)) + root_logger.addHandler(console_handler) + + # 文件输出(可选) + if log_file: + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + file_handler = logging.FileHandler(log_file, encoding="utf-8") + file_handler.setFormatter(logging.Formatter(log_format, date_format)) + root_logger.addHandler(file_handler) + + return root_logger + + +def get_logger(name: str) -> logging.Logger: + """ + 获取子 logger + + Args: + name: 模块名称 + + Returns: + 子 logger + """ + return logging.getLogger(f"ai_web_tester.{name}") + + +# 默认初始化 +_default_logger = None + + +def init_default_logging(): + """初始化默认日志配置""" + global _default_logger + if _default_logger is None: + 
class VisualRegression:
    """Visual regression testing by comparing screenshots against baselines.

    Baselines are stored as ``<baseline_dir>/<name>.png``; side-by-side
    difference images are written to ``<baseline_dir>/diffs``.
    """

    # Per-pixel noise floor: a pixel counts as changed only when the sum of
    # its channel differences is greater than this value.
    _PIXEL_DIFF_FLOOR = 30

    def __init__(self, baseline_dir: str = "./baselines"):
        """Create the baseline and diff directories.

        Args:
            baseline_dir: Directory in which baseline screenshots are stored.

        Raises:
            ImportError: If Pillow is not installed.
        """
        if not HAS_PIL:
            raise ImportError("视觉回归测试需要 Pillow 库。请运行: pip install Pillow")

        self.baseline_dir = Path(baseline_dir)
        self.baseline_dir.mkdir(parents=True, exist_ok=True)
        self.diff_dir = self.baseline_dir / "diffs"
        self.diff_dir.mkdir(parents=True, exist_ok=True)

    def save_baseline(self, name: str, screenshot_base64: str) -> Path:
        """Store a screenshot as the baseline called *name*.

        Args:
            name: Baseline name, without file extension.
            screenshot_base64: Base64-encoded screenshot bytes.

        Returns:
            Path of the file written.
        """
        filepath = self.baseline_dir / f"{name}.png"
        img_data = base64.b64decode(screenshot_base64)
        filepath.write_bytes(img_data)
        logger.info(f"基线截图已保存: {filepath}")
        return filepath

    def has_baseline(self, name: str) -> bool:
        """Return True when a baseline file for *name* exists."""
        return (self.baseline_dir / f"{name}.png").exists()

    def compare(
        self,
        name: str,
        screenshot_base64: str,
        threshold: float = 0.01
    ) -> Dict[str, Any]:
        """Compare a screenshot with the stored baseline.

        Args:
            name: Baseline name.
            screenshot_base64: Base64-encoded current screenshot.
            threshold: Maximum fraction (0-1) of changed pixels that still
                counts as a match.

        Returns:
            A dict with the keys:
                "match" (bool): Whether the difference is within threshold.
                "diff_percent" (float): Fraction of changed pixels (0-1).
                "baseline_path" (str): Path of the baseline file.
                "threshold" (float): The threshold that was applied.
                "diff_image" (str): Path of the comparison image; only
                    present when the images do not match.
                "error" (str): Only present when the baseline is missing;
                    in that case "match" is False and "diff_percent" is 1.0.
        """
        import io

        baseline_path = self.baseline_dir / f"{name}.png"

        if not baseline_path.exists():
            return {
                "match": False,
                "error": f"基线不存在: {name}",
                "diff_percent": 1.0,
                "baseline_path": str(baseline_path),
                "threshold": threshold,
            }

        # convert() returns an independent in-memory copy, so the source
        # image (and its file handle) can be closed right away.
        with Image.open(baseline_path) as source:
            baseline_img = source.convert("RGB")

        current_data = base64.b64decode(screenshot_base64)
        with Image.open(io.BytesIO(current_data)) as source:
            current_img = source.convert("RGB")

        # ImageChops.difference needs equally sized images.
        if baseline_img.size != current_img.size:
            # Image.Resampling exists on newer Pillow; older releases expose
            # the same constant directly on the Image module.
            lanczos = getattr(Image, "Resampling", Image).LANCZOS
            current_img = current_img.resize(baseline_img.size, lanczos)

        diff = ImageChops.difference(baseline_img, current_img)
        diff_percent = self._calculate_diff_percent(diff)

        match = diff_percent <= threshold

        result = {
            "match": match,
            "diff_percent": diff_percent,
            "baseline_path": str(baseline_path),
            "threshold": threshold,
        }

        if not match:
            diff_image_path = self._generate_diff_image(
                name, baseline_img, current_img, diff
            )
            result["diff_image"] = str(diff_image_path)
            logger.warning(f"视觉差异检测: {name} - {diff_percent*100:.2f}% 不同")
        else:
            logger.info(f"视觉匹配: {name} - {diff_percent*100:.2f}% 差异(在阈值内)")

        return result

    def _calculate_diff_percent(self, diff: "Image.Image") -> float:
        """Return the fraction (0-1) of pixels that differ noticeably.

        A pixel counts as changed when the sum of its channel values in the
        difference image exceeds ``_PIXEL_DIFF_FLOOR``. An image without
        pixels yields 0.0.
        """
        pixels = diff.getdata()
        total_pixels = len(pixels)
        if total_pixels == 0:
            return 0.0
        floor = self._PIXEL_DIFF_FLOOR
        changed = sum(1 for pixel in pixels if sum(pixel) > floor)
        return changed / total_pixels

    def _generate_diff_image(
        self,
        name: str,
        baseline: "Image.Image",
        current: "Image.Image",
        diff: "Image.Image"
    ) -> Path:
        """Write a side-by-side comparison image and return its path.

        Layout, left to right: baseline, current screenshot, amplified
        difference. Each panel gets a small text label in its top-left
        corner. The file name contains *name* and a timestamp.
        """
        panel_width = baseline.width
        comparison = Image.new("RGB", (panel_width * 3, baseline.height))

        comparison.paste(baseline, (0, 0))
        comparison.paste(current, (panel_width, 0))

        # Multiply channel differences by 5 (capped at 255) so that subtle
        # changes become visible.
        enhanced_diff = diff.point(lambda x: min(255, x * 5))
        comparison.paste(enhanced_diff, (panel_width * 2, 0))

        draw = ImageDraw.Draw(comparison)
        draw.rectangle([(0, 0), (150, 25)], fill="black")
        draw.text((5, 5), "BASELINE", fill="white")
        draw.rectangle([(panel_width, 0), (panel_width + 150, 25)], fill="black")
        draw.text((panel_width + 5, 5), "CURRENT", fill="white")
        draw.rectangle([(panel_width * 2, 0), (panel_width * 2 + 150, 25)], fill="red")
        draw.text((panel_width * 2 + 5, 5), "DIFF", fill="white")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        diff_path = self.diff_dir / f"{name}_diff_{timestamp}.png"
        comparison.save(diff_path)

        return diff_path

    def update_baseline(self, name: str, screenshot_base64: str) -> Path:
        """Overwrite the baseline called *name* with a new screenshot."""
        return self.save_baseline(name, screenshot_base64)

    def list_baselines(self) -> list:
        """Return the names of all stored baselines in sorted order."""
        return sorted(f.stem for f in self.baseline_dir.glob("*.png"))
def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exceptions: tuple = (Exception,)
) -> Callable:
    """Build a decorator that retries a call with exponential backoff.

    The wrapped function is called once and, on failure, up to
    ``max_retries`` more times. The wait before each retry starts at
    ``base_delay`` seconds and doubles after every failure, capped at
    ``max_delay``.

    When the ``API_MAX_RETRIES`` environment variable holds an integer it
    overrides ``max_retries`` (it is read on every call). An empty or
    non-numeric value is ignored with a warning, and negative values are
    treated as 0, so the function is always attempted at least once.

    Args:
        max_retries: Maximum number of retries after the first attempt.
        base_delay: Initial delay in seconds.
        max_delay: Upper bound for the delay in seconds.
        exceptions: Exception types that trigger a retry; any other
            exception propagates immediately.

    Returns:
        A decorator. The decorated function re-raises the last exception
        once all attempts are exhausted.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = max_retries
            raw = os.getenv("API_MAX_RETRIES")
            if raw is not None and raw.strip():
                try:
                    retries = int(raw)
                except ValueError:
                    logger.warning(
                        f"API_MAX_RETRIES 值无效 ({raw!r}),使用默认值 {max_retries}"
                    )
            # A negative count would skip the loop entirely and leave
            # nothing to return or raise.
            retries = max(retries, 0)

            delay = base_delay
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt >= retries:
                        logger.error(f"API 调用失败,已达最大重试次数: {e}")
                        raise
                    logger.warning(
                        f"API 调用失败 (尝试 {attempt + 1}/{retries + 1}): {e}. "
                        f"{delay:.1f}秒后重试..."
                    )
                    time.sleep(delay)
                    delay = min(delay * 2, max_delay)
        return wrapper
    return decorator
int(os.getenv("API_TIMEOUT", 60)), + } + else: + raise ConfigurationError(f"未知的 API 提供商: {provider}") + + +def test_api_connection(provider: str = "anthropic") -> bool: + """ + 测试 API 连接是否正常 + + Args: + provider: API 提供商 + + Returns: + 连接是否成功 + """ + try: + config = validate_api_config(provider) + if provider == "anthropic": + from anthropic import Anthropic + client_kwargs = {"api_key": config["api_key"]} + if config["base_url"]: + client_kwargs["base_url"] = config["base_url"] + client = Anthropic(**client_kwargs) + # 简单测试 + client.messages.create( + model=config["model"], + max_tokens=10, + messages=[{"role": "user", "content": "Hi"}] + ) + elif provider == "openai": + from openai import OpenAI + client_kwargs = {"api_key": config["api_key"]} + if config["base_url"]: + client_kwargs["base_url"] = config["base_url"] + client = OpenAI(**client_kwargs) + client.chat.completions.create( + model=config["model"], + max_tokens=10, + messages=[{"role": "user", "content": "Hi"}] + ) + logger.info(f"API 连接测试成功: {provider}") + return True + except Exception as e: + logger.error(f"API 连接测试失败: {e}") + return False + + +# ============================================================ +# Vision Models +# ============================================================ + +class VisionModel(ABC): + """Abstract base class for vision models""" + + @abstractmethod + def analyze(self, image_base64: str, prompt: str) -> str: + """Analyze image and return response""" + pass + + +class ClaudeVision(VisionModel): + """Claude API implementation""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + model: Optional[str] = None, + timeout: Optional[int] = None + ): + config = validate_api_config("anthropic") + self.api_key = api_key or config["api_key"] + self.base_url = base_url or config["base_url"] + self.model = model or config["model"] + self.timeout = timeout or config["timeout"] + self._client = None + + @property + def client(self): + if 
self._client is None: + from anthropic import Anthropic + client_kwargs = { + "api_key": self.api_key, + "timeout": self.timeout + } + if self.base_url: + client_kwargs["base_url"] = self.base_url + self._client = Anthropic(**client_kwargs) + return self._client + + @retry_with_backoff(max_retries=3, base_delay=1.0) + def analyze(self, image_base64: str, prompt: str) -> str: + response = self.client.messages.create( + model=self.model, + max_tokens=4096, + messages=[{ + "role": "user", + "content": [ + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/png", + "data": image_base64, + }, + }, + {"type": "text", "text": prompt} + ], + }], + ) + return response.content[0].text + + +class OpenAIVision(VisionModel): + """OpenAI GPT-4V implementation""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + model: Optional[str] = None, + timeout: Optional[int] = None + ): + config = validate_api_config("openai") + self.api_key = api_key or config["api_key"] + self.base_url = base_url or config["base_url"] + self.model = model or config["model"] + self.timeout = timeout or config["timeout"] + self._client = None + + @property + def client(self): + if self._client is None: + from openai import OpenAI + client_kwargs = { + "api_key": self.api_key, + "timeout": self.timeout + } + if self.base_url: + client_kwargs["base_url"] = self.base_url + self._client = OpenAI(**client_kwargs) + return self._client + + @retry_with_backoff(max_retries=3, base_delay=1.0) + def analyze(self, image_base64: str, prompt: str) -> str: + response = self.client.chat.completions.create( + model=self.model, + max_tokens=4096, + messages=[{ + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{image_base64}" + }, + }, + {"type": "text", "text": prompt} + ], + }], + ) + return response.choices[0].message.content diff --git a/tests/test_cases.py b/tests/test_cases.py new file 
#!/usr/bin/env python3
"""
Test case template - quickly define and run several AI-driven tests
(serial or parallel execution).

The process exits with status 1 when any test case fails, so CI jobs that
run this script reflect the test outcome.
"""
import sys
sys.path.insert(0, ".")

from src import WebTester
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional
import time


# ============================================================
# Test case configuration
# ============================================================

TEST_CASES = [
    {
        "name": "Example.com 链接测试",
        "url": "http://47.99.105.253:8084",
        "goal": "填入账号admin 密码password,登录成功",
    },
    # Add more test cases here...
]


# ============================================================
# Test runners
# ============================================================

def _count_failed_steps(test_result: Dict[str, Any]) -> int:
    """Return how many entries in ``test_result["results"]`` did not succeed."""
    return sum(
        1 for step in test_result.get("results", [])
        if not step.get("success", False)
    )


def run_single_case(case: Dict[str, Any], model: str = "claude",
                    headless: bool = True) -> Dict[str, Any]:
    """Run one test case in its own browser instance.

    Args:
        case: Dict with "url" and "goal" and an optional "name".
        model: Name of the AI model to use.
        headless: Whether to run the browser headless.

    Returns:
        Dict with "name", "url", "goal" and "status" ("passed"/"failed");
        "steps" and "report" when the run completed; "error" when the run
        raised or some steps failed.
    """
    name = case.get("name", "Unknown")
    url = case["url"]
    goal = case["goal"]

    result = {
        "name": name,
        "url": url,
        "goal": goal,
        "status": "failed",
    }

    try:
        with WebTester(model=model, headless=headless) as tester:
            tester.goto(url)
            test_result = tester.test(goal)
        result["steps"] = test_result["steps"]
        result["report"] = test_result["report"]
        # Same pass criterion as run_tests: every executed step succeeded.
        failed_count = _count_failed_steps(test_result)
        if failed_count:
            result["error"] = f"{failed_count}/{test_result['steps']} 步骤失败"
        else:
            result["status"] = "passed"
    except Exception as e:
        result["error"] = str(e)

    return result


def run_tests(model: str = "claude", headless: bool = False):
    """Run all test cases one after another in a shared browser.

    Returns:
        List of per-case result dicts ("name", "status", and either
        "steps"/"report" or "error").
    """
    results = []

    with WebTester(model=model, headless=headless) as tester:
        for i, case in enumerate(TEST_CASES, 1):
            name = case.get("name", f"Test {i}")
            url = case["url"]
            goal = case["goal"]

            print(f"\n{'='*60}")
            print(f"🧪 [{i}/{len(TEST_CASES)}] {name}")
            print(f" URL: {url}")
            print(f" Goal: {goal}")
            print(f"{'='*60}")

            try:
                tester.goto(url)
                result = tester.test(goal)

                failed_count = _count_failed_steps(result)

                if failed_count == 0:
                    print(f"✅ 完成: {result['steps']} 步骤")
                    status = "passed"
                else:
                    print(f"⚠️ 部分失败: {failed_count}/{result['steps']} 步骤失败")
                    status = "failed"

                print(f"📄 报告: {result['report']}")

                results.append({
                    "name": name,
                    "status": status,
                    "steps": result["steps"],
                    "report": result["report"],
                })
            except Exception as e:
                print(f"❌ 失败: {e}")
                results.append({
                    "name": name,
                    "status": "failed",
                    "error": str(e),
                })

    _print_summary(results)
    return results


def run_tests_parallel(model: str = "claude", max_workers: int = 3):
    """Run all test cases in parallel, each in its own headless browser.

    Args:
        model: Name of the AI model to use.
        max_workers: Maximum number of worker threads (default 3).

    Returns:
        List of per-case result dicts in completion order.
    """
    print(f"\n🚀 并行模式启动 (workers={max_workers})")
    print(f"📋 待执行测试: {len(TEST_CASES)} 个\n")

    results = []
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_case = {
            executor.submit(run_single_case, case, model, True): case
            for case in TEST_CASES
        }

        for future in as_completed(future_to_case):
            case = future_to_case[future]
            try:
                result = future.result()
                status = "✅" if result["status"] == "passed" else "❌"
                print(f"{status} {result['name']}")
                results.append(result)
            except Exception as e:
                # "name" is optional in a case definition.
                case_name = case.get("name", "Unknown")
                print(f"❌ {case_name}: {e}")
                results.append({
                    "name": case_name,
                    "status": "failed",
                    "error": str(e),
                })

    elapsed = time.time() - start_time
    print(f"\n⏱️ 总耗时: {elapsed:.1f}秒")

    _print_summary(results)
    return results


def _print_summary(results: List[Dict[str, Any]]):
    """Print pass/fail counts and the pass rate for *results*."""
    print(f"\n{'='*60}")
    print("📊 测试总结")
    print(f"{'='*60}")
    passed = sum(1 for r in results if r["status"] == "passed")
    failed = len(results) - passed
    print(f"✅ 通过: {passed}")
    print(f"❌ 失败: {failed}")
    if results:
        print(f"📈 通过率: {passed/len(results)*100:.1f}%")


def run_single_test(url: str, goal: str, model: str = "claude",
                    headless: Optional[bool] = None):
    """Run one ad-hoc test against *url*.

    Args:
        url: Page to open.
        goal: Natural-language test goal.
        model: Name of the AI model to use.
        headless: Passed to ``WebTester`` only when not None, so the
            tester's own default applies otherwise.

    Returns:
        The dict returned by ``WebTester.test``.
    """
    tester_kwargs = {"model": model}
    if headless is not None:
        tester_kwargs["headless"] = headless

    with WebTester(**tester_kwargs) as tester:
        tester.goto(url)
        result = tester.test(goal)

    failed_count = _count_failed_steps(result)
    if failed_count == 0:
        print(f"✅ 完成: {result['steps']} 步骤")
    else:
        print(f"⚠️ 部分失败: {failed_count}/{result['steps']} 步骤失败")
    print(f"📄 报告: {result['report']}")
    return result


# ============================================================
# Entry point
# ============================================================

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="AI Web Tester - 测试用例运行器")
    parser.add_argument("--url", help="单个测试的 URL")
    parser.add_argument("--goal", help="单个测试的目标描述")
    parser.add_argument("--model", default="claude", choices=["claude", "openai"], help="AI 模型")
    parser.add_argument("--headless", action="store_true", help="无头模式运行")
    parser.add_argument("--parallel", action="store_true", help="并行执行测试")
    parser.add_argument("--workers", type=int, default=3, help="并行工作线程数")

    args = parser.parse_args()

    if args.url and args.goal:
        outcome = run_single_test(
            args.url, args.goal, args.model,
            headless=True if args.headless else None,
        )
        any_failed = _count_failed_steps(outcome) > 0
    elif args.parallel:
        outcomes = run_tests_parallel(model=args.model, max_workers=args.workers)
        any_failed = any(r["status"] != "passed" for r in outcomes)
    else:
        outcomes = run_tests(model=args.model, headless=args.headless)
        any_failed = any(r["status"] != "passed" for r in outcomes)

    # A non-zero exit status lets CI detect failed test cases.
    sys.exit(1 if any_failed else 0)