Files
wysite/capture_action_states.py

550 lines
21 KiB
Python

#!/usr/bin/env python3
"""Capture safe UI states behind add/edit actions from a logged-in Chrome session."""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import deque
from pathlib import Path
from typing import Any
import mirror_logged_in_site as mirror
# Default target host: the default value of --host; open_fresh_page also
# refers to it when building the dashboard URL.
DEFAULT_HOST = "hc-pos.sqygj.cn"
# Pages visited when neither --seed-url nor --seed-file supplies any URL
# (see load_seed_urls). All entries are hash-routed pages on DEFAULT_HOST.
DEFAULT_SEED_URLS = [
    "https://hc-pos.sqygj.cn/#/propertySMG/basicManagement/basicsInfo",
    "https://hc-pos.sqygj.cn/#/propertySMG/basicManagement/communityBasicsInfo",
    "https://hc-pos.sqygj.cn/#/communitySMG/serviceProvider",
    "https://hc-pos.sqygj.cn/#/propertySMG/businessTaxCank/revenueManage/price",
    "https://hc-pos.sqygj.cn/#/propertySMG/businessTaxCank/revenueManage/payer",
    "https://hc-pos.sqygj.cn/#/propertySMG/contractManage/contractList",
    "https://hc-pos.sqygj.cn/#/propertySMG/elevatorManage/elevatorFile/elevatorArchives",
    "https://hc-pos.sqygj.cn/#/propertySMG/equipmentManage/equipmentFiling/equipmentArchives",
    "https://hc-pos.sqygj.cn/#/propertySMG/customerOperations/contentOperation/newOperation",
    "https://hc-pos.sqygj.cn/#/propertySMG/customerOperations/customerServices/activeService/activeServicePlan",
]
# Labels printed in the progress log for each action type
# ("新增" = add/new, "编辑" = edit). The keys match the "action_type" values
# produced by list_action_candidates.
ACTION_LABELS = {
    "new": "新增",
    "edit": "编辑",
}
def eval_json(cdp_port: int, script: str) -> Any:
    """Evaluate ``script`` in the page behind ``cdp_port`` and return the result.

    Pure pass-through to ``mirror.eval_json``; it exists so the rest of this
    module has a single local entry point for page evaluation. Callers in this
    file expect an already-decoded JSON value (``dict`` / ``list``) back --
    NOTE(review): the decoding itself happens inside ``mirror``; confirm there.
    """
    evaluated = mirror.eval_json(cdp_port, script)
    return evaluated
def collect_view_state(cdp_port: int) -> dict[str, Any]:
    """Snapshot the observable UI state of the current page.

    Runs a read-only JavaScript probe in the page and returns its JSON result.
    The probe reports:

    * ``href`` / ``title`` / ``route`` -- location, document title and the part
      of the URL after ``#/`` (empty string when there is none);
    * ``appHtmlLen`` / ``appTextLen`` -- size of ``#app`` markup and text;
    * ``loadingMasks`` -- number of visible ``.el-loading-mask`` elements;
    * ``visibleForms`` / ``visibleDialogs`` -- counts of visible forms and
      dialog/drawer/message-box containers;
    * ``overlayTitles`` -- up to 8 overlay titles, visible or not;
    * ``visibleOverlayTitles`` / ``visibleMessages`` -- up to 8 entries each,
      restricted to elements that are (or sit inside something) visible;
    * ``visibleButtons`` -- up to 40 visible button-like labels of at most
      16 characters.

    "Visible" in the probe means a non-empty bounding box intersecting the
    viewport with ``display`` / ``visibility`` not hiding the element.
    """
    # The probe only reads the DOM; it never clicks or mutates anything.
    snapshot_js = r"""
(() => {
const app = document.querySelector("#app");
const visible = (el) => {
if (!el) return false;
const rect = el.getBoundingClientRect();
const style = getComputedStyle(el);
return (
rect.width > 0 &&
rect.height > 0 &&
rect.bottom > 0 &&
rect.right > 0 &&
rect.left < window.innerWidth &&
rect.top < window.innerHeight &&
style.display !== "none" &&
style.visibility !== "hidden"
);
};
const titles = [...document.querySelectorAll(".el-dialog__title, .el-drawer__header, .drawer-title, .dialog-title, .el-message-box__title")]
.map((el) => (el.innerText || "").trim())
.filter(Boolean)
.slice(0, 8);
const visibleOverlayTitles = [...document.querySelectorAll(".el-dialog__title, .el-drawer__header, .drawer-title, .dialog-title, .el-message-box__title")]
.filter((el) => visible(el) || visible(el.closest(".el-dialog, .el-dialog__wrapper, .el-drawer, .el-message-box__wrapper, .el-message-box")))
.map((el) => (el.innerText || "").trim())
.filter(Boolean)
.slice(0, 8);
const visibleMessages = [...document.querySelectorAll(".el-message, .el-notification, .el-message-box__message")]
.filter((el) => visible(el) || visible(el.closest(".el-message-box__wrapper, .el-message-box")))
.map((el) => (el.innerText || el.textContent || "").replace(/\s+/g, " ").trim())
.filter(Boolean)
.slice(0, 8);
const visibleButtons = [...document.querySelectorAll("button, a, [role='button'], .el-button")]
.filter((el) => visible(el))
.map((el) => (el.innerText || el.textContent || "").replace(/\s+/g, " ").trim())
.filter((text) => text && text.length <= 16)
.slice(0, 40);
const forms = [...document.querySelectorAll("form, .el-form")]
.filter((el) => visible(el))
.length;
const dialogs = [...document.querySelectorAll(".el-dialog__wrapper, .el-dialog, .el-drawer, .el-message-box__wrapper, .el-message-box")]
.filter((el) => visible(el))
.length;
const routeParts = location.href.split("#/");
return JSON.stringify({
href: location.href,
title: document.title,
route: routeParts.length > 1 ? routeParts[1] : "",
appHtmlLen: app ? app.innerHTML.length : 0,
appTextLen: app ? (app.innerText || "").trim().length : 0,
loadingMasks: [...document.querySelectorAll(".el-loading-mask")].filter((el) => visible(el)).length,
visibleForms: forms,
visibleDialogs: dialogs,
overlayTitles: titles,
visibleOverlayTitles,
visibleMessages,
visibleButtons,
});
})()
"""
    snapshot = eval_json(cdp_port, snapshot_js)
    return snapshot
def wait_for_route_ready(cdp_port: int, expected_url: str, timeout: int) -> dict[str, Any]:
    """Poll the page once per second until ``expected_url`` has settled.

    A poll counts as *ready* when the page's ``href`` equals ``expected_url``
    exactly, no loading mask is visible and ``#app`` holds more than 1000
    characters of markup. The state is returned as soon as two consecutive
    ready polls have the same signature (href, loading masks, markup length,
    visible button labels), i.e. the page has stopped changing.

    Args:
        cdp_port: Chrome remote-debugging port to query.
        expected_url: URL the page must report in ``location.href``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The settled view state, or -- if the deadline passes first -- one
        final snapshot taken after the timeout. Timing out does not raise, so
        the returned state may not be ready.
    """
    # Monotonic clock: wall-clock adjustments (NTP, manual changes) must not
    # shorten or stretch the wait.
    deadline = time.monotonic() + timeout
    last_signature: tuple[Any, ...] | None = None
    stable_ticks = 0
    while time.monotonic() < deadline:
        time.sleep(1)
        state = collect_view_state(cdp_port)
        signature = (
            state.get("href"),
            state.get("loadingMasks"),
            state.get("appHtmlLen"),
            tuple(state.get("visibleButtons", [])),
        )
        ready = (
            state.get("href") == expected_url
            and int(state.get("loadingMasks", 0)) == 0
            and int(state.get("appHtmlLen", 0)) > 1000
        )
        if ready:
            # The first ready poll only arms the counter; the next identical
            # one confirms the page is stable.
            stable_ticks = stable_ticks + 1 if signature == last_signature else 0
            if stable_ticks >= 1:
                return state
        else:
            stable_ticks = 0
        last_signature = signature
    return collect_view_state(cdp_port)
def open_fresh_page(cdp_port: int, url: str, timeout: int) -> None:
    """Navigate to ``url``, detouring through the site's dashboard first.

    The dashboard URL is built from the scheme and host of ``url`` itself, so
    seeds on a host other than ``DEFAULT_HOST`` detour through their own site.
    ``DEFAULT_HOST`` over https is used only when ``url`` carries no
    scheme/host. When ``url`` already is the dashboard the detour is skipped.

    Args:
        cdp_port: Chrome remote-debugging port to drive.
        url: Target page URL.
        timeout: Seconds to wait for each navigation to settle.
    """
    from urllib.parse import urlsplit

    parts = urlsplit(url)
    if parts.scheme and parts.netloc:
        origin = f"{parts.scheme}://{parts.netloc}"
    else:
        origin = f"https://{DEFAULT_HOST}"
    dashboard_url = f"{origin}/#/dashboard"
    if url != dashboard_url:
        # Presumably this forces the single-page app to re-render the target
        # route from scratch instead of reusing the view left by a previous
        # action -- TODO confirm.
        mirror.run_agent_browser(cdp_port, "open", dashboard_url)
        wait_for_route_ready(cdp_port, dashboard_url, timeout)
    mirror.run_agent_browser(cdp_port, "open", url)
    wait_for_route_ready(cdp_port, url, timeout)
def dismiss_global_notices(cdp_port: int) -> None:
    """Close site-wide notice dialogs that would cover the page.

    The injected script looks at every ``.el-dialog__wrapper`` / ``.el-dialog``
    that is visible and whose text contains "站内通知" (in-site notice) or
    "公告详情" (announcement details). In each one it clicks the first element
    matching the header close button, an ``.el-button`` or a plain ``button``,
    provided that element is visible. The script's ``closed`` count is
    discarded; the function then sleeps one second.
    """
    notice_closer_js = r"""
(() => {
const visible = (el) => {
if (!el) return false;
const rect = el.getBoundingClientRect();
const style = getComputedStyle(el);
return rect.width > 0 && rect.height > 0 && style.display !== "none" && style.visibility !== "hidden";
};
let closed = 0;
for (const dialog of document.querySelectorAll(".el-dialog__wrapper, .el-dialog")) {
const text = (dialog.innerText || "").replace(/\s+/g, " ").trim();
if (!visible(dialog)) continue;
if (!/(站内通知|公告详情)/.test(text)) continue;
const button = dialog.querySelector(".el-dialog__headerbtn, .el-button, button");
if (!button || !visible(button)) continue;
button.click();
closed += 1;
}
return JSON.stringify({ closed });
})()
"""
    eval_json(cdp_port, notice_closer_js)
    # Fixed pause after the click(s) before the caller inspects the page.
    time.sleep(1)
def list_action_candidates(cdp_port: int) -> list[dict[str, Any]]:
    """Find visible add/edit style controls and tag them for later clicking.

    The injected script scans buttons, links, ``[role='button']``,
    ``.el-button``, ``span`` and ``div`` elements whose text (at most 24
    characters) contains an add/edit keyword (新增, 新建, 添加, 编辑, 修改,
    创建, 立即创建) and none of the excluded keywords (save, submit, delete,
    remove, cancel, close, confirm, import, export, download, print -- in
    Chinese). The nearest button-like ancestor, or the element itself, becomes
    the click target. It must be visible, accept pointer events, not be a
    ``tags-view-item`` and look clickable. Each target gets a
    ``data-codex-action-id`` attribute that ``click_action_candidate`` uses to
    find it again.

    Markers left by earlier enumerations are removed first. Ids restart at 1
    on every call, so a stale marker on a retained DOM node could otherwise
    duplicate a fresh id and cause the wrong element to be clicked.

    Returns:
        One dict per target, sorted top-to-bottom then left-to-right, with
        keys ``id``, ``text``, ``action_type`` ("new" or "edit"), ``tag``,
        ``class_name``, ``x``, ``y``, ``w``, ``h``.

    Raises:
        RuntimeError: If the page evaluation does not yield a list.
    """
    script = r"""
(() => {
const visible = (el) => {
if (!el) return false;
const rect = el.getBoundingClientRect();
const style = getComputedStyle(el);
return (
rect.width >= 20 &&
rect.height >= 16 &&
rect.bottom > 0 &&
rect.right > 0 &&
rect.left < window.innerWidth &&
rect.top < window.innerHeight &&
style.display !== "none" &&
style.visibility !== "hidden" &&
style.pointerEvents !== "none"
);
};
const normalize = (text) => (text || "").replace(/\s+/g, " ").trim();
const detectType = (text) => /(新增|新建|添加|创建|立即创建)/.test(text) ? "new" : "edit";
const result = [];
const seen = new Set();
let seq = 0;
for (const stale of document.querySelectorAll("[data-codex-action-id]")) {
stale.removeAttribute("data-codex-action-id");
}
for (const raw of document.querySelectorAll("button, a, [role='button'], .el-button, span, div")) {
const text = normalize(raw.innerText || raw.textContent || "");
if (!text || text.length > 24) continue;
if (!/(新增|新建|添加|编辑|修改|创建|立即创建)/.test(text)) continue;
if (/(保存|提交|删除|移除|取消|关闭|确认|导入|导出|下载|打印)/.test(text)) continue;
const target = raw.closest("button, a, [role='button'], .el-button") || raw;
if (!visible(target)) continue;
if (seen.has(target)) continue;
if (String(target.className || "").includes("tags-view-item")) continue;
const style = getComputedStyle(target);
const clickable = target.matches("button, a, [role='button'], .el-button") || style.cursor === "pointer" || typeof target.onclick === "function";
if (!clickable) continue;
const rect = target.getBoundingClientRect();
const id = `codex-action-${++seq}`;
target.setAttribute("data-codex-action-id", id);
seen.add(target);
result.push({
id,
text,
action_type: detectType(text),
tag: target.tagName,
class_name: String(target.className || ""),
x: Math.round(rect.left),
y: Math.round(rect.top),
w: Math.round(rect.width),
h: Math.round(rect.height),
});
}
result.sort((a, b) => (a.y - b.y) || (a.x - b.x));
return JSON.stringify(result);
})()
"""
    raw = eval_json(cdp_port, script)
    if not isinstance(raw, list):
        raise RuntimeError("failed to enumerate action candidates")
    return raw
def click_action_candidate(cdp_port: int, action_id: str) -> None:
    """Click the element previously tagged with ``action_id``.

    The element is looked up by its ``data-codex-action-id`` attribute and
    scrolled to the centre of the viewport. It then receives synthetic
    ``mouseenter`` / ``mousedown`` / ``mouseup`` events at a point inside its
    box, followed by a native ``click()``.

    Args:
        cdp_port: Chrome remote-debugging port to drive.
        action_id: Marker value assigned by ``list_action_candidates``. It is
            interpolated into the script unescaped, so it must not contain
            quote characters.

    Raises:
        RuntimeError: If the element is missing or the script does not report
            ``clicked: true``.
    """
    click_js = f"""
(() => {{
const node = document.querySelector("[data-codex-action-id='{action_id}']");
if (!node) return JSON.stringify({{"clicked": false, "reason": "missing"}})
node.scrollIntoView({{block: "center", inline: "center"}});
const rect = node.getBoundingClientRect();
const point = {{
clientX: rect.left + Math.min(rect.width / 2, Math.max(8, rect.width - 8)),
clientY: rect.top + Math.min(rect.height / 2, Math.max(8, rect.height - 8)),
bubbles: true,
}};
node.dispatchEvent(new MouseEvent("mouseenter", point));
node.dispatchEvent(new MouseEvent("mousedown", point));
node.dispatchEvent(new MouseEvent("mouseup", point));
node.click();
return JSON.stringify({{"clicked": true}});
}})()
"""
    result = eval_json(cdp_port, click_js)
    click_confirmed = isinstance(result, dict) and result.get("clicked")
    if click_confirmed:
        return
    raise RuntimeError(f"failed to click action {action_id}: {result}")
def wait_for_action_change(cdp_port: int, baseline: dict[str, Any], timeout: int) -> dict[str, Any] | None:
    """Wait until a click has visibly changed the page and the change settled.

    The page is polled once per second. Polls taken while a loading mask is
    visible are ignored. A poll counts as *changed* relative to ``baseline``
    when any of these holds:

    * the URL differs;
    * more dialogs or more forms are visible;
    * ``#app`` markup length moved by more than 200 characters;
    * the visible button labels differ.

    The state is returned once a changed poll has the same signature as the
    poll before it, i.e. the new UI has stopped moving.

    Args:
        cdp_port: Chrome remote-debugging port to query.
        baseline: View state captured immediately before the click.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The settled post-click view state, or ``None`` when no stable change
        was observed before the deadline.
    """
    # Monotonic clock: wall-clock adjustments must not shorten or stretch the
    # wait.
    deadline = time.monotonic() + timeout
    stable_ticks = 0
    baseline_href = baseline.get("href")
    baseline_dialogs = int(baseline.get("visibleDialogs", 0))
    baseline_forms = int(baseline.get("visibleForms", 0))
    baseline_len = int(baseline.get("appHtmlLen", 0))
    baseline_buttons = tuple(baseline.get("visibleButtons", []))
    last_signature: tuple[Any, ...] | None = None
    while time.monotonic() < deadline:
        time.sleep(1)
        current = collect_view_state(cdp_port)
        signature = (
            current.get("href"),
            current.get("visibleDialogs"),
            current.get("visibleForms"),
            current.get("appHtmlLen"),
            tuple(current.get("visibleButtons", [])),
            tuple(current.get("overlayTitles", [])),
        )
        if current.get("loadingMasks", 0):
            # Still loading: remember what we saw but restart the stability
            # count.
            last_signature = signature
            stable_ticks = 0
            continue
        changed = (
            current.get("href") != baseline_href
            or int(current.get("visibleDialogs", 0)) > baseline_dialogs
            or int(current.get("visibleForms", 0)) > baseline_forms
            or abs(int(current.get("appHtmlLen", 0)) - baseline_len) > 200
            or tuple(current.get("visibleButtons", [])) != baseline_buttons
        )
        if changed:
            stable_ticks = stable_ticks + 1 if signature == last_signature else 0
            if stable_ticks >= 1:
                return current
        last_signature = signature
    return None
def sanitize_action_candidates(
    candidates: list[dict[str, Any]],
    per_action_limit: int,
) -> list[dict[str, Any]]:
    """Pick at most ``per_action_limit`` candidates of each action type.

    Candidates are taken in input order. Entries are skipped when their
    ``action_type`` is neither "new" nor "edit", when their stripped ``text``
    is empty, when the quota for their type is used up, or when an earlier
    entry had the same (type, text, x, y) combination.

    Returns:
        The selected candidate dicts (the same objects, not copies).
    """
    quota_left = {"new": per_action_limit, "edit": per_action_limit}
    fingerprints: set[tuple[str, str, int, int]] = set()
    kept: list[dict[str, Any]] = []
    for entry in candidates:
        kind = entry.get("action_type")
        label = str(entry.get("text", "")).strip()
        if kind not in quota_left or not label:
            continue
        if quota_left[kind] <= 0:
            continue
        fingerprint = (kind, label, int(entry.get("x", 0)), int(entry.get("y", 0)))
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        quota_left[kind] -= 1
        kept.append(entry)
    return kept
def resolve_action_candidate(cdp_port: int, wanted: dict[str, Any]) -> dict[str, Any] | None:
    """Re-locate a previously listed action on the freshly loaded page.

    The page is enumerated again. Among the new candidates, those with the
    same action type *and* text as ``wanted`` are preferred; failing that, any
    candidate of the same type is accepted. Within the chosen group the one
    closest to the original position (Manhattan distance on ``x`` / ``y``) is
    returned.

    Returns:
        The best-matching candidate, or ``None`` when nothing of the wanted
        type is present.
    """
    pool = list_action_candidates(cdp_port)
    if not pool:
        return None

    target_type = wanted.get("action_type")
    target_text = str(wanted.get("text", "")).strip()
    target_x = int(wanted.get("x", 0))
    target_y = int(wanted.get("y", 0))

    def offset(item: dict[str, Any]) -> int:
        # Manhattan distance between a candidate and the remembered position.
        return abs(int(item.get("x", 0)) - target_x) + abs(int(item.get("y", 0)) - target_y)

    same_type = [item for item in pool if item.get("action_type") == target_type]
    same_text = [item for item in same_type if str(item.get("text", "")).strip() == target_text]
    # Exact text matches win; otherwise fall back to any action of that type.
    for tier in (same_text, same_type):
        if tier:
            return min(tier, key=offset)
    return None
def capture_current_variant(
    cdp_port: int,
    baseline_url: str,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
    asset_failures: list[dict[str, str]],
    asset_failure_urls: set[str],
    action: dict[str, Any],
    action_number: int,
) -> dict[str, Any]:
    """Save the page as it looks after an action into its own variant folder.

    Steps, in order:

    1. Grab the rendered HTML and the current view state.
    2. Create ``<page dir>/__action_<type>_<NN>`` under ``mirror_root``.
    3. Walk the assets referenced by the HTML (plus URLs found inside freshly
       downloaded CSS). Each is reused from ``downloaded_assets`` or from
       disk, or fetched and written.
    4. Rewrite URLs inside every CSS file touched in this call.
    5. Rewrite asset URLs in the HTML and write it as ``index.html``.
    6. Write an ``action.json`` metadata file next to it.

    ``downloaded_assets``, ``asset_failures`` and ``asset_failure_urls`` are
    shared accumulators and are mutated in place. A failed fetch is recorded
    once per URL and otherwise skipped.

    Returns:
        The metadata dict that was written to ``action.json``.
    """
    html = mirror.get_rendered_html(cdp_port)
    state = collect_view_state(cdp_port)
    current_url = state.get("href") or baseline_url

    page_dir = mirror.build_page_dir(mirror_root, baseline_url)
    variant_dir = page_dir / f"__action_{action['action_type']}_{action_number:02d}"
    variant_dir.mkdir(parents=True, exist_ok=True)

    # Assets used by this variant (URL -> local file), a subset of the global
    # downloaded_assets map.
    variant_assets: dict[str, Path] = {}
    queue = deque(sorted(mirror.discover_asset_urls(html, current_url)))
    while queue:
        asset_url = queue.popleft()

        if asset_url in downloaded_assets:
            variant_assets[asset_url] = downloaded_assets[asset_url]
            continue

        on_disk = mirror.build_asset_path(mirror_root, asset_url)
        if on_disk.exists():
            # Already present on disk but not yet known to this run: adopt it.
            downloaded_assets[asset_url] = on_disk
            variant_assets[asset_url] = on_disk
            continue

        try:
            payload = mirror.fetch_url(asset_url)
        except Exception as exc:  # noqa: BLE001
            if asset_url not in asset_failure_urls:
                asset_failures.append({"url": asset_url, "error": str(exc)})
                asset_failure_urls.add(asset_url)
            continue

        local_path = mirror.build_asset_path(mirror_root, asset_url)
        mirror.safe_write_bytes(local_path, payload)
        downloaded_assets[asset_url] = local_path
        variant_assets[asset_url] = local_path

        if local_path.suffix.lower() == ".css":
            # Queue whatever the stylesheet itself references.
            stylesheet = payload.decode("utf-8", errors="ignore")
            for nested in sorted(mirror.discover_css_urls(stylesheet, asset_url)):
                if nested not in downloaded_assets:
                    queue.append(nested)

    for asset_url, local_path in list(variant_assets.items()):
        if local_path.suffix.lower() == ".css":
            original_css = local_path.read_text(encoding="utf-8", errors="ignore")
            localized_css = mirror.rewrite_css_urls(original_css, asset_url, local_path, downloaded_assets)
            mirror.safe_write_text(local_path, localized_css)

    html_path = variant_dir / "index.html"
    localized_html = mirror.rewrite_html_asset_urls(html, current_url, variant_dir, mirror_root, downloaded_assets)
    mirror.safe_write_text(html_path, localized_html)

    metadata = {
        "source_url": baseline_url,
        "final_url": current_url,
        "title": state.get("title", ""),
        "action_type": action.get("action_type"),
        "action_text": action.get("text"),
        "action_id": action.get("id"),
        "overlay_titles": state.get("overlayTitles", []),
        "html_path": str(html_path.relative_to(mirror_root)),
    }
    manifest_text = json.dumps(metadata, ensure_ascii=False, indent=2)
    mirror.safe_write_text(variant_dir / "action.json", manifest_text)
    return metadata
def load_seed_urls(args: argparse.Namespace) -> list[str]:
    """Assemble the ordered, de-duplicated list of pages to visit.

    Sources, in order: every ``--seed-url`` value, then the non-blank lines of
    ``--seed-file`` that do not start with ``#``. If both yield nothing,
    ``DEFAULT_SEED_URLS`` is used. Each entry is passed through
    ``mirror.normalize_internal_url`` (falling back to the raw value when that
    returns something falsy) and duplicates are dropped, keeping the first
    occurrence.
    """
    collected = list(args.seed_url or [])
    if args.seed_file:
        file_text = Path(args.seed_file).read_text(encoding="utf-8")
        for line in file_text.splitlines():
            entry = line.strip()
            if entry and not entry.startswith("#"):
                collected.append(entry)
    if not collected:
        collected = list(DEFAULT_SEED_URLS)

    # A dict keeps insertion order, giving order-preserving de-duplication.
    unique: dict[str, None] = {}
    for raw in collected:
        url = mirror.normalize_internal_url(raw, raw, args.host) or raw
        unique.setdefault(url, None)
    return list(unique)
def capture_actions(args: argparse.Namespace) -> int:
    """Visit every seed page, trigger its add/edit actions and save each result.

    For each seed URL the page is opened, global notices are dismissed and the
    add/edit candidates are listed and limited to ``args.per_action_limit``
    per type. For every selected action the page is opened again from scratch
    and a baseline state is recorded. The action is then re-located, clicked
    and, if a stable UI change follows, captured through
    ``capture_current_variant``.

    Any exception while processing a page is recorded in ``page_failures`` and
    ends work on that page only. A manifest named
    ``action-manifest-<host>.json`` is always written to the output directory.

    Args:
        args: Parsed command-line options (see ``build_parser``).

    Returns:
        ``0`` when no page failed, ``1`` otherwise.
    """
    mirror_root = Path(args.output_dir).resolve()
    mirror_root.mkdir(parents=True, exist_ok=True)
    # Accumulators shared across all pages and variants.
    downloaded_assets: dict[str, Path] = {}
    asset_failures: list[dict[str, str]] = []
    asset_failure_urls: set[str] = set()
    page_failures: list[dict[str, str]] = []
    captured_actions: list[dict[str, Any]] = []
    seed_urls = load_seed_urls(args)
    for page_index, url in enumerate(seed_urls, start=1):
        print(f"[page] {page_index:02d} {url}", flush=True)
        try:
            open_fresh_page(args.cdp_port, url, args.timeout)
            dismiss_global_notices(args.cdp_port)
            # No baseline is taken here: each action records its own after
            # reopening the page, so a snapshot at this point would never be
            # read.
            candidates = sanitize_action_candidates(list_action_candidates(args.cdp_port), args.per_action_limit)
            if not candidates:
                print(" [skip] no visible add/edit action found", flush=True)
                continue
            for action_number, action in enumerate(candidates, start=1):
                print(
                    f" [action] {ACTION_LABELS.get(action['action_type'], action['action_type'])} -> {action['text']}",
                    flush=True,
                )
                # Start every action from a clean page so earlier clicks
                # cannot leak into this capture.
                open_fresh_page(args.cdp_port, url, args.timeout)
                dismiss_global_notices(args.cdp_port)
                baseline = collect_view_state(args.cdp_port)
                resolved = resolve_action_candidate(args.cdp_port, action)
                if not resolved:
                    print(" [skip] action not found after page reset", flush=True)
                    continue
                # The marker ids are reassigned on re-enumeration, so use the
                # freshly resolved candidate from here on.
                action = resolved
                click_action_candidate(args.cdp_port, action["id"])
                changed = wait_for_action_change(args.cdp_port, baseline, args.timeout)
                if not changed:
                    print(" [skip] no stable UI change after click", flush=True)
                    continue
                metadata = capture_current_variant(
                    args.cdp_port,
                    url,
                    mirror_root,
                    downloaded_assets,
                    asset_failures,
                    asset_failure_urls,
                    action,
                    action_number,
                )
                captured_actions.append(metadata)
                print(f" [saved] {metadata['html_path']}", flush=True)
        except Exception as exc:  # noqa: BLE001
            # Best effort: one broken page must not abort the whole run.
            page_failures.append({"url": url, "error": str(exc)})
            print(f" [fail] {exc}", flush=True)
    manifest = {
        "host": args.host,
        "captured_action_states": len(captured_actions),
        "downloaded_assets": len(downloaded_assets),
        "page_failures": page_failures,
        "asset_failures": asset_failures,
        "actions": captured_actions,
    }
    manifest_path = mirror_root / f"action-manifest-{args.host}.json"
    mirror.safe_write_text(manifest_path, json.dumps(manifest, ensure_ascii=False, indent=2))
    print(
        f"[done] actions={len(captured_actions)} assets={len(downloaded_assets)} "
        f"page_failures={len(page_failures)} asset_failures={len(asset_failures)}",
        flush=True,
    )
    return 0 if not page_failures else 1
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the capture script.

    Options: ``--host``, ``--cdp-port``, ``--output-dir``, ``--timeout``,
    ``--seed-url`` (repeatable), ``--seed-file`` and ``--per-action-limit``.
    """
    parser = argparse.ArgumentParser(description="Capture add/edit action states from logged-in Chrome")
    # (flag, keyword arguments) pairs, registered in this order.
    option_table: tuple[tuple[str, dict[str, Any]], ...] = (
        ("--host", {"default": DEFAULT_HOST, "help": "Target host"}),
        ("--cdp-port", {"type": int, "default": 9223, "help": "Chrome remote debugging port"}),
        ("--output-dir", {"default": ".", "help": "Mirror output directory"}),
        ("--timeout", {"type": int, "default": 20, "help": "Wait time per page/action in seconds"}),
        ("--seed-url", {"action": "append", "default": [], "help": "Seed page URL, repeatable"}),
        ("--seed-file", {"help": "Optional text file containing seed URLs"}),
        (
            "--per-action-limit",
            {"type": int, "default": 1, "help": "Capture count per action type on each page"},
        ),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    return parser
def main() -> int:
    """Script entry point: parse arguments, run the capture, return an exit code.

    Standard output and error are switched to UTF-8 where the stream supports
    ``reconfigure``. Any exception escaping ``capture_actions`` is printed as
    ``[error] ...`` and turned into exit code 1.
    """
    for stream in (sys.stdout, sys.stderr):
        if hasattr(stream, "reconfigure"):
            stream.reconfigure(encoding="utf-8")
    args = build_parser().parse_args()
    try:
        exit_code = capture_actions(args)
    except Exception as exc:  # noqa: BLE001
        print(f"[error] {exc}")
        exit_code = 1
    return exit_code
# Run only when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())