feat: 增强测试框架功能
Some checks failed
AI Web Tester CI / test (push) Has been cancelled

主要改进:
- 新增统一测试器 (universal_tester.py) 支持多种测试模式
- 优化测试报告生成器,支持汇总报告和操作截图
- 增强探索器 DFS 算法和状态指纹识别
- 新增智能测试配置 (smart_test.yaml)
- 改进 AI 模型集成 (GLM/Gemini 支持)
- 添加开发调试工具和文档
This commit is contained in:
empty
2026-01-05 20:23:02 +08:00
parent 3447ea340a
commit 1f1cc4db9a
31 changed files with 4631 additions and 770 deletions

View File

@@ -9,6 +9,233 @@ from src import WebTester
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
import time
import re
def _ensure_sidebar_open(tester: WebTester) -> None:
try:
page = tester.browser.page
if not page:
return
# 检查是否已有侧边栏文本可见
markers = ("立项论证管理", "产品方案管理", "基础数据")
visible = False
for marker in markers:
try:
# 检查是否至少有一个 marker 在页面上可见
loc = page.get_by_text(marker, exact=False)
if loc.count() > 0 and any(loc.nth(i).is_visible() for i in range(loc.count())):
visible = True
break
except Exception:
continue
if visible:
return
print(" 📂 侧边栏未发现,尝试点击菜单切换按钮...")
# 尝试点击左上角的常见切换图标位置,或者特定的菜单按钮
toggle_selectors = [
".anticon-menu-fold", ".anticon-menu-unfold",
".el-icon-menu", ".toggle-sidebar",
".ant-layout-sider-trigger"
]
found_toggle = False
for sel in toggle_selectors:
try:
btn = page.locator(sel)
if btn.count() > 0 and btn.first.is_visible():
btn.first.click()
found_toggle = True
break
except Exception:
continue
if not found_toggle:
# 记录尝试点击坐标
tester.browser.click_at(30, 30)
tester.browser.wait(500)
tester.browser.click_at(80, 25) # 顶部面包屑左侧
tester.browser.wait(1500)
except Exception:
return
def _is_logged_in(tester: WebTester) -> bool:
try:
page = tester.browser.page
if not page:
return False
url = (page.url or "").lower()
if "#/auth/login" in url or "/auth/login" in url or "login" in url:
return False
# DOM marker: if login form is present, consider not logged-in even if URL doesn't include login
try:
if page.locator("input[type='password']").count() > 0:
if page.get_by_text("登录", exact=False).count() > 0:
return False
except Exception:
pass
for marker in ("分析概览", "待办事项", "系统管理"):
try:
if page.get_by_text(marker, exact=False).count() > 0:
return True
except Exception:
continue
# If we can't find dashboard markers, fall back to a weaker heuristic
try:
if page.locator("input[type='password']").count() > 0:
return False
except Exception:
pass
return True
except Exception:
return False
def _do_login_dom(tester: WebTester, username: str = "admin", password: str = "password") -> bool:
page = tester.browser.page
if not page:
return False
try:
user_locators = [
"input[placeholder*='用户']",
"input[placeholder*='账号']",
"input[name*='user' i]",
"input[name*='account' i]",
]
pwd_locators = [
"input[type='password']",
"input[placeholder*='密码']",
]
user_el = None
for sel in user_locators:
loc = page.locator(sel)
if loc.count() > 0:
user_el = loc.first
break
if user_el is None:
# fall back to the first visible text input
loc = page.locator("input[type='text'], input:not([type])")
if loc.count() > 0:
user_el = loc.first
pwd_el = None
for sel in pwd_locators:
loc = page.locator(sel)
if loc.count() > 0:
pwd_el = loc.first
break
if user_el is None or pwd_el is None:
return False
user_el.fill(username)
pwd_el.fill(password)
clicked = False
for text in ("登录", "登 录"):
try:
btn = page.get_by_role("button", name=re.compile(text))
if btn.count() > 0:
btn.first.click()
clicked = True
break
except Exception:
pass
try:
btn = page.get_by_text(text, exact=False)
if btn.count() > 0:
btn.first.click()
clicked = True
break
except Exception:
pass
if not clicked:
return False
tester.browser.wait_for_load_state("networkidle", timeout=15000)
for _ in range(20):
if _is_logged_in(tester):
return True
tester.browser.wait(500)
return _is_logged_in(tester)
except Exception:
return False
def _ensure_logged_in(tester: WebTester, login_goal: str = "填入账号admin 密码password登录成功") -> bool:
if _is_logged_in(tester):
return True
if _do_login_dom(tester):
return True
try:
tester.test(login_goal)
tester.browser.wait(1500)
except Exception:
pass
return _is_logged_in(tester)
def _run_hybrid_steps(tester: WebTester, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
step_results: List[Dict[str, Any]] = []
for idx, step in enumerate(steps, 1):
action = step.get("action")
if action == "goal":
_ensure_sidebar_open(tester)
before_url = tester.browser.page.url
r = tester.test(step.get("goal", ""))
after_url = tester.browser.page.url
step_results.append({
"step": idx,
"action": action,
"goal": step.get("goal", ""),
"result": r,
"url_changed": before_url != after_url
})
all_passed = all(x.get("success", False) for x in r.get("results", []))
# 记录警告
if not after_url or before_url == after_url:
if any(kw in step.get("goal", "") for kw in ("点击菜单", "跳转", "进入", "研制方案", "项目")):
print(f" ⚠️ 警告: 执行 '{step.get('goal', '')}' 后 URL 似乎没有变化")
if not all_passed:
return {"passed": False, "step_results": step_results}
elif action == "explore":
_ensure_sidebar_open(tester)
try:
r = tester.explore(step.get("config", {}))
step_results.append({"step": idx, "action": action, "result": r})
# 如果探索过程中没有任何操作成功,或者发生了严重错误,可以考虑判定为失败
if not r.get("action_log") and not r.get("discovered_elements"):
return {"passed": False, "step_results": step_results, "error": "探索未发现任何元素或未执行任何操作"}
except Exception as e:
step_results.append({"step": idx, "action": action, "error": str(e)})
return {"passed": False, "step_results": step_results, "error": str(e)}
elif action == "wait":
duration = step.get("duration", 1000)
tester.browser.wait(duration)
step_results.append({"step": idx, "action": action, "duration": duration, "result": {"success": True}})
else:
step_results.append({"step": idx, "action": action, "result": {"success": False, "error": f"unknown action: {action}"}})
return {"passed": False, "step_results": step_results}
return {"passed": True, "step_results": step_results}
# ============================================================
@@ -16,29 +243,66 @@ import time
# ============================================================
TEST_CASES = [
# 目标模式: 执行指定目标
{
"name": "登录",
"url": "http://47.99.105.253:8084",
"mode": "goal", # 目标驱动模式
"goal": "填入账号admin 密码password登录成功",
},
# 探索模式: AI 自主发现功能
{
"name": "功能探索",
"name": "登录后深度探索",
"url": "http://47.99.105.253:8084",
"mode": "explore", # 探索模式
"mode": "explore",
"config": {
"max_depth": 3,
"max_clicks": 30,
"dangerous_patterns": ["删除", "移除", "退出", "注销"], # 记录但不执行
"require_login": { # 需要先登录
"max_depth": 20, # 探索深度
"max_clicks": 2000, # 总点击次数
"require_login": { # 探索前的先决条件:登录
"goal": "填入账号admin 密码password登录成功"
}
},
"focus_patterns": ["管理", "设置", "新增"], # 引导 AI 重点测试这些关键词
"dangerous_patterns": ["删除", "退出", "注销"] # 发现但不点击,防止测试中断
}
},
# 业务流程测试: 技术协议评审完整流程
{
"name": "技术协议评审流程测试",
"url": "http://47.99.105.253:8084",
"mode": "hybrid",
"steps": [
{"action": "goal", "goal": "点击立项论证管理菜单"},
{"action": "goal", "goal": "点击项目输入子菜单"},
{"action": "goal", "goal": "点击1技术协议及科研合同评审记录"},
{"action": "goal", "goal": "点击新增按钮"},
# 切换到智能探索模式完成表单填写和提交
{"action": "explore", "config": {"max_clicks": 30, "max_depth": 1}},
{"action": "wait", "duration": 2000},
{"action": "goal", "goal": "点击提交按钮"},
{"action": "wait", "duration": 2000},
{"action": "goal", "goal": "点击待办事项"},
{"action": "goal", "goal": "点击最新提交的个人待办记录"},
{"action": "goal", "goal": "点击审核/处理按钮"},
{"action": "explore", "config": {"max_clicks": 15, "max_depth": 1}}, # 智能处理审核表单
]
},
# 业务流程测试: 产品方案管理流程
{
"name": "产品方案管理流程测试",
"url": "http://47.99.105.253:8084",
"mode": "hybrid",
"steps": [
{"action": "goal", "goal": "点击一级菜单'产品方案管理'"},
{"action": "wait", "duration": 1000},
{"action": "goal", "goal": "展开二级分类'研制方案'"},
{"action": "wait", "duration": 1000},
{"action": "goal", "goal": "点击菜单项'研制方案'"},
{"action": "wait", "duration": 1000},
{"action": "goal", "goal": "点击新增按钮"},
# 使用探索模式完成表单填写和提交
{"action": "explore", "config": {"max_clicks": 20, "max_depth": 1}},
{"action": "wait", "duration": 2000},
{"action": "goal", "goal": "点击待办事项"},
{"action": "goal", "goal": "点击审批详情"},
{"action": "explore", "config": {"max_clicks": 10, "max_depth": 1}},
]
},
# 混合模式: 先执行目标,再探索
# {
# "name": "登录后探索",
@@ -73,6 +337,44 @@ def run_single_case(case: Dict[str, Any], model: str = "claude",
try:
with WebTester(model=model, headless=headless) as tester:
tester.goto(url)
# Always attempt deterministic login first (single-case mode should be robust)
_ensure_logged_in(tester)
# 检查是否需要登录(适用于所有模式)
login_needed = False
login_goal = None
# 检查配置中是否需要登录
if mode == "explore":
config = case.get("config", {})
require_login = config.get("require_login")
if require_login:
login_goal = require_login.get("goal", "")
login_needed = True
elif mode == "hybrid":
# 检查步骤中是否包含登录
for step in case.get("steps", []):
if "登录" in step.get("goal", "") or "admin" in step.get("goal", ""):
login_needed = False # 步骤中已包含登录
break
else:
# 如果步骤中没有登录,但系统可能需要登录
login_goal = "填入账号admin 密码password登录成功"
login_needed = True
elif mode == "goal":
# 目标模式检查是否需要登录
goal = case.get("goal", "")
if "登录" not in goal and "admin" not in goal:
login_goal = "填入账号admin 密码password登录成功"
login_needed = True
# 执行登录
if login_needed and login_goal:
if not _is_logged_in(tester):
_ensure_logged_in(tester, login_goal)
_ensure_sidebar_open(tester)
if mode == "goal":
# 目标模式
@@ -103,12 +405,9 @@ def run_single_case(case: Dict[str, Any], model: str = "claude",
elif mode == "hybrid":
# 混合模式
for step in case.get("steps", []):
if step.get("action") == "goal":
tester.test(step["goal"])
elif step.get("action") == "explore":
tester.explore(step.get("config", {}))
result["status"] = "passed"
hybrid_res = _run_hybrid_steps(tester, case.get("steps", []))
result["status"] = "passed" if hybrid_res.get("passed") else "failed"
result["hybrid"] = hybrid_res
except Exception as e:
result["error"] = str(e)
@@ -116,24 +415,70 @@ def run_single_case(case: Dict[str, Any], model: str = "claude",
return result
def run_tests(model: str = "claude", headless: bool = False):
def run_tests(model: str = "claude", headless: bool = False, cases: List[Dict[str, Any]] = None):
"""串行运行所有测试用例"""
results = []
selected_cases = cases if cases is not None else TEST_CASES
with WebTester(model=model, headless=headless) as tester:
for i, case in enumerate(TEST_CASES, 1):
for i, case in enumerate(selected_cases, 1):
name = case.get("name", f"Test {i}")
url = case["url"]
mode = case.get("mode", "goal")
print(f"\n{'='*60}")
print(f"🧪 [{i}/{len(TEST_CASES)}] {name}")
print(f"🧪 [{i}/{len(selected_cases)}] {name}")
print(f" URL: {url}")
print(f" Mode: {mode}")
print(f"{'='*60}")
try:
tester.goto(url)
_ensure_logged_in(tester)
# 检查是否需要登录(适用于所有模式)
login_needed = False
login_goal = None
# 检查配置中是否需要登录
if mode == "explore":
config = case.get("config", {}).copy()
require_login = config.pop("require_login", None)
if require_login:
login_goal = require_login.get("goal", "")
login_needed = True
elif mode == "hybrid":
# 检查步骤中是否包含登录
for step in case.get("steps", []):
if "登录" in step.get("goal", "") or "admin" in step.get("goal", ""):
login_needed = False # 步骤中已包含登录
break
else:
# 如果步骤中没有登录,但系统可能需要登录
login_goal = "填入账号admin 密码password登录成功"
login_needed = True
elif mode == "goal":
# 目标模式检查是否需要登录
goal = case.get("goal", "")
if "登录" not in goal and "admin" not in goal:
login_goal = "填入账号admin 密码password登录成功"
login_needed = True
# 执行登录
if login_needed and login_goal:
print(f" 🔎 检查登录状态...")
if not _is_logged_in(tester):
ok = _ensure_logged_in(tester, login_goal)
if ok:
print(f" ✅ 登录成功")
else:
print(f" ⚠️ 登录未确认成功,继续执行")
else:
print(f" ✅ 已处于登录状态,跳过登录步骤")
_ensure_sidebar_open(tester)
if mode == "goal":
goal = case.get("goal", "")
@@ -148,6 +493,12 @@ def run_tests(model: str = "claude", headless: bool = False):
status = "passed"
else:
print(f"⚠️ 部分失败: {failed_count}/{result['steps']} 步骤失败")
first_failed = next((r for r in result.get("results", []) if not r.get("success", False)), None)
if first_failed:
action_type = first_failed.get("action_type") or first_failed.get("action") or "unknown"
target = first_failed.get("target") or first_failed.get("element") or first_failed.get("description") or ""
err = first_failed.get("error") or first_failed.get("reason") or ""
print(f" ❌ 首个失败: {action_type} {target} {err}".strip())
status = "failed"
print(f"📄 报告: {result['report']}")
@@ -161,33 +512,101 @@ def run_tests(model: str = "claude", headless: bool = False):
elif mode == "explore":
config = case.get("config", {}).copy()
login_status = "skipped"
login_error = None
# 如果需要先登录
require_login = config.pop("require_login", None)
if require_login:
login_goal = require_login.get("goal", "")
if login_goal:
print(f" 🔐 执行登录...")
tester.test(login_goal)
tester.browser.wait(1000)
try:
# 检查是否已登录
print(f" 🔎 检查登录状态...")
if _is_logged_in(tester):
print(f" ✅ 已处于登录状态,跳过登录步骤")
login_status = "already_logged_in"
else:
print(f" 🔐 执行登录: {login_goal}")
login_result = tester.test(login_goal)
tester.browser.wait(3000)
# 检查登录结果
login_success = all(r.get("success", False) for r in login_result.get("results", []))
if login_success:
login_status = "success"
else:
login_status = "partial_failure"
# 登录部分失败,但仍继续探索
print(f" ⚠️ 登录步骤部分失败,继续尝试探索...")
# 等待页面加载
print(f" 🔎 确认登录跳转结果...")
tester.browser.wait_for_load_state("networkidle")
tester.browser.wait(2000)
except Exception as e:
login_error = str(e)
login_status = "error"
print(f" ⚠️ 登录过程出错: {e},继续尝试探索...")
# 执行探索
# 无论登录结果如何,都执行探索
print(f" 🔍 开始功能探索...")
result = tester.explore(config)
elements = len(result.get("discovered_elements", []))
bugs = len(result.get("bug_list", []))
print(f"✅ 探索完成: 发现 {elements} 个元素, {bugs} 个问题")
print(f"📄 报告: {result.get('report', '')}")
results.append({
"name": name,
"status": "passed",
"elements": elements,
"bugs": bugs,
"report": result.get("report", ""),
})
try:
result = tester.explore(config)
elements = len(result.get("discovered_elements", []))
bugs = len(result.get("bug_list", []))
print(f"✅ 探索完成: 发现 {elements} 个元素, {bugs} 个问题")
print(f"📄 报告: {result.get('report', '')}")
# 根据登录状态决定最终状态
final_status = "passed" if login_status in ("success", "already_logged_in", "skipped") else "partial"
results.append({
"name": name,
"status": final_status,
"login_status": login_status,
"login_error": login_error,
"elements": elements,
"bugs": bugs,
"report": result.get("report", ""),
})
except Exception as e:
print(f" ❌ 探索过程出错: {e}")
results.append({
"name": name,
"status": "failed",
"login_status": login_status,
"error": str(e),
})
elif mode == "hybrid":
try:
hybrid_res = _run_hybrid_steps(tester, case.get("steps", []))
status = "passed" if hybrid_res.get("passed") else "failed"
if status != "passed":
step_results = hybrid_res.get("step_results", [])
last = step_results[-1] if step_results else None
if last:
step_idx = last.get("step")
action = last.get("action")
goal_txt = last.get("goal", "")
print(f" ❌ Hybrid 失败在 step {step_idx}: {action} {goal_txt}".strip())
results.append({
"name": name,
"status": status,
"mode": "hybrid",
"hybrid": hybrid_res,
})
except Exception as e:
print(f"❌ 失败: {e}")
results.append({
"name": name,
"status": "failed",
"mode": "hybrid",
"error": str(e),
})
except Exception as e:
print(f"❌ 失败: {e}")
@@ -201,7 +620,7 @@ def run_tests(model: str = "claude", headless: bool = False):
return results
def run_tests_parallel(model: str = "claude", max_workers: int = 3):
def run_tests_parallel(model: str = "claude", max_workers: int = 3, cases: List[Dict[str, Any]] = None):
"""
并行运行所有测试用例
@@ -210,17 +629,15 @@ def run_tests_parallel(model: str = "claude", max_workers: int = 3):
max_workers: 最大并行数(默认 3
"""
print(f"\n🚀 并行模式启动 (workers={max_workers})")
print(f"📋 待执行测试: {len(TEST_CASES)}\n")
selected_cases = cases if cases is not None else TEST_CASES
print(f"📋 待执行测试: {len(selected_cases)}\n")
results = []
start_time = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_case = {
executor.submit(run_single_case, case, model, True): case
for case in TEST_CASES
}
future_to_case = {executor.submit(run_single_case, case, model, True): case for case in selected_cases}
# 收集结果
for future in as_completed(future_to_case):
@@ -251,21 +668,57 @@ def _print_summary(results: List[Dict[str, Any]]):
print("📊 测试总结")
print(f"{'='*60}")
passed = sum(1 for r in results if r["status"] == "passed")
failed = len(results) - passed
partial = sum(1 for r in results if r["status"] == "partial")
failed = sum(1 for r in results if r["status"] == "failed")
print(f"✅ 通过: {passed}")
if partial > 0:
print(f"⚠️ 部分通过: {partial}")
print(f"❌ 失败: {failed}")
if results:
print(f"📈 通过率: {passed/len(results)*100:.1f}%")
# 通过率计算将 partial 视为 0.5
rate = (passed + partial * 0.5) / len(results) * 100
print(f"📈 通过率: {rate:.1f}%")
failed_cases = [r for r in results if r.get("status") == "failed"]
if failed_cases:
print("\n❌ 失败用例:")
for r in failed_cases:
name = r.get("name", "")
report = r.get("report", "")
err = r.get("error", "")
line = f"- {name}"
if report:
line += f" | {report}"
if err:
line += f" | {err}"
print(line)
def run_single_test(url: str, goal: str, model: str = "claude"):
def run_single_test(url: str, goal: str, model: str = "claude", headless: bool = False):
"""运行单个测试"""
with WebTester(model=model) as tester:
tester.goto(url)
result = tester.test(goal)
print(f"✅ 完成: {result['steps']} 步骤")
case = {"name": goal[:30], "url": url, "mode": "goal", "goal": goal}
result = run_single_case(case, model=model, headless=headless)
if result.get("status") == "passed":
print(f"✅ 完成")
else:
print(f"❌ 失败: {result.get('error', 'unknown')}")
if result.get("report"):
print(f"📄 报告: {result['report']}")
return result
return result
def _select_cases(args) -> List[Dict[str, Any]]:
cases: List[Dict[str, Any]] = TEST_CASES
if getattr(args, "index", None) is not None:
idx = int(args.index)
if idx < 1 or idx > len(TEST_CASES):
raise ValueError(f"index out of range: {idx}")
cases = [TEST_CASES[idx - 1]]
if getattr(args, "case", None):
q = str(args.case).strip().lower()
cases = [c for c in cases if q in str(c.get("name", "")).lower()]
return cases
# ============================================================
@@ -278,16 +731,26 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="AI Web Tester - 测试用例运行器")
parser.add_argument("--url", help="单个测试的 URL")
parser.add_argument("--goal", help="单个测试的目标描述")
parser.add_argument("--model", default="claude", choices=["claude", "openai"], help="AI 模型")
parser.add_argument("--model", default="mimo", choices=["claude", "openai", "mimo", "glm"], help="AI 模型")
parser.add_argument("--headless", action="store_true", help="无头模式运行")
parser.add_argument("--parallel", action="store_true", help="并行执行测试")
parser.add_argument("--workers", type=int, default=3, help="并行工作线程数")
parser.add_argument("--list", action="store_true", help="列出内置测试用例")
parser.add_argument("--case", help="按用例名子串筛选(大小写不敏感)")
parser.add_argument("--index", type=int, help="按序号选择用例(从 1 开始,基于内置用例列表)")
args = parser.parse_args()
if args.list:
for i, c in enumerate(TEST_CASES, 1):
print(f"[{i}] {c.get('name', '')} ({c.get('mode', 'goal')})")
sys.exit(0)
selected_cases = _select_cases(args)
if args.url and args.goal:
run_single_test(args.url, args.goal, args.model)
run_single_test(args.url, args.goal, args.model, args.headless)
elif args.parallel:
run_tests_parallel(model=args.model, max_workers=args.workers)
run_tests_parallel(model=args.model, max_workers=args.workers, cases=selected_cases)
else:
run_tests(model=args.model, headless=args.headless)
run_tests(model=args.model, headless=args.headless, cases=selected_cases)