#!/usr/bin/env python3 """Capture safe UI states behind add/edit actions from a logged-in Chrome session.""" from __future__ import annotations import argparse import json import sys import time from collections import deque from pathlib import Path from typing import Any import mirror_logged_in_site as mirror DEFAULT_HOST = "hc-pos.sqygj.cn" DEFAULT_SEED_URLS = [ "https://hc-pos.sqygj.cn/#/propertySMG/basicManagement/basicsInfo", "https://hc-pos.sqygj.cn/#/propertySMG/basicManagement/communityBasicsInfo", "https://hc-pos.sqygj.cn/#/communitySMG/serviceProvider", "https://hc-pos.sqygj.cn/#/propertySMG/businessTaxCank/revenueManage/price", "https://hc-pos.sqygj.cn/#/propertySMG/businessTaxCank/revenueManage/payer", "https://hc-pos.sqygj.cn/#/propertySMG/contractManage/contractList", "https://hc-pos.sqygj.cn/#/propertySMG/elevatorManage/elevatorFile/elevatorArchives", "https://hc-pos.sqygj.cn/#/propertySMG/equipmentManage/equipmentFiling/equipmentArchives", "https://hc-pos.sqygj.cn/#/propertySMG/customerOperations/contentOperation/newOperation", "https://hc-pos.sqygj.cn/#/propertySMG/customerOperations/customerServices/activeService/activeServicePlan", ] ACTION_LABELS = { "new": "新增", "edit": "编辑", } def eval_json(cdp_port: int, script: str) -> Any: return mirror.eval_json(cdp_port, script) def collect_view_state(cdp_port: int) -> dict[str, Any]: script = r""" (() => { const app = document.querySelector("#app"); const visible = (el) => { if (!el) return false; const rect = el.getBoundingClientRect(); const style = getComputedStyle(el); return ( rect.width > 0 && rect.height > 0 && rect.bottom > 0 && rect.right > 0 && rect.left < window.innerWidth && rect.top < window.innerHeight && style.display !== "none" && style.visibility !== "hidden" ); }; const titles = [...document.querySelectorAll(".el-dialog__title, .el-drawer__header, .drawer-title, .dialog-title, .el-message-box__title")] .map((el) => (el.innerText || "").trim()) .filter(Boolean) .slice(0, 8); const visibleOverlayTitles = [...document.querySelectorAll(".el-dialog__title, .el-drawer__header, .drawer-title, .dialog-title, .el-message-box__title")] .filter((el) => visible(el) || visible(el.closest(".el-dialog, .el-dialog__wrapper, .el-drawer, .el-message-box__wrapper, .el-message-box"))) .map((el) => (el.innerText || "").trim()) .filter(Boolean) .slice(0, 8); const visibleMessages = [...document.querySelectorAll(".el-message, .el-notification, .el-message-box__message")] .filter((el) => visible(el) || visible(el.closest(".el-message-box__wrapper, .el-message-box"))) .map((el) => (el.innerText || el.textContent || "").replace(/\s+/g, " ").trim()) .filter(Boolean) .slice(0, 8); const visibleButtons = [...document.querySelectorAll("button, a, [role='button'], .el-button")] .filter((el) => visible(el)) .map((el) => (el.innerText || el.textContent || "").replace(/\s+/g, " ").trim()) .filter((text) => text && text.length <= 16) .slice(0, 40); const forms = [...document.querySelectorAll("form, .el-form")] .filter((el) => visible(el)) .length; const dialogs = [...document.querySelectorAll(".el-dialog__wrapper, .el-dialog, .el-drawer, .el-message-box__wrapper, .el-message-box")] .filter((el) => visible(el)) .length; const routeParts = location.href.split("#/"); return JSON.stringify({ href: location.href, title: document.title, route: routeParts.length > 1 ? routeParts[1] : "", appHtmlLen: app ? app.innerHTML.length : 0, appTextLen: app ? (app.innerText || "").trim().length : 0, loadingMasks: [...document.querySelectorAll(".el-loading-mask")].filter((el) => visible(el)).length, visibleForms: forms, visibleDialogs: dialogs, overlayTitles: titles, visibleOverlayTitles, visibleMessages, visibleButtons, }); })() """ return eval_json(cdp_port, script) def wait_for_route_ready(cdp_port: int, expected_url: str, timeout: int) -> dict[str, Any]: start = time.time() last_signature: tuple[Any, ...] | None = None stable_ticks = 0 while time.time() - start < timeout: time.sleep(1) state = collect_view_state(cdp_port) signature = ( state.get("href"), state.get("loadingMasks"), state.get("appHtmlLen"), tuple(state.get("visibleButtons", [])), ) ready = state.get("href") == expected_url and int(state.get("loadingMasks", 0)) == 0 and int(state.get("appHtmlLen", 0)) > 1000 if ready: stable_ticks = stable_ticks + 1 if signature == last_signature else 0 if stable_ticks >= 1: return state else: stable_ticks = 0 last_signature = signature return collect_view_state(cdp_port) def open_fresh_page(cdp_port: int, url: str, timeout: int) -> None: dashboard_url = f"https://{DEFAULT_HOST}/#/dashboard" if url != dashboard_url: mirror.run_agent_browser(cdp_port, "open", dashboard_url) wait_for_route_ready(cdp_port, dashboard_url, timeout) mirror.run_agent_browser(cdp_port, "open", url) wait_for_route_ready(cdp_port, url, timeout) def dismiss_global_notices(cdp_port: int) -> None: script = r""" (() => { const visible = (el) => { if (!el) return false; const rect = el.getBoundingClientRect(); const style = getComputedStyle(el); return rect.width > 0 && rect.height > 0 && style.display !== "none" && style.visibility !== "hidden"; }; let closed = 0; for (const dialog of document.querySelectorAll(".el-dialog__wrapper, .el-dialog")) { const text = (dialog.innerText || "").replace(/\s+/g, " ").trim(); if (!visible(dialog)) continue; if (!/(站内通知|公告详情)/.test(text)) continue; const button = dialog.querySelector(".el-dialog__headerbtn, .el-button, button"); if (!button || !visible(button)) continue; button.click(); closed += 1; } return JSON.stringify({ closed }); })() """ eval_json(cdp_port, script) time.sleep(1) def list_action_candidates(cdp_port: int) -> list[dict[str, Any]]: script = r""" (() => { const visible = (el) => { if (!el) return false; const rect = el.getBoundingClientRect(); const style = getComputedStyle(el); return ( rect.width >= 20 && rect.height >= 16 && rect.bottom > 0 && rect.right > 0 && rect.left < window.innerWidth && rect.top < window.innerHeight && style.display !== "none" && style.visibility !== "hidden" && style.pointerEvents !== "none" ); }; const normalize = (text) => (text || "").replace(/\s+/g, " ").trim(); const detectType = (text) => /(新增|新建|添加|创建|立即创建)/.test(text) ? "new" : "edit"; const result = []; const seen = new Set(); let seq = 0; for (const raw of document.querySelectorAll("button, a, [role='button'], .el-button, span, div")) { const text = normalize(raw.innerText || raw.textContent || ""); if (!text || text.length > 24) continue; if (!/(新增|新建|添加|编辑|修改|创建|立即创建)/.test(text)) continue; if (/(保存|提交|删除|移除|取消|关闭|确认|导入|导出|下载|打印)/.test(text)) continue; const target = raw.closest("button, a, [role='button'], .el-button") || raw; if (!visible(target)) continue; if (seen.has(target)) continue; if (String(target.className || "").includes("tags-view-item")) continue; const style = getComputedStyle(target); const clickable = target.matches("button, a, [role='button'], .el-button") || style.cursor === "pointer" || typeof target.onclick === "function"; if (!clickable) continue; const rect = target.getBoundingClientRect(); const id = `codex-action-${++seq}`; target.setAttribute("data-codex-action-id", id); seen.add(target); result.push({ id, text, action_type: detectType(text), tag: target.tagName, class_name: String(target.className || ""), x: Math.round(rect.left), y: Math.round(rect.top), w: Math.round(rect.width), h: Math.round(rect.height), }); } result.sort((a, b) => (a.y - b.y) || (a.x - b.x)); return JSON.stringify(result); })() """ raw = eval_json(cdp_port, script) if not isinstance(raw, list): raise RuntimeError("failed to enumerate action candidates") return raw def click_action_candidate(cdp_port: int, action_id: str) -> None: script = f""" (() => {{ const node = document.querySelector("[data-codex-action-id='{action_id}']"); if (!node) return JSON.stringify({{"clicked": false, "reason": "missing"}}) node.scrollIntoView({{block: "center", inline: "center"}}); const rect = node.getBoundingClientRect(); const point = {{ clientX: rect.left + Math.min(rect.width / 2, Math.max(8, rect.width - 8)), clientY: rect.top + Math.min(rect.height / 2, Math.max(8, rect.height - 8)), bubbles: true, }}; node.dispatchEvent(new MouseEvent("mouseenter", point)); node.dispatchEvent(new MouseEvent("mousedown", point)); node.dispatchEvent(new MouseEvent("mouseup", point)); node.click(); return JSON.stringify({{"clicked": true}}); }})() """ result = eval_json(cdp_port, script) if not isinstance(result, dict) or not result.get("clicked"): raise RuntimeError(f"failed to click action {action_id}: {result}") def wait_for_action_change(cdp_port: int, baseline: dict[str, Any], timeout: int) -> dict[str, Any] | None: start = time.time() stable_ticks = 0 baseline_href = baseline.get("href") baseline_dialogs = int(baseline.get("visibleDialogs", 0)) baseline_forms = int(baseline.get("visibleForms", 0)) baseline_len = int(baseline.get("appHtmlLen", 0)) baseline_buttons = tuple(baseline.get("visibleButtons", [])) last_signature: tuple[Any, ...] | None = None while time.time() - start < timeout: time.sleep(1) current = collect_view_state(cdp_port) signature = ( current.get("href"), current.get("visibleDialogs"), current.get("visibleForms"), current.get("appHtmlLen"), tuple(current.get("visibleButtons", [])), tuple(current.get("overlayTitles", [])), ) if current.get("loadingMasks", 0): last_signature = signature stable_ticks = 0 continue changed = ( current.get("href") != baseline_href or int(current.get("visibleDialogs", 0)) > baseline_dialogs or int(current.get("visibleForms", 0)) > baseline_forms or abs(int(current.get("appHtmlLen", 0)) - baseline_len) > 200 or tuple(current.get("visibleButtons", [])) != baseline_buttons ) if changed: stable_ticks = stable_ticks + 1 if signature == last_signature else 0 if stable_ticks >= 1: return current last_signature = signature return None def sanitize_action_candidates( candidates: list[dict[str, Any]], per_action_limit: int, ) -> list[dict[str, Any]]: selected: list[dict[str, Any]] = [] counts = {"new": 0, "edit": 0} seen_keys: set[tuple[str, str, int, int]] = set() for candidate in candidates: action_type = candidate.get("action_type") text = str(candidate.get("text", "")).strip() if action_type not in counts or not text: continue if counts[action_type] >= per_action_limit: continue key = (action_type, text, int(candidate.get("x", 0)), int(candidate.get("y", 0))) if key in seen_keys: continue seen_keys.add(key) counts[action_type] += 1 selected.append(candidate) return selected def resolve_action_candidate(cdp_port: int, wanted: dict[str, Any]) -> dict[str, Any] | None: candidates = list_action_candidates(cdp_port) if not candidates: return None wanted_type = wanted.get("action_type") wanted_text = str(wanted.get("text", "")).strip() wanted_x = int(wanted.get("x", 0)) wanted_y = int(wanted.get("y", 0)) exact = [item for item in candidates if item.get("action_type") == wanted_type and str(item.get("text", "")).strip() == wanted_text] if exact: return min( exact, key=lambda item: abs(int(item.get("x", 0)) - wanted_x) + abs(int(item.get("y", 0)) - wanted_y), ) same_type = [item for item in candidates if item.get("action_type") == wanted_type] if same_type: return min( same_type, key=lambda item: abs(int(item.get("x", 0)) - wanted_x) + abs(int(item.get("y", 0)) - wanted_y), ) return None def capture_current_variant( cdp_port: int, baseline_url: str, mirror_root: Path, downloaded_assets: dict[str, Path], asset_failures: list[dict[str, str]], asset_failure_urls: set[str], action: dict[str, Any], action_number: int, ) -> dict[str, Any]: html = mirror.get_rendered_html(cdp_port) state = collect_view_state(cdp_port) current_url = state.get("href") or baseline_url base_dir = mirror.build_page_dir(mirror_root, baseline_url) variant_dir = base_dir / f"__action_{action['action_type']}_{action_number:02d}" variant_dir.mkdir(parents=True, exist_ok=True) discovered_assets = mirror.discover_asset_urls(html, current_url) pending_assets = deque(sorted(discovered_assets)) local_asset_map: dict[str, Path] = {} while pending_assets: asset_url = pending_assets.popleft() if asset_url in downloaded_assets: local_asset_map[asset_url] = downloaded_assets[asset_url] continue existing_path = mirror.build_asset_path(mirror_root, asset_url) if existing_path.exists(): downloaded_assets[asset_url] = existing_path local_asset_map[asset_url] = existing_path continue try: payload = mirror.fetch_url(asset_url) except Exception as exc: # noqa: BLE001 if asset_url not in asset_failure_urls: asset_failures.append({"url": asset_url, "error": str(exc)}) asset_failure_urls.add(asset_url) continue local_path = mirror.build_asset_path(mirror_root, asset_url) mirror.safe_write_bytes(local_path, payload) downloaded_assets[asset_url] = local_path local_asset_map[asset_url] = local_path if local_path.suffix.lower() == ".css": css_text = payload.decode("utf-8", errors="ignore") for nested in sorted(mirror.discover_css_urls(css_text, asset_url)): if nested not in downloaded_assets: pending_assets.append(nested) for asset_url, local_path in list(local_asset_map.items()): if local_path.suffix.lower() != ".css": continue css_text = local_path.read_text(encoding="utf-8", errors="ignore") rewritten = mirror.rewrite_css_urls(css_text, asset_url, local_path, downloaded_assets) mirror.safe_write_text(local_path, rewritten) rewritten_html = mirror.rewrite_html_asset_urls(html, current_url, variant_dir, mirror_root, downloaded_assets) html_path = variant_dir / "index.html" mirror.safe_write_text(html_path, rewritten_html) metadata = { "source_url": baseline_url, "final_url": current_url, "title": state.get("title", ""), "action_type": action.get("action_type"), "action_text": action.get("text"), "action_id": action.get("id"), "overlay_titles": state.get("overlayTitles", []), "html_path": str(html_path.relative_to(mirror_root)), } mirror.safe_write_text(variant_dir / "action.json", json.dumps(metadata, ensure_ascii=False, indent=2)) return metadata def load_seed_urls(args: argparse.Namespace) -> list[str]: seed_urls = list(args.seed_url or []) if args.seed_file: seed_path = Path(args.seed_file) seed_urls.extend( line.strip() for line in seed_path.read_text(encoding="utf-8").splitlines() if line.strip() and not line.strip().startswith("#") ) if not seed_urls: seed_urls = list(DEFAULT_SEED_URLS) normalized: list[str] = [] seen: set[str] = set() for raw in seed_urls: url = mirror.normalize_internal_url(raw, raw, args.host) or raw if url in seen: continue seen.add(url) normalized.append(url) return normalized def capture_actions(args: argparse.Namespace) -> int: mirror_root = Path(args.output_dir).resolve() mirror_root.mkdir(parents=True, exist_ok=True) downloaded_assets: dict[str, Path] = {} asset_failures: list[dict[str, str]] = [] asset_failure_urls: set[str] = set() page_failures: list[dict[str, str]] = [] captured_actions: list[dict[str, Any]] = [] seed_urls = load_seed_urls(args) for page_index, url in enumerate(seed_urls, start=1): print(f"[page] {page_index:02d} {url}", flush=True) try: open_fresh_page(args.cdp_port, url, args.timeout) dismiss_global_notices(args.cdp_port) baseline = collect_view_state(args.cdp_port) candidates = sanitize_action_candidates(list_action_candidates(args.cdp_port), args.per_action_limit) if not candidates: print(" [skip] no visible add/edit action found", flush=True) continue for action_number, action in enumerate(candidates, start=1): print( f" [action] {ACTION_LABELS.get(action['action_type'], action['action_type'])} -> {action['text']}", flush=True, ) open_fresh_page(args.cdp_port, url, args.timeout) dismiss_global_notices(args.cdp_port) baseline = collect_view_state(args.cdp_port) resolved = resolve_action_candidate(args.cdp_port, action) if not resolved: print(" [skip] action not found after page reset", flush=True) continue action = resolved click_action_candidate(args.cdp_port, action["id"]) changed = wait_for_action_change(args.cdp_port, baseline, args.timeout) if not changed: print(" [skip] no stable UI change after click", flush=True) continue metadata = capture_current_variant( args.cdp_port, url, mirror_root, downloaded_assets, asset_failures, asset_failure_urls, action, action_number, ) captured_actions.append(metadata) print(f" [saved] {metadata['html_path']}", flush=True) except Exception as exc: # noqa: BLE001 page_failures.append({"url": url, "error": str(exc)}) print(f" [fail] {exc}", flush=True) manifest = { "host": args.host, "captured_action_states": len(captured_actions), "downloaded_assets": len(downloaded_assets), "page_failures": page_failures, "asset_failures": asset_failures, "actions": captured_actions, } manifest_path = mirror_root / f"action-manifest-{args.host}.json" mirror.safe_write_text(manifest_path, json.dumps(manifest, ensure_ascii=False, indent=2)) print( f"[done] actions={len(captured_actions)} assets={len(downloaded_assets)} " f"page_failures={len(page_failures)} asset_failures={len(asset_failures)}", flush=True, ) return 0 if not page_failures else 1 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Capture add/edit action states from logged-in Chrome") parser.add_argument("--host", default=DEFAULT_HOST, help="Target host") parser.add_argument("--cdp-port", type=int, default=9223, help="Chrome remote debugging port") parser.add_argument("--output-dir", default=".", help="Mirror output directory") parser.add_argument("--timeout", type=int, default=20, help="Wait time per page/action in seconds") parser.add_argument("--seed-url", action="append", default=[], help="Seed page URL, repeatable") parser.add_argument("--seed-file", help="Optional text file containing seed URLs") parser.add_argument("--per-action-limit", type=int, default=1, help="Capture count per action type on each page") return parser def main() -> int: if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8") if hasattr(sys.stderr, "reconfigure"): sys.stderr.reconfigure(encoding="utf-8") parser = build_parser() args = parser.parse_args() try: return capture_actions(args) except Exception as exc: # noqa: BLE001 print(f"[error] {exc}") return 1 if __name__ == "__main__": raise SystemExit(main())