#!/usr/bin/env python3
"""Mirror in-site pages through an already logged-in Chrome session.

The script talks to a Chrome instance over the DevTools Protocol (CDP),
navigates to each page, saves the rendered HTML under a directory derived
from the page route, downloads referenced assets, and rewrites links so the
snapshot can be browsed locally.
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
import time
from collections import deque
from html import unescape
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen

DEFAULT_HOST = "hc-etms.sqygj.cn"
DEFAULT_CDP_PORT = 9222
KNOWN_MIRROR_HOSTS = {"hc-etms.sqygj.cn", "hc-pos.sqygj.cn"}
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
)
STATIC_HELPER_CSS = "/__mirror/static-mirror.css"
STATIC_HELPER_JS = "/__mirror/static-mirror.js"

# URL schemes that never point at a fetchable page or asset.
_NON_FETCHABLE_SCHEMES = ("javascript:", "mailto:", "tel:", "data:", "blob:")

# href/src/poster attribute whose value does not start with "#".
_ASSET_DISCOVER_RE = re.compile(
    r"""(?P<attr>href|src|poster)=["'](?P<url>[^"'#][^"']*)["']""",
    re.IGNORECASE,
)
# Same attributes, but capturing the quote so it can be reproduced verbatim.
_ASSET_REWRITE_RE = re.compile(
    r"""(?P<attr>href|src|poster)=(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
    re.IGNORECASE,
)
# The href attribute of an <a> tag; "prefix" is everything up to the quote.
_ANCHOR_HREF_RE = re.compile(
    r"""(?P<prefix><a\b[^>]*\shref=)(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
    re.IGNORECASE,
)
_SCRIPT_BLOCK_RE = re.compile(
    r"<script\b[^>]*>.*?</script>",
    re.IGNORECASE | re.DOTALL,
)
_CSS_URL_RE = re.compile(r"url\(([^)]+)\)", re.IGNORECASE)

# Node.js helper executed via `node -e`. It picks a debuggable page target
# (preferring the known mirror hosts), opens its CDP websocket and runs one
# command: "open" (Page.navigate) or "eval" (Runtime.evaluate).
_CDP_NODE_SCRIPT = r"""
const [, mode, portRaw, payload = ""] = process.argv;
const cdpPort = Number(portRaw);

function fail(message) {
  console.error(message);
  process.exit(1);
}

async function choosePageTarget() {
  const response = await fetch(`http://127.0.0.1:${cdpPort}/json`);
  if (!response.ok) {
    throw new Error(`failed to query CDP targets: ${response.status}`);
  }
  const targets = await response.json();
  const pages = targets.filter((target) => target.type === "page");
  const preferred = pages.find((target) => {
    try {
      const current = new URL(target.url);
      return ["hc-pos.sqygj.cn", "hc-etms.sqygj.cn"].includes(current.host);
    } catch (_error) {
      return false;
    }
  });
  return preferred || pages[0] || null;
}

async function main() {
  const target = await choosePageTarget();
  if (!target || !target.webSocketDebuggerUrl) {
    throw new Error("no debuggable page target found");
  }
  const ws = new WebSocket(target.webSocketDebuggerUrl);
  let seq = 0;
  const pending = new Map();
  ws.addEventListener("message", (event) => {
    const message = JSON.parse(event.data);
    if (!message.id || !pending.has(message.id)) {
      return;
    }
    const current = pending.get(message.id);
    pending.delete(message.id);
    if (message.error) {
      current.reject(new Error(JSON.stringify(message.error)));
      return;
    }
    current.resolve(message.result || {});
  });
  await new Promise((resolve, reject) => {
    ws.addEventListener("open", () => resolve(), { once: true });
    ws.addEventListener("error", (event) => reject(event.error || new Error("websocket error")), { once: true });
  });
  const send = (method, params = {}) => new Promise((resolve, reject) => {
    const id = ++seq;
    pending.set(id, { resolve, reject });
    ws.send(JSON.stringify({ id, method, params }));
  });
  if (mode === "open") {
    await send("Page.enable");
    await send("Page.navigate", { url: payload });
    console.log(JSON.stringify({ ok: true, url: payload }));
    ws.close();
    return;
  }
  if (mode === "eval") {
    await send("Runtime.enable");
    const result = await send("Runtime.evaluate", {
      expression: payload,
      awaitPromise: true,
      returnByValue: true,
    });
    const value = result.result ? result.result.value : null;
    console.log(typeof value === "string" ? value : JSON.stringify(value));
    ws.close();
    return;
  }
  throw new Error(`unsupported CDP command: ${mode}`);
}

main().catch((error) => fail(error && error.message ? error.message : String(error)));
"""

# JavaScript evaluated in the page to describe its current rendering state.
_PAGE_SNAPSHOT_SCRIPT = r"""
(() => {
  const app = document.querySelector("#app");
  const anchors = [...document.querySelectorAll("a[href]")]
    .map((a) => ({
      href: a.getAttribute("href"),
      text: (a.innerText || "").trim(),
    }))
    .filter((item) => item.href || item.text);
  return JSON.stringify({
    href: location.href,
    title: document.title,
    readyState: document.readyState,
    appHtmlLen: app ? app.innerHTML.length : 0,
    appTextLen: app ? (app.innerText || "").trim().length : 0,
    loadingMasks: document.querySelectorAll(".el-loading-mask").length,
    anchors,
  });
})()
"""


def run_agent_browser(cdp_port: int, *args: str) -> str:
    """Run one CDP command through the embedded Node.js helper.

    Args:
        cdp_port: Chrome remote-debugging port on 127.0.0.1.
        *args: ``args[0]`` is the command (``"open"`` or ``"eval"``);
            the optional ``args[1]`` is its payload (a URL or a JavaScript
            expression). Extra arguments are ignored.

    Returns:
        The helper's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: If no command is given or the helper exits non-zero.
    """
    if not args:
        raise RuntimeError("missing CDP command")
    payload = args[1] if len(args) > 1 else ""
    cmd = ["node", "-e", _CDP_NODE_SCRIPT, args[0], str(cdp_port), payload]
    proc = subprocess.run(
        cmd,
        text=True,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "agent-browser 执行失败")
    return proc.stdout.strip()


def parse_json_output(raw: str) -> Any:
    """Decode *raw* as JSON, unwrapping up to two levels of JSON encoding.

    Decoding stops at the first level that is not valid JSON (or not a
    string), and whatever value was reached is returned.
    """
    value: Any = raw.strip()
    for _ in range(2):
        try:
            value = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            break
    return value


def eval_json(cdp_port: int, script: str) -> Any:
    """Evaluate *script* in the page and return the decoded dict or list.

    Raises:
        RuntimeError: If the result decodes to something other than a
            dict, list or string.
        json.JSONDecodeError: If the result is still a string that cannot
            be decoded as JSON.
    """
    raw = run_agent_browser(cdp_port, "eval", script)
    parsed = parse_json_output(raw)
    if isinstance(parsed, (dict, list)):
        return parsed
    if isinstance(parsed, str):
        return json.loads(parsed)
    raise RuntimeError(f"无法解析浏览器返回结果: {raw[:200]}")


def current_page_snapshot(cdp_port: int) -> dict[str, Any]:
    """Return the page state: href, title, readyState, #app sizes, anchors."""
    return eval_json(cdp_port, _PAGE_SNAPSHOT_SCRIPT)


def get_rendered_html(cdp_port: int) -> str:
    """Return ``document.documentElement.outerHTML`` of the current page.

    Raises:
        RuntimeError: If the browser result does not decode to a string.
    """
    script = r"JSON.stringify(document.documentElement.outerHTML)"
    raw = run_agent_browser(cdp_port, "eval", script)
    html = parse_json_output(raw)
    if not isinstance(html, str):
        raise RuntimeError("无法获取页面 HTML")
    return html


def normalize_internal_url(raw_url: str | None, current_url: str, allowed_host: str) -> str | None:
    """Normalize a link to a canonical URL on *allowed_host*.

    Hash routes (``#/...``) are kept; plain in-page fragments, non-fetchable
    schemes and links to other hosts yield ``None``.

    Args:
        raw_url: The href as found in the page; may be ``None`` or empty.
        current_url: URL used to resolve relative references.
        allowed_host: The only ``netloc`` accepted for the result.

    Returns:
        The normalized absolute URL, or ``None`` if the link is rejected.
    """
    if not raw_url:
        return None
    candidate = raw_url.strip()
    if not candidate:
        return None
    if candidate.lower().startswith(_NON_FETCHABLE_SCHEMES):
        return None
    if candidate.startswith("#/"):
        return urljoin(f"https://{allowed_host}/", candidate)
    if candidate.startswith("#"):
        return None

    parsed = urlparse(urljoin(current_url, candidate))
    if parsed.netloc != allowed_host:
        return None
    fragment = parsed.fragment
    if fragment and not fragment.startswith("/"):
        # A fragment that is not a hash route is only an in-page anchor.
        return None

    clean = f"{parsed.scheme}://{parsed.netloc}{parsed.path or '/'}"
    if parsed.query:
        clean += f"?{parsed.query}"
    if fragment:
        clean += f"#{fragment}"
    return clean


def build_page_dir(root: Path, page_url: str) -> Path:
    """Map *page_url* to the directory that holds its ``index.html``.

    The directory is ``root/<host>/<route...>``, where the route comes from
    a hash route (``#/a/b``) when present and otherwise from the URL path.
    A query string adds a final ``__query_<sha1 prefix>`` component.
    """
    parsed = urlparse(page_url)
    host_dir = root / parsed.netloc
    route = ""
    route_query = ""
    if parsed.fragment.startswith("/"):
        route = parsed.fragment[1:]
        if "?" in route:
            route, route_query = route.split("?", 1)
    elif parsed.path not in ("", "/"):
        route = parsed.path.lstrip("/")
        if parsed.query:
            route_query = parsed.query

    # Drop "." / ".." segments so a route can never leave the host directory.
    parts = [part for part in route.split("/") if part and part not in (".", "..")]
    target = host_dir.joinpath(*parts) if parts else host_dir
    if route_query:
        digest = hashlib.sha1(route_query.encode("utf-8")).hexdigest()[:10]
        target = target / f"__query_{digest}"
    return target


def build_asset_path(root: Path, asset_url: str) -> Path:
    """Map *asset_url* to a local file path under ``root/<host>/``.

    A path ending in ``/`` is stored as ``index``. A query string is folded
    into the file name as ``__q_<sha1 prefix>``, keeping the suffix last.
    """
    parsed = urlparse(asset_url)
    path = parsed.path or "/"
    if path.endswith("/"):
        path = f"{path}index"
    local_path = root / parsed.netloc / path.lstrip("/")
    if parsed.query:
        digest = hashlib.sha1(parsed.query.encode("utf-8")).hexdigest()[:10]
        suffix = local_path.suffix
        if suffix:
            local_path = local_path.with_name(f"{local_path.stem}__q_{digest}{suffix}")
        else:
            local_path = local_path.with_name(f"{local_path.name}__q_{digest}")
    return local_path


def safe_write_text(path: Path, content: str) -> None:
    """Write *content* as UTF-8 text, creating parent directories first."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def safe_write_bytes(path: Path, content: bytes) -> None:
    """Write *content* as bytes, creating parent directories first."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)


def fetch_url(url: str, timeout: int = 30) -> bytes:
    """Download *url* with the module's User-Agent and return the body."""
    req = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(req, timeout=timeout) as response:
        return response.read()


def discover_asset_urls(html: str, page_url: str) -> set[str]:
    """Collect absolute http(s) URLs from href/src/poster attributes.

    Attribute values are HTML-unescaped before resolution so the discovered
    URLs match the ones ``rewrite_html_asset_urls`` looks up later.
    """
    asset_urls: set[str] = set()
    for match in _ASSET_DISCOVER_RE.finditer(html):
        candidate = unescape(match.group("url")).strip()
        if candidate.lower().startswith(_NON_FETCHABLE_SCHEMES):
            continue
        absolute = urljoin(page_url, candidate)
        if urlparse(absolute).scheme not in ("http", "https"):
            continue
        asset_urls.add(absolute)
    return asset_urls


def build_virtual_route_url(host: str, route: str) -> str:
    """Return ``https://<host>/#/<route>``; an empty route means dashboard."""
    clean_route = route.lstrip("/") or "dashboard"
    return f"https://{host}/#/{clean_route}"


def resolve_navigation_absolute_url(raw_href: str, current_url: str) -> str | None:
    """Resolve an anchor href to the canonical URL of the page it targets.

    Returns:
        *current_url* for self links, a virtual hash-route URL for hash
        routes and for the bare site root, the resolved absolute URL for
        other links on a known mirror host, or ``None`` for empty hrefs and
        links to other hosts.
    """
    href = unescape(raw_href).strip()
    if not href:
        return None
    if href in {"#", "./", "."}:
        return current_url
    if href.startswith("#/"):
        return build_virtual_route_url(urlparse(current_url).netloc, href[2:])

    absolute = urljoin(current_url, href)
    parsed = urlparse(absolute)
    if parsed.netloc not in KNOWN_MIRROR_HOSTS:
        return None
    if parsed.fragment.startswith("/"):
        return build_virtual_route_url(parsed.netloc, parsed.fragment[1:])
    if parsed.path in ("", "/") and not parsed.query:
        return build_virtual_route_url(parsed.netloc, "dashboard")
    return absolute


def resolve_local_navigation_target(
    raw_href: str,
    current_url: str,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> Path | None:
    """Return the local page directory an anchor href should point at.

    *route_map* (URL -> page directory) takes precedence over the directory
    derived from the URL. Returns ``None`` when the href cannot be resolved.
    """
    href = unescape(raw_href).strip()
    current_host = urlparse(current_url).netloc
    # Special case: this route on hc-etms maps to the hc-pos dashboard.
    if current_host == "hc-etms.sqygj.cn" and href == "#/goToProject":
        return build_page_dir(mirror_root, "https://hc-pos.sqygj.cn/#/dashboard")
    nav_url = resolve_navigation_absolute_url(raw_href, current_url)
    if not nav_url:
        return None
    if route_map and nav_url in route_map:
        return route_map[nav_url]
    return build_page_dir(mirror_root, nav_url)


def _relative_dir_href(target_dir: Path, page_dir: Path) -> str:
    """Return a "/"-separated, slash-terminated href from page_dir to target_dir."""
    relative = os.path.relpath(target_dir, page_dir).replace(os.sep, "/")
    if relative == ".":
        return "./"
    return relative if relative.endswith("/") else f"{relative}/"


def rewrite_html_navigation_urls(
    html: str,
    current_url: str,
    page_dir: Path,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> str:
    """Point every resolvable ``<a href>`` at its local mirror directory.

    Anchors that cannot be resolved to a mirrored page are left untouched.
    """

    def replace_anchor_href(match: re.Match[str]) -> str:
        target_dir = resolve_local_navigation_target(
            match.group("url"), current_url, mirror_root, route_map
        )
        if not target_dir:
            return match.group(0)
        quote = match.group("quote")
        relative = _relative_dir_href(target_dir, page_dir)
        return f"{match.group('prefix')}{quote}{relative}{quote}"

    return _ANCHOR_HREF_RE.sub(replace_anchor_href, html)


def inject_static_helper_tags(html: str) -> str:
    """Insert the static-mirror helper stylesheet and script once each.

    The stylesheet goes before ``</head>`` (or is prepended), the script
    before ``</body>`` (or is appended). Tags already present are not
    duplicated.
    """
    # NOTE(review): exact tag attributes should be confirmed against the
    # helper files actually served under /__mirror/.
    css_tag = f'<link rel="stylesheet" href="{STATIC_HELPER_CSS}">'
    js_tag = f'<script src="{STATIC_HELPER_JS}"></script>'
    if css_tag not in html:
        if "</head>" in html:
            html = html.replace("</head>", f"{css_tag}</head>", 1)
        else:
            html = f"{css_tag}{html}"
    if js_tag not in html:
        if "</body>" in html:
            html = html.replace("</body>", f"{js_tag}</body>", 1)
        else:
            html = f"{html}{js_tag}"
    return html


def rewrite_html_asset_urls(
    html: str,
    page_url: str,
    page_dir: Path,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
) -> str:
    """Turn rendered HTML into a static snapshot that works offline.

    Steps: point downloaded assets at their local copies, strip all
    ``<script>`` blocks, rewrite anchor navigation to local directories,
    and inject the static helper tags.
    """

    def replace_attr(match: re.Match[str]) -> str:
        absolute = urljoin(page_url, unescape(match.group("url")))
        local_path = downloaded_assets.get(absolute)
        if not local_path:
            return match.group(0)
        relative = os.path.relpath(local_path, page_dir).replace(os.sep, "/")
        quote = match.group("quote")
        return f"{match.group('attr')}={quote}{relative}{quote}"

    html = _ASSET_REWRITE_RE.sub(replace_attr, html)
    # A purely static snapshot keeps no scripts, so nothing takes over the
    # DOM again when the page is opened locally.
    html = _SCRIPT_BLOCK_RE.sub("", html)
    html = rewrite_html_navigation_urls(html, page_url, page_dir, mirror_root)
    return inject_static_helper_tags(html)


def discover_css_urls(css_text: str, css_url: str) -> set[str]:
    """Collect absolute http(s) URLs referenced by ``url(...)`` in CSS."""
    found: set[str] = set()
    for raw in _CSS_URL_RE.findall(css_text):
        candidate = raw.strip().strip("\"'")
        if not candidate or candidate.startswith("data:"):
            continue
        absolute = urljoin(css_url, candidate)
        if urlparse(absolute).scheme not in ("http", "https"):
            continue
        found.add(absolute)
    return found


def rewrite_css_urls(css_text: str, css_url: str, css_path: Path, downloaded_assets: dict[str, Path]) -> str:
    """Point ``url(...)`` references at downloaded copies, relative to the CSS file.

    References that were not downloaded are left unchanged.
    """

    def replace_url(match: re.Match[str]) -> str:
        candidate = match.group(1).strip().strip("\"'")
        local_path = downloaded_assets.get(urljoin(css_url, candidate))
        if not local_path:
            return match.group(0)
        relative = os.path.relpath(local_path, css_path.parent).replace(os.sep, "/")
        return f"url('{relative}')"

    return _CSS_URL_RE.sub(replace_url, css_text)


def wait_for_page(cdp_port: int, expected_url: str, timeout: int) -> dict[str, Any]:
    """Poll the page once per second until it looks fully rendered.

    The page counts as ready when ``#app`` holds more than 500 characters
    of HTML, no ``.el-loading-mask`` is present, and the HTML length is the
    same as on the previous poll.

    Args:
        cdp_port: Chrome remote-debugging port.
        expected_url: Currently unused; kept for interface compatibility.
        timeout: Maximum number of seconds to wait.

    Returns:
        The last snapshot taken (an empty dict if no poll happened). On
        timeout the snapshot is returned as-is, without raising.
    """
    start = time.time()
    last_len = -1
    stable_ticks = 0
    state: dict[str, Any] = {}
    while time.time() - start < timeout:
        time.sleep(1)
        state = current_page_snapshot(cdp_port)
        html_len = int(state.get("appHtmlLen", 0))
        loading_masks = int(state.get("loadingMasks", 0))
        if html_len > 500 and loading_masks == 0:
            stable_ticks = stable_ticks + 1 if html_len == last_len else 0
            if stable_ticks >= 1:
                return state
        last_len = html_len
    return state


def capture_page(
    cdp_port: int,
    url: str,
    allowed_host: str,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
    asset_failures: list[dict[str, str]],
    timeout: int,
) -> tuple[dict[str, Any], set[str]]:
    """Navigate to *url*, save its static snapshot, and collect next links.

    Args:
        cdp_port: Chrome remote-debugging port.
        url: Page to open.
        allowed_host: Host that discovered links must belong to.
        mirror_root: Root directory of the mirror.
        downloaded_assets: Shared URL -> local path cache; updated in place.
        asset_failures: Shared list of failed downloads; appended in place.
        timeout: Seconds to wait for the page to render.

    Returns:
        A manifest record for the page and the set of normalized internal
        URLs found in its anchors.
    """
    run_agent_browser(cdp_port, "open", url)
    state = wait_for_page(cdp_port, url, timeout)
    html = get_rendered_html(cdp_port)
    current_url = state.get("href") or url
    page_dir = build_page_dir(mirror_root, current_url)
    page_dir.mkdir(parents=True, exist_ok=True)

    pending_assets = deque(sorted(discover_asset_urls(html, current_url)))
    local_asset_map: dict[str, Path] = {}
    while pending_assets:
        asset_url = pending_assets.popleft()
        if asset_url in downloaded_assets:
            local_asset_map[asset_url] = downloaded_assets[asset_url]
            continue
        try:
            payload = fetch_url(asset_url)
        except Exception as exc:  # noqa: BLE001
            # Best effort: one broken asset must not abort the page.
            asset_failures.append({"url": asset_url, "error": str(exc)})
            continue
        local_path = build_asset_path(mirror_root, asset_url)
        safe_write_bytes(local_path, payload)
        downloaded_assets[asset_url] = local_path
        local_asset_map[asset_url] = local_path
        if local_path.suffix.lower() == ".css":
            # Stylesheets can pull in further assets (fonts, images, ...).
            css_text = payload.decode("utf-8", errors="ignore")
            for nested in sorted(discover_css_urls(css_text, asset_url)):
                if nested not in downloaded_assets:
                    pending_assets.append(nested)

    # Rewrite CSS only after all downloads, so every nested URL that was
    # fetched can be mapped to its local copy.
    for asset_url, local_path in list(local_asset_map.items()):
        if local_path.suffix.lower() != ".css":
            continue
        css_text = local_path.read_text(encoding="utf-8", errors="ignore")
        rewritten = rewrite_css_urls(css_text, asset_url, local_path, downloaded_assets)
        safe_write_text(local_path, rewritten)

    rewritten_html = rewrite_html_asset_urls(html, current_url, page_dir, mirror_root, downloaded_assets)
    html_path = page_dir / "index.html"
    safe_write_text(html_path, rewritten_html)

    anchors = state.get("anchors", [])
    next_urls: set[str] = set()
    for anchor in anchors:
        next_url = normalize_internal_url(anchor.get("href"), current_url, allowed_host)
        if next_url:
            next_urls.add(next_url)

    page_record = {
        "source_url": url,
        "final_url": current_url,
        "title": state.get("title", ""),
        "html_path": str(html_path.relative_to(mirror_root)),
        "anchor_count": len(anchors),
        "app_html_len": state.get("appHtmlLen", 0),
    }
    return page_record, next_urls


def crawl_site(args: argparse.Namespace) -> int:
    """Crawl pages breadth-first from the seeds and write a manifest.

    Seeds are ``args.seed_url`` when given; otherwise the current browser
    page and its anchors. Crawling stops after ``args.max_pages`` pages
    have been attempted.

    Returns:
        ``0`` once the manifest has been written.

    Raises:
        RuntimeError: If the current browser tab is not on ``args.host``.
    """
    mirror_root = Path(args.output_dir).resolve()
    mirror_root.mkdir(parents=True, exist_ok=True)

    wait_for_page(args.cdp_port, "", args.timeout)
    seed = current_page_snapshot(args.cdp_port)
    start_url = seed.get("href")
    if not isinstance(start_url, str) or args.host not in start_url:
        raise RuntimeError("当前浏览器标签页不在目标站点上,请先切到已登录页面。")

    discovered: deque[str] = deque()
    seen: set[str] = set()
    if args.seed_url:
        for seed_url in args.seed_url:
            normalized = normalize_internal_url(seed_url, seed_url, args.host) or seed_url
            if normalized not in seen:
                discovered.append(normalized)
    else:
        start_normalized = normalize_internal_url(start_url, start_url, args.host) or start_url
        discovered.append(start_normalized)
        for anchor in seed.get("anchors", []):
            normalized = normalize_internal_url(anchor.get("href"), start_url, args.host)
            if normalized and normalized not in seen:
                discovered.append(normalized)

    downloaded_assets: dict[str, Path] = {}
    asset_failures: list[dict[str, str]] = []
    page_failures: list[dict[str, str]] = []
    pages: list[dict[str, Any]] = []

    while discovered and len(seen) < args.max_pages:
        target = discovered.popleft()
        if target in seen:
            continue
        seen.add(target)
        print(f"[页面] {len(seen):03d} {target}", flush=True)
        try:
            page_record, next_urls = capture_page(
                args.cdp_port,
                target,
                args.host,
                mirror_root,
                downloaded_assets,
                asset_failures,
                args.timeout,
            )
            pages.append(page_record)
            if not args.no_discover:
                for next_url in sorted(next_urls):
                    if next_url not in seen:
                        discovered.append(next_url)
        except Exception as exc:  # noqa: BLE001
            # Record the failure and keep crawling the remaining pages.
            page_failures.append({"url": target, "error": str(exc)})
            print(f"[失败] {target} -> {exc}", flush=True)

    manifest = {
        "host": args.host,
        "captured_pages": len(pages),
        "downloaded_assets": len(downloaded_assets),
        "page_failures": page_failures,
        "asset_failures": asset_failures,
        "pages": pages,
    }
    manifest_name = f"mirror-manifest-{args.host}.json"
    safe_write_text(mirror_root / manifest_name, json.dumps(manifest, ensure_ascii=False, indent=2))
    print(
        f"[完成] 页面 {len(pages)} 个,资源 {len(downloaded_assets)} 个,"
        f"页面失败 {len(page_failures)} 个,资源失败 {len(asset_failures)} 个",
        flush=True,
    )
    return 0


def reconstruct_page_url_from_path(html_path: Path, mirror_root: Path) -> str | None:
    """Rebuild the virtual route URL of a mirrored ``index.html``.

    Returns ``None`` when the file is outside *mirror_root* or not under a
    known mirror host directory. A trailing ``__query_*`` directory is
    ignored.
    """
    try:
        rel = html_path.relative_to(mirror_root)
    except ValueError:
        return None
    if not rel.parts or rel.parts[0] not in KNOWN_MIRROR_HOSTS:
        return None
    host = rel.parts[0]
    route_parts = list(rel.parts[1:-1])
    if route_parts and route_parts[-1].startswith("__query_"):
        route_parts = route_parts[:-1]
    route = "/".join(route_parts).strip("/")
    return build_virtual_route_url(host, route)


def load_route_map_from_manifests(mirror_root: Path) -> dict[str, Path]:
    """Build a URL -> page directory map from all mirror manifests.

    A page's ``source_url`` always wins; its ``final_url`` is added only if
    that URL is not mapped yet.
    """
    route_map: dict[str, Path] = {}
    for manifest_path in sorted(mirror_root.glob("mirror-manifest-*.json")):
        data = json.loads(manifest_path.read_text(encoding="utf-8"))
        for item in data.get("pages", []):
            page_dir = (mirror_root / item["html_path"]).parent
            source_url = item.get("source_url")
            final_url = item.get("final_url")
            if source_url:
                route_map[source_url] = page_dir
            if final_url and final_url not in route_map:
                route_map[final_url] = page_dir

    # Hard-coded alias: the r2cockpit route is served by dataPlatform/home.
    hc_pos_dashboard = mirror_root / "hc-pos.sqygj.cn" / "dataPlatform" / "home"
    if (hc_pos_dashboard / "index.html").exists():
        route_map["https://hc-pos.sqygj.cn/#/r2cockpit"] = hc_pos_dashboard
    return route_map


def synthesize_route_url_from_target_dir(target_dir: Path, mirror_root: Path) -> str | None:
    """Return the virtual route URL for a page directory, or ``None``.

    ``None`` is returned when the directory is outside *mirror_root* or not
    under a known mirror host directory.
    """
    try:
        rel = target_dir.relative_to(mirror_root)
    except ValueError:
        return None
    if not rel.parts or rel.parts[0] not in KNOWN_MIRROR_HOSTS:
        return None
    return build_virtual_route_url(rel.parts[0], "/".join(rel.parts[1:]))


def sanitize_relative_anchor_links(
    html: str,
    html_path: Path,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> str:
    """Repair relative anchor links whose target page was never mirrored.

    For each ``./`` or ``../`` href without an ``index.html`` at its target,
    try the directory mapped in *route_map*, then the host's ``404`` page.
    Links that cannot be repaired are left unchanged.
    """
    try:
        rel = html_path.relative_to(mirror_root)
    except ValueError:
        return html
    if not rel.parts:
        return html
    fallback_dir = mirror_root / rel.parts[0] / "404"
    fallback_exists = (fallback_dir / "index.html").exists()
    page_dir = html_path.parent

    def replace_anchor_href(match: re.Match[str]) -> str:
        href = match.group("url")
        if not href.startswith(("./", "../")):
            return match.group(0)
        target_dir = (page_dir / href).resolve()
        if (target_dir / "index.html").exists():
            return match.group(0)

        replacement_dir: Path | None = None
        if route_map:
            synthetic_url = synthesize_route_url_from_target_dir(target_dir, mirror_root)
            mapped_dir = route_map.get(synthetic_url) if synthetic_url else None
            if mapped_dir and (mapped_dir / "index.html").exists():
                replacement_dir = mapped_dir
        if replacement_dir is None and fallback_exists:
            replacement_dir = fallback_dir
        if replacement_dir is None:
            return match.group(0)

        quote = match.group("quote")
        relative = _relative_dir_href(replacement_dir, page_dir)
        return f"{match.group('prefix')}{quote}{relative}{quote}"

    return _ANCHOR_HREF_RE.sub(replace_anchor_href, html)


def rewrite_existing_html_links(mirror_root: Path) -> int:
    """Re-run link rewriting on every mirrored ``index.html``.

    Returns:
        The number of files whose content changed and was written back.
    """
    route_map = load_route_map_from_manifests(mirror_root)
    rewritten_count = 0
    for html_path in mirror_root.rglob("index.html"):
        page_url = reconstruct_page_url_from_path(html_path, mirror_root)
        if not page_url:
            continue
        original = html_path.read_text(encoding="utf-8", errors="ignore")
        updated = rewrite_html_navigation_urls(original, page_url, html_path.parent, mirror_root, route_map)
        updated = sanitize_relative_anchor_links(updated, html_path, mirror_root, route_map)
        updated = inject_static_helper_tags(updated)
        if updated != original:
            safe_write_text(html_path, updated)
            rewritten_count += 1
    return rewritten_count


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser."""
    parser = argparse.ArgumentParser(description="复用已登录 Chrome 会话镜像站点")
    parser.add_argument("--host", default=DEFAULT_HOST, help="限制抓取的主机名")
    parser.add_argument("--cdp-port", type=int, default=DEFAULT_CDP_PORT, help="Chrome 远程调试端口")
    parser.add_argument("--output-dir", default=".", help="抓取结果输出目录")
    parser.add_argument("--max-pages", type=int, default=140, help="最多抓取页面数")
    parser.add_argument("--timeout", type=int, default=20, help="单页等待秒数")
    parser.add_argument("--seed-url", action="append", default=[], help="指定起始 URL,可重复传入")
    parser.add_argument("--seed-file", help="从文本文件读取起始 URL,每行一个")
    parser.add_argument("--no-discover", action="store_true", help="只抓取种子 URL,不继续从页面内发现新链接")
    parser.add_argument("--rewrite-links-only", action="store_true", help="仅重写现有 HTML 的本地导航链接")
    return parser


def main() -> int:
    """Run the CLI and return the exit code (``0`` success, ``1`` error)."""
    parser = build_parser()
    args = parser.parse_args()
    try:
        if args.seed_file:
            seed_path = Path(args.seed_file)
            # Blank lines and "#" comment lines in the seed file are skipped.
            seed_urls = [
                line.strip()
                for line in seed_path.read_text(encoding="utf-8").splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
            args.seed_url.extend(seed_urls)
        if args.rewrite_links_only:
            count = rewrite_existing_html_links(Path(args.output_dir).resolve())
            print(f"[完成] 已重写 {count} 个 HTML 文件的本地导航链接")
            return 0
        return crawl_site(args)
    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: report the error and exit with a failure code.
        print(f"[错误] {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())