""" Feature Explorer - AI-driven autonomous feature discovery """ from typing import List, Dict, Any, Set, Optional import json import re import logging from datetime import datetime logger = logging.getLogger(__name__) class FeatureExplorer: """AI 驱动的功能探索器 - 自主发现并记录页面功能""" def __init__(self, browser, analyzer): self.browser = browser self.analyzer = analyzer # 探索状态 self.discovered_elements: List[Dict] = [] self.visited_urls: Set[str] = set() self.site_map: Dict[str, List[str]] = {} # URL -> 子页面列表 self.bug_list: List[Dict] = [] self.action_log: List[Dict] = [] # 默认配置 self.config = { "max_depth": 3, "max_clicks": 50, "skip_patterns": [], "dangerous_patterns": ["删除", "移除", "清空", "退出", "注销"], "focus_patterns": ["新增", "添加", "创建", "搜索", "查询", "确定", "保存", "提交"], } def explore(self, config: Dict = None) -> Dict[str, Any]: """ 执行多页面深度探索 Returns: 探索结果报告 """ if config: self.config.update(config) start_url = self.browser.page.url self.visited_urls.add(start_url) click_count = 0 current_depth = 0 print(f"🔍 开始探索: {start_url}") print(f" 配置: 最大深度={self.config['max_depth']}, 最大点击={self.config['max_clicks']}") # 使用队列管理待探索的页面和元素 page_queue = [(start_url, 0)] # (url, depth) explored_pages = set() while page_queue and click_count < self.config["max_clicks"]: current_url, current_depth = page_queue.pop(0) if current_url in explored_pages: continue if current_depth > self.config["max_depth"]: print(f" ⏩ 跳过深度 {current_depth} 页面: {current_url}") continue explored_pages.add(current_url) # 导航到目标页面(如果不是当前页面) if self.browser.page.url != current_url: print(f"\n📄 导航到: {current_url}") try: self.browser.goto(current_url) self.browser.wait(1000) except: print(f" ⚠️ 导航失败") continue print(f"\n{'='*50}") print(f"📍 深度 {current_depth}: 探索页面") print(f" URL: {current_url[:60]}...") print(f"{'='*50}") # 发现当前页面元素 print(f" 正在分析页面元素...") all_elements = self._discover_elements() print(f" 发现 {len(all_elements)} 个可交互元素") if not all_elements: print(" ⚠️ 没有发现可交互元素") continue # 过滤和排序 elements = self._filter_and_sort(all_elements) print(f" 过滤后 {len(elements)} 个待探索元素") # 在当前页面探索元素 element_index = 0 while element_index < len(elements) and click_count < self.config["max_clicks"]: element = elements[element_index] element_index += 1 click_count += 1 name = element.get('name', '未知') print(f"\n [{click_count}] 探索: {name}") # 记录操作前的状态 before_url = self.browser.page.url before_count = len(elements) # 执行探索 self._explore_element(element, click_count) # 检查是否发生页面跳转 self.browser.wait(300) after_url = self.browser.page.url if before_url != after_url: print(f" 🔀 页面跳转: {after_url[:50]}...") # 添加新页面到队列 if after_url not in explored_pages and current_depth < self.config["max_depth"]: page_queue.append((after_url, current_depth + 1)) self.visited_urls.add(after_url) # 更新站点地图 if before_url not in self.site_map: self.site_map[before_url] = [] if after_url not in self.site_map[before_url]: self.site_map[before_url].append(after_url) # 返回原页面继续探索 try: self.browser.goto(before_url) self.browser.wait(500) except: break else: # 没有跳转,检查是否有新元素出现(如折叠菜单展开或 Tab 切换) is_tab_click = element.get("type") == "tab" or \ "tab" in element.get("tagName", "").lower() or \ "tab" in element.get("name", "").lower() new_elements = self._discover_elements() new_filtered = self._filter_and_sort(new_elements) # 找出新出现的元素 existing_names = {e.get("name") for e in elements} new_items = [e for e in new_filtered if e.get("name") not in existing_names] if new_items: if is_tab_click: print(f" 📑 Tab 切换,发现 {len(new_items)} 个新内容元素") else: print(f" 📋 发现 {len(new_items)} 个新元素(菜单展开)") # 将新元素插入到当前位置之后 elements = elements[:element_index] + new_items + elements[element_index:] print(f"\n✅ 探索完成:") print(f" - 点击次数: {click_count}") print(f" - 访问页面: {len(self.visited_urls)}") print(f" - 发现元素: {len(self.discovered_elements)}") # 生成报告 return self._generate_report(start_url, click_count) def _discover_elements(self, use_ai: bool = False) -> List[Dict]: """发现页面上所有可交互元素""" # 默认使用 DOM 快速发现,可选使用 AI if use_ai: return self._discover_elements_ai() else: return self._discover_elements_dom() def _discover_elements_dom(self) -> List[Dict]: """使用 DOM 快速发现可交互元素(毫秒级)""" current_url = self.browser.page.url try: result = self.browser.page.evaluate(''' () => { const elements = []; const seen = new Set(); // 查找所有可交互元素 const selectors = [ 'a[href]', // 链接 'button', // 按钮 '[role="button"]', // 角色按钮 '[role="menuitem"]', // 菜单项 '[role="tab"]', // 标签页 '[role="link"]', // 角色链接 '[role="row"]', // 表格行 '.nav-item, .menu-item', // 导航项 '[onclick]', // 点击事件 'input[type="submit"]', // 提交按钮 // Tab 内容区域常见元素 '.ant-tabs-tab', // Ant Design tabs '.el-tabs__item', // Element UI tabs '.tab-pane a, .tab-content a', // Tab 内容链接 '.card-header, .card-title', // 卡片标题 'tr[data-row-key]', // 表格可点击行 '.ant-table-row', // Ant Design 表格行 '.list-item, .list-group-item', // 列表项 // Vben Admin / Naive UI tabs '.tabs-chrome__item', // Vben tabs (右侧标签页) '.vben-tabs-content > div', // Vben tabs 容器 '.n-tabs-tab', // Naive UI tabs // 表单元素 'input:not([type="hidden"]):not([type="submit"]):not([type="button"])', 'textarea', 'select', '.ant-input, .ant-select', // 框架特定 '.el-input__inner, .el-select', ]; for (const selector of selectors) { document.querySelectorAll(selector).forEach(el => { // 改进名称提取:优先使用 textContent,其次使用 placeholder/aria-label let text = el.textContent?.trim().substring(0, 50) || ''; if (!text && (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA' || el.tagName === 'SELECT')) { text = el.getAttribute('placeholder') || el.getAttribute('aria-label') || el.getAttribute('name') || ''; // 尝试通过 label 关联查找名称 if (!text && el.id) { const label = document.querySelector(`label[for="${el.id}"]`); if (label) text = label.textContent.trim(); } } const key = text + el.tagName + el.className; if (!text || seen.has(key)) return; if (text.length < 1 || text.length > 50) return; const rect = el.getBoundingClientRect(); if (rect.width <= 0 || rect.height <= 0) return; if (rect.top < 0 || rect.left < 0) return; seen.add(key); // 推断类型 let type = 'link'; const cls = el.className || ''; if (el.tagName === 'BUTTON' || el.getAttribute('role') === 'button') type = 'button'; if (el.closest('nav') || el.classList.contains('nav-item')) type = 'navigation'; if (el.getAttribute('role') === 'menuitem') type = 'menu'; if (el.getAttribute('role') === 'tab' || cls.includes('tabs-chrome') || cls.includes('tabs-tab') || cls.includes('ant-tabs-tab') || cls.includes('el-tabs__item')) type = 'tab'; if (el.tagName === 'INPUT' || el.tagName === 'TEXTAREA') type = 'form_input'; if (el.tagName === 'SELECT') type = 'form_select'; elements.push({ name: text, type: type, tagName: el.tagName, priority: type === 'navigation' ? 8 : (type.startsWith('form_') ? 7 : 5), x: Math.round(rect.left + rect.width / 2), y: Math.round(rect.top + rect.height / 2) }); }); } // 额外查找 cursor:pointer 元素 document.querySelectorAll('*').forEach(el => { if (window.getComputedStyle(el).cursor === 'pointer') { const text = Array.from(el.childNodes) .filter(n => n.nodeType === 3) .map(n => n.textContent.trim()) .join('').substring(0, 50); if (!text || text.length < 2 || seen.has(text + el.tagName)) return; const rect = el.getBoundingClientRect(); if (rect.width <= 0 || rect.height <= 0 || rect.width > 500) return; if (rect.top < 0 || rect.left < 0) return; seen.add(text + el.tagName); elements.push({ name: text, type: 'link', tagName: el.tagName, priority: 4, x: Math.round(rect.left + rect.width / 2), y: Math.round(rect.top + rect.height / 2) }); } }); return elements; } ''') # 添加元数据 for el in result: el["source_url"] = current_url el["discovered_at"] = datetime.now().isoformat() return result except Exception as e: logger.warning(f"DOM 发现失败: {e}") return [] def _discover_elements_ai(self) -> List[Dict]: """使用 AI 发现页面元素(较慢但更智能)""" img = self.browser.screenshot_base64() current_url = self.browser.page.url prompt = """分析截图,识别可交互元素。返回 JSON: {"elements": [{"name": "文字", "type": "navigation|button|link", "priority": 1-10}]} 只返回 JSON。""" response = self.analyzer.model.analyze(img, prompt) try: match = re.search(r'\{[\s\S]*\}', response) if match: result = json.loads(match.group()) elements = result.get("elements", []) for el in elements: el["source_url"] = current_url el["discovered_at"] = datetime.now().isoformat() return elements except Exception as e: logger.warning(f"AI 解析失败: {e}") return [] def _filter_and_sort(self, elements: List[Dict]) -> List[Dict]: """过滤和排序元素""" filtered = [] for el in elements: name = el.get("name", "") # 跳过已探索的元素 if self._is_explored(el): continue # 完全跳过的模式 if any(p in name for p in self.config["skip_patterns"]): continue # 标记危险元素(记录但不执行) if any(p in name for p in self.config["dangerous_patterns"]): el["is_dangerous"] = True # 优先探索的模式 if any(p in name for p in self.config["focus_patterns"]): el["priority"] = el.get("priority", 5) + 5 filtered.append(el) # 按优先级排序 filtered.sort(key=lambda x: x.get("priority", 5), reverse=True) return filtered def _is_explored(self, element: Dict) -> bool: """检查元素是否已探索""" name = element.get("name", "") return any(d.get("name") == name for d in self.discovered_elements) def _explore_element(self, element: Dict, click_num: int) -> None: """探索单个元素""" name = element.get("name", "未知") el_type = element.get("type", "unknown") is_dangerous = element.get("is_dangerous", False) # 记录为已发现 self.discovered_elements.append(element) # 截图(操作前)- 使用较小的截图减少内存 before_url = self.browser.page.url before_shot = None # 暂不保存截图以加速 action_record = { "step": click_num, "element": element, "before_url": before_url, "action_taken": False, "success": True, "error": None } # 危险元素只记录不执行 if is_dangerous: print(f" ⚠️ 危险操作,仅记录: {name}") action_record["skipped"] = True action_record["skip_reason"] = "危险操作" self.action_log.append(action_record) return # 执行操作 (点击或填充) try: # 优先使用元素自带的坐标 if "x" in element and "y" in element: coords = (element["x"], element["y"]) else: coords = self._find_element_by_name(name) if coords: if el_type == "form_input": test_data = f"AutoTest_{name}_{click_num}" print(f" → 填充表单: {name} = {test_data}") self.browser.click_at(coords[0], coords[1]) self.browser.wait(200) self.browser.press_key("Control+A") self.browser.press_key("Backspace") self.browser.page.keyboard.type(test_data) action_record["action_type"] = "fill" else: print(f" → 点击 ({coords[0]}, {coords[1]})") self.browser.click_at(coords[0], coords[1]) action_record["action_type"] = "click" self.browser.wait(500) action_record["action_taken"] = True action_record["click_coords"] = coords # 操作后的结果验证 result_info = self._check_action_result() if result_info: print(f" 📋 结果确认: {result_info['message']}") action_record["result"] = result_info if result_info["type"] == "error": self.bug_list.append({ "type": "business_error", "element": name, "message": result_info["message"], "url": before_url }) else: print(f" ⚠️ 未找到元素") action_record["success"] = False except Exception as e: print(f" ❌ 操作失败: {e}") action_record["success"] = False action_record["error"] = str(e) self.bug_list.append({ "type": "action_failed", "element": name, "error": str(e), "url": before_url }) # 检查结果 self.browser.wait(300) after_url = self.browser.page.url action_record["after_url"] = after_url action_record["url_changed"] = before_url != after_url # 更新站点地图 if before_url != after_url: if before_url not in self.site_map: self.site_map[before_url] = [] if after_url not in self.site_map[before_url]: self.site_map[before_url].append(after_url) self.visited_urls.add(after_url) print(f" → 跳转到新页面") self.action_log.append(action_record) def _check_action_result(self) -> Optional[Dict]: """检查操作后的结果(如消息提示)""" try: # 检测常见的消息提示组件 (AntD, Element, Vben) result = self.browser.page.evaluate(''' () => { // 查找包含成功、错误关键词的消息框 const messageSelectors = [ '.ant-message-notice', '.el-message', '.vben-basic-title', '.n-message', '[role="alert"]', '.message-box' ]; for (const sel of messageSelectors) { const msg = document.querySelector(sel); if (msg && msg.innerText) { const text = msg.innerText; let type = 'info'; if (text.includes('成功') || text.includes('Success')) type = 'success'; if (text.includes('失败') || text.includes('错误') || text.includes('Error') || text.includes('fail')) type = 'error'; return { message: text, type: type }; } } return null; } ''') return result except: return None # 转义特殊字符 escaped_name = name.replace("\\", "\\\\").replace('"', '\\"').replace("'", "\\'") try: result = self.browser.page.evaluate(f''' () => {{ const searchText = "{escaped_name}"; const clickable = ['A', 'BUTTON', 'INPUT', 'LI', 'SPAN', 'DIV', 'NAV', 'LABEL']; // 收集所有匹配的元素 const matches = []; const all = document.querySelectorAll('*'); for (const el of all) {{ // 只检查直接文本内容,不包含子元素的文本 const directText = Array.from(el.childNodes) .filter(n => n.nodeType === 3) // 文本节点 .map(n => n.textContent.trim()) .join(''); // 或者元素的完整文本很短(小于搜索文本的2倍) const fullText = el.textContent?.trim() || ''; // 匹配条件 const hasDirectMatch = directText.includes(searchText); const hasShortMatch = fullText.includes(searchText) && fullText.length < searchText.length * 3; if ((hasDirectMatch || hasShortMatch) && (clickable.includes(el.tagName) || el.onclick || el.getAttribute('role') === 'button' || el.getAttribute('role') === 'menuitem' || el.getAttribute('role') === 'link' || window.getComputedStyle(el).cursor === 'pointer')) {{ const r = el.getBoundingClientRect(); if (r.width > 0 && r.height > 0 && r.width < 800 && r.height < 200) {{ matches.push({{ el: el, rect: r, textLen: fullText.length, area: r.width * r.height }}); }} }} }} if (matches.length === 0) {{ return {{ found: false }}; }} // 选择面积最小的匹配元素(最精确) matches.sort((a, b) => a.area - b.area); const best = matches[0]; return {{ found: true, x: Math.round(best.rect.left + best.rect.width / 2), y: Math.round(best.rect.top + best.rect.height / 2), tagName: best.el.tagName, text: best.el.textContent.substring(0, 50) }}; }} ''') if result.get("found"): return (result["x"], result["y"]) except Exception as e: logger.warning(f"DOM 查找失败: {e}") return None def _locate_element(self, target: str) -> Optional[tuple]: """使用 AI 定位元素(备用方法,较慢)""" img = self.browser.screenshot_base64() viewport = self.browser.page.viewport_size width = viewport["width"] if viewport else 1920 height = viewport["height"] if viewport else 1080 prompt = f"""在 {width}x{height} 像素的截图中,找到 "{target}" 的精确中心坐标。 返回 JSON: {{"x": 数字, "y": 数字, "found": true}} 如果找不到: {{"found": false}} 只返回 JSON。""" response = self.analyzer.model.analyze(img, prompt) try: match = re.search(r'\{[\s\S]*?\}', response) if match: result = json.loads(match.group()) if result.get("found"): return (result["x"], result["y"]) except: pass return None def _detect_bugs(self, action: Dict) -> None: """检测可能的 BUG""" # 检测空白页 # 检测错误提示 # 检测加载失败 # TODO: 使用 AI 分析截图检测异常 pass def _generate_report(self, start_url: str, click_count: int) -> Dict[str, Any]: """生成探索报告""" # 按类型统计元素 type_stats = {} for el in self.discovered_elements: t = el.get("type", "unknown") type_stats[t] = type_stats.get(t, 0) + 1 report = { "summary": { "start_url": start_url, "total_elements": len(self.discovered_elements), "total_clicks": click_count, "pages_visited": len(self.visited_urls), "bugs_found": len(self.bug_list), "timestamp": datetime.now().isoformat() }, "elements_by_type": type_stats, "discovered_elements": self.discovered_elements, "site_map": self.site_map, "bug_list": self.bug_list, "action_log": self.action_log, "visited_urls": list(self.visited_urls) } logger.info(f"探索完成: 发现 {len(self.discovered_elements)} 个元素, " f"访问 {len(self.visited_urls)} 个页面, " f"发现 {len(self.bug_list)} 个问题") return report