#!/usr/bin/env python3
"""
Test-case template: quickly design and run several web UI tests.

Cases are declared in ``TEST_CASES`` and can be executed serially in one
shared browser (``run_tests``) or in parallel with one browser per case
(``run_tests_parallel``).
"""
import sys
sys.path.insert(0, ".")  # make the project-local ``src`` package importable

from src import WebTester
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional
import time
import re


# Natural-language login goal handed to the AI tester when a case does not
# describe its own login step.
DEFAULT_LOGIN_GOAL = "填入账号admin 密码password,登录成功"

# A goal text containing any of these substrings is treated as already
# describing a login step.
_LOGIN_KEYWORDS = ("登录", "admin")


def _all_steps_succeeded(test_result: Dict[str, Any]) -> bool:
    """Return True when every entry of ``test_result["results"]`` is successful.

    A missing or empty ``results`` list counts as success.
    """
    return all(r.get("success", False) for r in test_result.get("results", []))


def _mentions_login(text: str) -> bool:
    """Return True if *text* contains one of the login keywords."""
    return any(keyword in text for keyword in _LOGIN_KEYWORDS)


def _resolve_login_goal(case: Dict[str, Any]) -> Optional[str]:
    """Decide which login goal (if any) should run before *case*.

    - ``explore``: the goal configured under ``config["require_login"]``.
    - ``hybrid``: the default goal, unless a step already mentions login.
    - ``goal``: the default goal, unless the goal itself mentions login.

    Returns:
        The login goal text, or ``None`` when no pre-login is required.
        The case dictionary is never modified.
    """
    mode = case.get("mode", "goal")
    if mode == "explore":
        require_login = case.get("config", {}).get("require_login")
        if require_login:
            return require_login.get("goal", "") or None
        return None
    if mode == "hybrid":
        steps = case.get("steps", [])
        if any(_mentions_login(step.get("goal", "")) for step in steps):
            return None  # the steps already contain a login
        return DEFAULT_LOGIN_GOAL
    if mode == "goal":
        if _mentions_login(case.get("goal", "")):
            return None
        return DEFAULT_LOGIN_GOAL
    return None


def _ensure_sidebar_open(tester: WebTester) -> None:
    """Best-effort attempt to make the navigation sidebar visible.

    If none of the known menu captions is visible, click the first visible
    menu-toggle element; when no toggle is found, click two fixed
    coordinates near the top-left corner instead. Every error is swallowed
    on purpose: failing to open the sidebar must not abort a test.
    """
    try:
        page = tester.browser.page
        if not page:
            return

        # Is any sidebar caption already visible?
        markers = ("立项论证管理", "产品方案管理", "基础数据")
        visible = False
        for marker in markers:
            try:
                loc = page.get_by_text(marker, exact=False)
                count = loc.count()
                if count > 0 and any(loc.nth(i).is_visible() for i in range(count)):
                    visible = True
                    break
            except Exception:
                continue
        if visible:
            return

        print(" 📂 侧边栏未发现,尝试点击菜单切换按钮...")

        # Common selectors for a sidebar toggle button.
        toggle_selectors = [
            ".anticon-menu-fold",
            ".anticon-menu-unfold",
            ".el-icon-menu",
            ".toggle-sidebar",
            ".ant-layout-sider-trigger",
        ]
        found_toggle = False
        for sel in toggle_selectors:
            try:
                btn = page.locator(sel)
                if btn.count() > 0 and btn.first.is_visible():
                    btn.first.click()
                    found_toggle = True
                    break
            except Exception:
                continue

        if not found_toggle:
            # Fall back to clicking fixed coordinates in the top-left area.
            tester.browser.click_at(30, 30)
            tester.browser.wait(500)
            tester.browser.click_at(80, 25)  # left of the top breadcrumb

        tester.browser.wait(1500)
    except Exception:
        return


def _is_logged_in(tester: WebTester) -> bool:
    """Heuristically decide whether the current page is a logged-in page.

    Checks, in order: the URL must not contain ``login``; a password field
    together with the text "登录" means not logged in; any dashboard marker
    text means logged in; otherwise a remaining password field means not
    logged in. Any unexpected error yields ``False``.
    """
    try:
        page = tester.browser.page
        if not page:
            return False

        url = (page.url or "").lower()
        # "login" also covers "/auth/login" and "#/auth/login".
        if "login" in url:
            return False

        # DOM marker: a visible login form means not logged in even when the
        # URL does not mention login.
        try:
            if page.locator("input[type='password']").count() > 0:
                if page.get_by_text("登录", exact=False).count() > 0:
                    return False
        except Exception:
            pass

        for marker in ("分析概览", "待办事项", "系统管理"):
            try:
                if page.get_by_text(marker, exact=False).count() > 0:
                    return True
            except Exception:
                continue

        # No dashboard marker found: fall back to a weaker heuristic.
        try:
            if page.locator("input[type='password']").count() > 0:
                return False
        except Exception:
            pass
        return True
    except Exception:
        return False


def _do_login_dom(tester: WebTester, username: str = "admin", password: str = "password") -> bool:
    """Log in deterministically by filling the login form through the DOM.

    Args:
        tester: Tester whose browser page shows the login form.
        username: Account name typed into the user field.
        password: Password typed into the password field.

    Returns:
        True once ``_is_logged_in`` confirms the login; False when the form
        or the login button cannot be found, or when any error occurs.
    """
    page = tester.browser.page
    if not page:
        return False
    try:
        user_locators = [
            "input[placeholder*='用户']",
            "input[placeholder*='账号']",
            "input[name*='user' i]",
            "input[name*='account' i]",
        ]
        pwd_locators = [
            "input[type='password']",
            "input[placeholder*='密码']",
        ]

        user_el = None
        for sel in user_locators:
            loc = page.locator(sel)
            if loc.count() > 0:
                user_el = loc.first
                break
        if user_el is None:
            # Fall back to the first text-like input on the page.
            loc = page.locator("input[type='text'], input:not([type])")
            if loc.count() > 0:
                user_el = loc.first

        pwd_el = None
        for sel in pwd_locators:
            loc = page.locator(sel)
            if loc.count() > 0:
                pwd_el = loc.first
                break

        if user_el is None or pwd_el is None:
            return False

        user_el.fill(username)
        pwd_el.fill(password)

        clicked = False
        for text in ("登录", "登 录"):
            # Prefer a real button; otherwise click any element with the text.
            try:
                btn = page.get_by_role("button", name=re.compile(text))
                if btn.count() > 0:
                    btn.first.click()
                    clicked = True
                    break
            except Exception:
                pass
            try:
                btn = page.get_by_text(text, exact=False)
                if btn.count() > 0:
                    btn.first.click()
                    clicked = True
                    break
            except Exception:
                pass
        if not clicked:
            return False

        tester.browser.wait_for_load_state("networkidle", timeout=15000)
        # Poll for up to ~10 s (20 x 500 ms) until the login is confirmed.
        for _ in range(20):
            if _is_logged_in(tester):
                return True
            tester.browser.wait(500)
        return _is_logged_in(tester)
    except Exception:
        return False


def _ensure_logged_in(tester: WebTester, login_goal: str = DEFAULT_LOGIN_GOAL) -> bool:
    """Make sure the session is logged in, trying progressively weaker means.

    Order: already logged in, then the DOM-based login, then the AI-driven
    *login_goal*. Errors raised by the AI login are ignored.

    Returns:
        The final result of ``_is_logged_in``.
    """
    if _is_logged_in(tester):
        return True
    if _do_login_dom(tester):
        return True
    try:
        tester.test(login_goal)
        tester.browser.wait(1500)
    except Exception:
        pass
    return _is_logged_in(tester)


def _run_hybrid_steps(tester: WebTester, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Execute a hybrid step list (``goal`` / ``explore`` / ``wait`` actions).

    Execution stops at the first failing step.

    Returns:
        ``{"passed": bool, "step_results": [...]}``, plus an ``"error"`` key
        when an explore step fails.
    """
    # Goals containing one of these words are expected to navigate.
    navigation_keywords = ("点击菜单", "跳转", "进入", "研制方案", "项目")
    step_results: List[Dict[str, Any]] = []

    for idx, step in enumerate(steps, 1):
        action = step.get("action")

        if action == "goal":
            goal = step.get("goal", "")
            _ensure_sidebar_open(tester)
            before_url = tester.browser.page.url
            r = tester.test(goal)
            after_url = tester.browser.page.url
            step_results.append({
                "step": idx,
                "action": action,
                "goal": goal,
                "result": r,
                "url_changed": before_url != after_url,
            })
            # Warn only; an unchanged URL does not fail the step.
            if not after_url or before_url == after_url:
                if any(kw in goal for kw in navigation_keywords):
                    print(f" ⚠️ 警告: 执行 '{goal}' 后 URL 似乎没有变化")
            if not _all_steps_succeeded(r):
                return {"passed": False, "step_results": step_results}

        elif action == "explore":
            _ensure_sidebar_open(tester)
            try:
                r = tester.explore(step.get("config", {}))
                step_results.append({"step": idx, "action": action, "result": r})
                # An exploration that neither acted nor discovered anything
                # is treated as a failure.
                if not r.get("action_log") and not r.get("discovered_elements"):
                    return {
                        "passed": False,
                        "step_results": step_results,
                        "error": "探索未发现任何元素或未执行任何操作",
                    }
            except Exception as e:
                step_results.append({"step": idx, "action": action, "error": str(e)})
                return {"passed": False, "step_results": step_results, "error": str(e)}

        elif action == "wait":
            duration = step.get("duration", 1000)
            tester.browser.wait(duration)
            step_results.append({
                "step": idx,
                "action": action,
                "duration": duration,
                "result": {"success": True},
            })

        else:
            step_results.append({
                "step": idx,
                "action": action,
                "result": {"success": False, "error": f"unknown action: {action}"},
            })
            return {"passed": False, "step_results": step_results}

    return {"passed": True, "step_results": step_results}


# ============================================================
# Test case configuration
# ============================================================

TEST_CASES = [
    # Explore mode: the AI discovers functionality on its own.
    {
        "name": "登录后深度探索",
        "url": "http://47.99.105.253:8084",
        "mode": "explore",
        "config": {
            "max_depth": 20,  # exploration depth
            "max_clicks": 2000,  # total click budget
            "require_login": {  # precondition before exploring: log in
                "goal": "填入账号admin 密码password,登录成功"
            },
            "focus_patterns": ["管理", "设置", "新增"],  # keywords the AI should focus on
            "dangerous_patterns": ["删除", "退出", "注销"],  # found but never clicked, so the run is not interrupted
        },
    },
    # Business flow: complete technical-agreement review process.
    {
        "name": "技术协议评审流程测试",
        "url": "http://47.99.105.253:8084",
        "mode": "hybrid",
        "steps": [
            {"action": "goal", "goal": "点击立项论证管理菜单"},
            {"action": "goal", "goal": "点击项目输入子菜单"},
            {"action": "goal", "goal": "点击1技术协议及科研合同评审记录"},
            {"action": "goal", "goal": "点击新增按钮"},
            # Switch to exploration to fill in and submit the form.
            {"action": "explore", "config": {"max_clicks": 30, "max_depth": 1}},
            {"action": "wait", "duration": 2000},
            {"action": "goal", "goal": "点击提交按钮"},
            {"action": "wait", "duration": 2000},
            {"action": "goal", "goal": "点击待办事项"},
            {"action": "goal", "goal": "点击最新提交的个人待办记录"},
            {"action": "goal", "goal": "点击审核/处理按钮"},
            # Let exploration handle the review form.
            {"action": "explore", "config": {"max_clicks": 15, "max_depth": 1}},
        ],
    },
    # Business flow: product-plan management process.
    {
        "name": "产品方案管理流程测试",
        "url": "http://47.99.105.253:8084",
        "mode": "hybrid",
        "steps": [
            {"action": "goal", "goal": "点击一级菜单'产品方案管理'"},
            {"action": "wait", "duration": 1000},
            {"action": "goal", "goal": "展开二级分类'研制方案'"},
            {"action": "wait", "duration": 1000},
            {"action": "goal", "goal": "点击菜单项'研制方案'"},
            {"action": "wait", "duration": 1000},
            {"action": "goal", "goal": "点击新增按钮"},
            # Use exploration to fill in and submit the form.
            {"action": "explore", "config": {"max_clicks": 20, "max_depth": 1}},
            {"action": "wait", "duration": 2000},
            {"action": "goal", "goal": "点击待办事项"},
            {"action": "goal", "goal": "点击审批详情"},
            {"action": "explore", "config": {"max_clicks": 10, "max_depth": 1}},
        ],
    },
    # Template for a hybrid case: run a goal first, then explore.
    # {
    #     "name": "登录后探索",
    #     "url": "http://47.99.105.253:8084",
    #     "mode": "hybrid",
    #     "steps": [
    #         {"action": "goal", "goal": "登录"},
    #         {"action": "explore", "config": {"max_clicks": 10}}
    #     ]
    # },
]


# ============================================================
# Test executors
# ============================================================

def run_single_case(case: Dict[str, Any], model: str = "claude", headless: bool = True) -> Dict[str, Any]:
    """Run one test case in its own browser instance.

    Args:
        case: Case definition; ``url`` is required, ``mode`` defaults to
            ``"goal"``. The dictionary is not modified.
        model: AI model name passed to ``WebTester``.
        headless: Whether the browser runs headless.

    Returns:
        A result dict with ``name``, ``url``, ``mode`` and ``status``
        (``"passed"`` or ``"failed"``) plus mode-specific details. Errors
        raised while running are reported under ``"error"``.

    Raises:
        KeyError: If *case* has no ``url``.
    """
    name = case.get("name", "Unknown")
    url = case["url"]
    mode = case.get("mode", "goal")
    result: Dict[str, Any] = {
        "name": name,
        "url": url,
        "mode": mode,
        "status": "failed",
    }

    try:
        with WebTester(model=model, headless=headless) as tester:
            tester.goto(url)
            # Always attempt the deterministic login first.
            _ensure_logged_in(tester)

            # Then honour the case-specific login requirement, if any.
            login_goal = _resolve_login_goal(case)
            if login_goal and not _is_logged_in(tester):
                _ensure_logged_in(tester, login_goal)

            _ensure_sidebar_open(tester)

            if mode == "goal":
                test_result = tester.test(case.get("goal", ""))
                # Report failed steps instead of always claiming success.
                result["status"] = "passed" if _all_steps_succeeded(test_result) else "failed"
                result["steps"] = test_result["steps"]
                result["report"] = test_result["report"]

            elif mode == "explore":
                # Copy first: popping from the shared TEST_CASES config would
                # drop the login requirement for every later run.
                config = dict(case.get("config", {}))
                require_login = config.pop("require_login", None)
                explicit_goal = require_login.get("goal", "") if require_login else ""
                # Re-running the login goal on a logged-in page is pointless.
                if explicit_goal and not _is_logged_in(tester):
                    tester.test(explicit_goal)
                    tester.browser.wait(1000)

                explore_result = tester.explore(config)
                result["status"] = "passed"
                result["elements"] = len(explore_result.get("discovered_elements", []))
                result["bugs"] = len(explore_result.get("bug_list", []))
                result["report"] = explore_result.get("report", "")

            elif mode == "hybrid":
                hybrid_res = _run_hybrid_steps(tester, case.get("steps", []))
                result["status"] = "passed" if hybrid_res.get("passed") else "failed"
                result["hybrid"] = hybrid_res

            else:
                result["error"] = f"unknown mode: {mode}"
    except Exception as e:
        result["error"] = str(e)

    return result


def _run_goal_serial(tester: WebTester, case: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Run a ``goal`` case on the shared tester, print progress, return its result."""
    result = tester.test(case.get("goal", ""))
    step_outcomes = result.get("results", [])
    failures = [r for r in step_outcomes if not r.get("success", False)]

    if not failures:
        print(f"✅ 完成: {result['steps']} 步骤")
        status = "passed"
    else:
        print(f"⚠️ 部分失败: {len(failures)}/{result['steps']} 步骤失败")
        first_failed = failures[0]
        if first_failed:
            action_type = first_failed.get("action_type") or first_failed.get("action") or "unknown"
            target = (
                first_failed.get("target")
                or first_failed.get("element")
                or first_failed.get("description")
                or ""
            )
            err = first_failed.get("error") or first_failed.get("reason") or ""
            print(f" ❌ 首个失败: {action_type} {target} {err}".strip())
        status = "failed"

    print(f"📄 报告: {result['report']}")
    return {
        "name": name,
        "status": status,
        "steps": result["steps"],
        "report": result["report"],
    }


def _run_explore_serial(tester: WebTester, case: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Run an ``explore`` case on the shared tester and return its result.

    A failed or erroring login does not stop the exploration; it only
    downgrades the final status to ``"partial"``.
    """
    config = case.get("config", {}).copy()
    login_status = "skipped"
    login_error = None

    require_login = config.pop("require_login", None)
    login_goal = require_login.get("goal", "") if require_login else ""
    if login_goal:
        try:
            print(" 🔎 检查登录状态...")
            if _is_logged_in(tester):
                print(" ✅ 已处于登录状态,跳过登录步骤")
                login_status = "already_logged_in"
            else:
                print(f" 🔐 执行登录: {login_goal}")
                login_result = tester.test(login_goal)
                tester.browser.wait(3000)
                if _all_steps_succeeded(login_result):
                    login_status = "success"
                else:
                    # Some login steps failed; exploration continues anyway.
                    login_status = "partial_failure"
                    print(" ⚠️ 登录步骤部分失败,继续尝试探索...")
                # Give the post-login redirect time to settle.
                print(" 🔎 确认登录跳转结果...")
                tester.browser.wait_for_load_state("networkidle")
                tester.browser.wait(2000)
        except Exception as e:
            login_error = str(e)
            login_status = "error"
            print(f" ⚠️ 登录过程出错: {e},继续尝试探索...")

    # Explore regardless of how the login went.
    print(" 🔍 开始功能探索...")
    try:
        result = tester.explore(config)
        elements = len(result.get("discovered_elements", []))
        bugs = len(result.get("bug_list", []))
        print(f"✅ 探索完成: 发现 {elements} 个元素, {bugs} 个问题")
        print(f"📄 报告: {result.get('report', '')}")
        login_ok = login_status in ("success", "already_logged_in", "skipped")
        return {
            "name": name,
            "status": "passed" if login_ok else "partial",
            "login_status": login_status,
            "login_error": login_error,
            "elements": elements,
            "bugs": bugs,
            "report": result.get("report", ""),
        }
    except Exception as e:
        print(f" ❌ 探索过程出错: {e}")
        return {
            "name": name,
            "status": "failed",
            "login_status": login_status,
            "error": str(e),
        }


def _run_hybrid_serial(tester: WebTester, case: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Run a ``hybrid`` case on the shared tester and return its result."""
    try:
        hybrid_res = _run_hybrid_steps(tester, case.get("steps", []))
        status = "passed" if hybrid_res.get("passed") else "failed"
        if status != "passed":
            step_results = hybrid_res.get("step_results", [])
            last = step_results[-1] if step_results else None
            if last:
                step_idx = last.get("step")
                action = last.get("action")
                goal_txt = last.get("goal", "")
                print(f" ❌ Hybrid 失败在 step {step_idx}: {action} {goal_txt}".strip())
        return {
            "name": name,
            "status": status,
            "mode": "hybrid",
            "hybrid": hybrid_res,
        }
    except Exception as e:
        print(f"❌ 失败: {e}")
        return {
            "name": name,
            "status": "failed",
            "mode": "hybrid",
            "error": str(e),
        }


def run_tests(model: str = "claude", headless: bool = False, cases: Optional[List[Dict[str, Any]]] = None):
    """Run test cases one after another in a single shared browser.

    Args:
        model: AI model name passed to ``WebTester``.
        headless: Whether the browser runs headless.
        cases: Cases to run; ``None`` selects ``TEST_CASES``.

    Returns:
        One result dict per case, in execution order. A summary is printed.
    """
    results: List[Dict[str, Any]] = []
    selected_cases = cases if cases is not None else TEST_CASES

    with WebTester(model=model, headless=headless) as tester:
        for i, case in enumerate(selected_cases, 1):
            name = case.get("name", f"Test {i}")
            url = case["url"]
            mode = case.get("mode", "goal")

            print(f"\n{'='*60}")
            print(f"🧪 [{i}/{len(selected_cases)}] {name}")
            print(f" URL: {url}")
            print(f" Mode: {mode}")
            print(f"{'='*60}")

            try:
                tester.goto(url)
                _ensure_logged_in(tester)

                login_goal = _resolve_login_goal(case)
                if login_goal:
                    print(" 🔎 检查登录状态...")
                    if _is_logged_in(tester):
                        print(" ✅ 已处于登录状态,跳过登录步骤")
                    elif _ensure_logged_in(tester, login_goal):
                        print(" ✅ 登录成功")
                    else:
                        print(" ⚠️ 登录未确认成功,继续执行")

                _ensure_sidebar_open(tester)

                if mode == "goal":
                    results.append(_run_goal_serial(tester, case, name))
                elif mode == "explore":
                    results.append(_run_explore_serial(tester, case, name))
                elif mode == "hybrid":
                    results.append(_run_hybrid_serial(tester, case, name))
                else:
                    # Record the case so it still shows up in the summary.
                    message = f"unknown mode: {mode}"
                    print(f"❌ 失败: {message}")
                    results.append({"name": name, "status": "failed", "error": message})
            except Exception as e:
                print(f"❌ 失败: {e}")
                results.append({
                    "name": name,
                    "status": "failed",
                    "error": str(e),
                })

    _print_summary(results)
    return results


def run_tests_parallel(model: str = "claude", max_workers: int = 3, cases: Optional[List[Dict[str, Any]]] = None):
    """Run test cases in parallel, each in its own headless browser.

    Args:
        model: AI model name passed to ``WebTester``.
        max_workers: Maximum number of worker threads (default 3).
        cases: Cases to run; ``None`` selects ``TEST_CASES``.

    Returns:
        One result dict per case, in completion order. The elapsed time and
        a summary are printed.
    """
    print(f"\n🚀 并行模式启动 (workers={max_workers})")
    selected_cases = cases if cases is not None else TEST_CASES
    print(f"📋 待执行测试: {len(selected_cases)} 个\n")

    results: List[Dict[str, Any]] = []
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every case; parallel runs are always headless.
        future_to_case = {
            executor.submit(run_single_case, case, model, True): case
            for case in selected_cases
        }
        # Collect results as they complete.
        for future in as_completed(future_to_case):
            case = future_to_case[future]
            try:
                result = future.result()
                status = "✅" if result["status"] == "passed" else "❌"
                print(f"{status} {result['name']}")
                results.append(result)
            except Exception as e:
                # Do not let a missing name raise inside the error handler.
                case_name = case.get("name", "Unknown")
                print(f"❌ {case_name}: {e}")
                results.append({
                    "name": case_name,
                    "status": "failed",
                    "error": str(e),
                })

    elapsed = time.time() - start_time
    print(f"\n⏱️ 总耗时: {elapsed:.1f}秒")
    _print_summary(results)
    return results


def _print_summary(results: List[Dict[str, Any]]):
    """Print pass/partial/fail counts, the pass rate and the failed cases."""
    print(f"\n{'='*60}")
    print("📊 测试总结")
    print(f"{'='*60}")

    statuses = [r.get("status") for r in results]
    passed = statuses.count("passed")
    partial = statuses.count("partial")
    failed = statuses.count("failed")

    print(f"✅ 通过: {passed}")
    if partial > 0:
        print(f"⚠️ 部分通过: {partial}")
    print(f"❌ 失败: {failed}")

    if results:
        # A partial result counts as half a pass.
        rate = (passed + partial * 0.5) / len(results) * 100
        print(f"📈 通过率: {rate:.1f}%")

    failed_cases = [r for r in results if r.get("status") == "failed"]
    if failed_cases:
        print("\n❌ 失败用例:")
        for r in failed_cases:
            line = f"- {r.get('name', '')}"
            report = r.get("report", "")
            err = r.get("error", "")
            if report:
                line += f" | {report}"
            if err:
                line += f" | {err}"
            print(line)


def run_single_test(url: str, goal: str, model: str = "claude", headless: bool = False):
    """Run a single ad-hoc ``goal`` test against *url* and print the outcome.

    Returns:
        The result dict produced by ``run_single_case``.
    """
    case = {"name": goal[:30], "url": url, "mode": "goal", "goal": goal}
    result = run_single_case(case, model=model, headless=headless)
    if result.get("status") == "passed":
        print("✅ 完成")
    else:
        print(f"❌ 失败: {result.get('error', 'unknown')}")
    if result.get("report"):
        print(f"📄 报告: {result['report']}")
    return result


def _select_cases(args) -> List[Dict[str, Any]]:
    """Filter ``TEST_CASES`` by the ``--index`` and ``--case`` options.

    ``index`` is 1-based and applied first; ``case`` then keeps the cases
    whose name contains the given substring, ignoring letter case.

    Raises:
        ValueError: If ``index`` is outside ``1..len(TEST_CASES)``.
    """
    cases: List[Dict[str, Any]] = TEST_CASES

    if getattr(args, "index", None) is not None:
        idx = int(args.index)
        if idx < 1 or idx > len(TEST_CASES):
            raise ValueError(f"index out of range: {idx}")
        cases = [TEST_CASES[idx - 1]]

    if getattr(args, "case", None):
        q = str(args.case).strip().lower()
        cases = [c for c in cases if q in str(c.get("name", "")).lower()]

    return cases


# ============================================================
# Entry point
# ============================================================

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="AI Web Tester - 测试用例运行器")
    parser.add_argument("--url", help="单个测试的 URL")
    parser.add_argument("--goal", help="单个测试的目标描述")
    parser.add_argument("--model", default="mimo", choices=["claude", "openai", "mimo", "glm"],
                        help="AI 模型")
    parser.add_argument("--headless", action="store_true", help="无头模式运行")
    parser.add_argument("--parallel", action="store_true", help="并行执行测试")
    parser.add_argument("--workers", type=int, default=3, help="并行工作线程数")
    parser.add_argument("--list", action="store_true", help="列出内置测试用例")
    parser.add_argument("--case", help="按用例名子串筛选(大小写不敏感)")
    parser.add_argument("--index", type=int, help="按序号选择用例(从 1 开始,基于内置用例列表)")
    args = parser.parse_args()

    if args.list:
        for i, c in enumerate(TEST_CASES, 1):
            print(f"[{i}] {c.get('name', '')} ({c.get('mode', 'goal')})")
        sys.exit(0)

    selected_cases = _select_cases(args)

    if args.url and args.goal:
        run_single_test(args.url, args.goal, args.model, args.headless)
    elif args.parallel:
        run_tests_parallel(model=args.model, max_workers=args.workers, cases=selected_cases)
    else:
        run_tests(model=args.model, headless=args.headless, cases=selected_cases)