#!/usr/bin/env python3
"""Mirror on-site pages through an already-logged-in Chrome session, saving each page by its path."""
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from collections import deque
|
||
from html import unescape
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib.parse import urljoin, urlparse
|
||
from urllib.request import Request, urlopen
|
||
|
||
|
||
# Default host to crawl and the default Chrome DevTools Protocol port.
DEFAULT_HOST = "hc-etms.sqygj.cn"
DEFAULT_CDP_PORT = 9222
# Hosts whose pages this tool is allowed to mirror locally.
KNOWN_MIRROR_HOSTS = {"hc-etms.sqygj.cn", "hc-pos.sqygj.cn"}
# User-Agent sent with direct asset downloads (mimics a desktop Chrome).
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
)
# URLs of helper files injected into every mirrored HTML page.
STATIC_HELPER_CSS = "/__mirror/static-mirror.css"
STATIC_HELPER_JS = "/__mirror/static-mirror.js"
def run_agent_browser(cdp_port: int, *args: str) -> str:
    """Drive the debuggable Chrome tab over CDP via a short-lived Node helper.

    args[0] is the command mode: "open" navigates the tab to args[1];
    "eval" evaluates args[1] as a JS expression and prints its value.
    Returns the helper's stripped stdout.  Raises RuntimeError when no
    command is given or the Node process exits non-zero.
    """
    if not args:
        raise RuntimeError("missing CDP command")

    # Inline Node program: picks a page target (preferring the mirror hosts),
    # opens a raw CDP WebSocket, then performs a single "open" or "eval" call.
    node_script = r"""
const [, mode, portRaw, payload = ""] = process.argv;
const cdpPort = Number(portRaw);

function fail(message) {
  console.error(message);
  process.exit(1);
}

async function choosePageTarget() {
  const response = await fetch(`http://127.0.0.1:${cdpPort}/json`);
  if (!response.ok) {
    throw new Error(`failed to query CDP targets: ${response.status}`);
  }
  const targets = await response.json();
  const pages = targets.filter((target) => target.type === "page");
  const preferred = pages.find((target) => {
    try {
      const current = new URL(target.url);
      return ["hc-pos.sqygj.cn", "hc-etms.sqygj.cn"].includes(current.host);
    } catch (_error) {
      return false;
    }
  });
  return preferred || pages[0] || null;
}

async function main() {
  const target = await choosePageTarget();
  if (!target || !target.webSocketDebuggerUrl) {
    throw new Error("no debuggable page target found");
  }

  const ws = new WebSocket(target.webSocketDebuggerUrl);
  let seq = 0;
  const pending = new Map();

  ws.addEventListener("message", (event) => {
    const message = JSON.parse(event.data);
    if (!message.id || !pending.has(message.id)) {
      return;
    }
    const current = pending.get(message.id);
    pending.delete(message.id);
    if (message.error) {
      current.reject(new Error(JSON.stringify(message.error)));
      return;
    }
    current.resolve(message.result || {});
  });

  await new Promise((resolve, reject) => {
    ws.addEventListener("open", () => resolve(), { once: true });
    ws.addEventListener("error", (event) => reject(event.error || new Error("websocket error")), { once: true });
  });

  const send = (method, params = {}) => new Promise((resolve, reject) => {
    const id = ++seq;
    pending.set(id, { resolve, reject });
    ws.send(JSON.stringify({ id, method, params }));
  });

  if (mode === "open") {
    await send("Page.enable");
    await send("Page.navigate", { url: payload });
    console.log(JSON.stringify({ ok: true, url: payload }));
    ws.close();
    return;
  }

  if (mode === "eval") {
    await send("Runtime.enable");
    const result = await send("Runtime.evaluate", {
      expression: payload,
      awaitPromise: true,
      returnByValue: true,
    });
    const value = result.result ? result.result.value : null;
    console.log(typeof value === "string" ? value : JSON.stringify(value));
    ws.close();
    return;
  }

  throw new Error(`unsupported CDP command: ${mode}`);
}

main().catch((error) => fail(error && error.message ? error.message : String(error)));
"""

    cmd = ["node", "-e", node_script, args[0], str(cdp_port), args[1] if len(args) > 1 else ""]
    proc = subprocess.run(
        cmd,
        text=True,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        # Surface whichever stream carries the error; fall back to a generic message.
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "agent-browser 执行失败")
    return proc.stdout.strip()
def parse_json_output(raw: str) -> Any:
    """Decode *raw* as JSON up to two times (handles double-encoded payloads).

    Decoding stops as soon as a pass fails; the most recently decoded value
    (or the stripped input, if the first pass fails) is returned.
    """
    decoded: Any = raw.strip()
    attempts = 0
    while attempts < 2:
        attempts += 1
        try:
            decoded = json.loads(decoded)
        except (TypeError, json.JSONDecodeError):
            break
    return decoded
def eval_json(cdp_port: int, script: str) -> Any:
    """Evaluate *script* in the browser tab and return the decoded dict/list.

    The browser may hand back either a JSON container directly or a
    JSON-encoded string; both are handled.  Raises RuntimeError when neither
    form can be decoded.
    """
    raw = run_agent_browser(cdp_port, "eval", script)
    parsed = parse_json_output(raw)
    if isinstance(parsed, str):
        return json.loads(parsed)
    if isinstance(parsed, (dict, list)):
        return parsed
    raise RuntimeError(f"无法解析浏览器返回结果: {raw[:200]}")
def current_page_snapshot(cdp_port: int) -> dict[str, Any]:
    """Snapshot the current tab: URL, title, render progress, and all anchors.

    appHtmlLen/appTextLen measure the "#app" subtree and loadingMasks counts
    visible ".el-loading-mask" overlays — wait_for_page uses these to decide
    whether the page has finished rendering.
    """
    script = r"""
(() => {
  const app = document.querySelector("#app");
  const anchors = [...document.querySelectorAll("a[href]")]
    .map((a) => ({
      href: a.getAttribute("href"),
      text: (a.innerText || "").trim(),
    }))
    .filter((item) => item.href || item.text);

  return JSON.stringify({
    href: location.href,
    title: document.title,
    readyState: document.readyState,
    appHtmlLen: app ? app.innerHTML.length : 0,
    appTextLen: app ? (app.innerText || "").trim().length : 0,
    loadingMasks: document.querySelectorAll(".el-loading-mask").length,
    anchors,
  });
})()
"""
    return eval_json(cdp_port, script)
def get_rendered_html(cdp_port: int) -> str:
    """Return the full outerHTML of the current document as a string."""
    raw = run_agent_browser(
        cdp_port, "eval", r"JSON.stringify(document.documentElement.outerHTML)"
    )
    html = parse_json_output(raw)
    if isinstance(html, str):
        return html
    raise RuntimeError("无法获取页面 HTML")
def normalize_internal_url(raw_url: str | None, current_url: str, allowed_host: str) -> str | None:
    """Normalize a discovered link into a canonical same-host URL.

    Returns None for empty links, non-navigational schemes, bare fragments,
    and links pointing off *allowed_host*.  Hash-route links ("#/...") resolve
    against the host root; query strings and route fragments are preserved.
    """
    candidate = (raw_url or "").strip()
    if not candidate:
        return None
    if candidate.lower().startswith(("javascript:", "mailto:", "tel:", "data:", "blob:")):
        return None
    if candidate.startswith("#"):
        # Hash-router links resolve against the host root; bare fragments are noise.
        if candidate.startswith("#/"):
            return urljoin(f"https://{allowed_host}/", candidate)
        return None

    parts = urlparse(urljoin(current_url, candidate))
    if parts.netloc != allowed_host:
        return None

    fragment = ""
    if parts.fragment:
        if not parts.fragment.startswith("/"):
            return None
        fragment = parts.fragment

    pieces = [parts.scheme, "://", parts.netloc, parts.path or "/"]
    if parts.query:
        pieces.append(f"?{parts.query}")
    if fragment:
        pieces.append(f"#{fragment}")
    return "".join(pieces)
def build_page_dir(root: Path, page_url: str) -> Path:
    """Map a page URL to its local mirror directory under *root*.

    A hash route ("#/a/b?x=1") takes precedence over the URL path; a query
    string is folded into a "__query_<sha1 prefix>" subdirectory so distinct
    queries get distinct directories.
    """
    parts = urlparse(page_url)
    target = root / parts.netloc

    route, query = "", ""
    if parts.fragment.startswith("/"):
        route, _, query = parts.fragment[1:].partition("?")
    elif parts.path not in ("", "/"):
        route = parts.path.lstrip("/")
        query = parts.query or ""

    for segment in route.split("/"):
        if segment:
            target = target / segment
    if query:
        digest = hashlib.sha1(query.encode("utf-8")).hexdigest()[:10]
        target = target / f"__query_{digest}"
    return target
def build_asset_path(root: Path, asset_url: str) -> Path:
    """Map an asset URL to its local file path under *root*.

    Directory-style URLs get an "index" filename; a query string is encoded
    into the filename as "__q_<sha1 prefix>" (inserted before the suffix when
    the file has one).
    """
    parts = urlparse(asset_url)
    rel = parts.path or "/"
    if rel.endswith("/"):
        rel += "index"
    local = root / parts.netloc / rel.lstrip("/")
    if not parts.query:
        return local
    digest = hashlib.sha1(parts.query.encode("utf-8")).hexdigest()[:10]
    ext = local.suffix
    stem = local.stem if ext else local.name
    return local.with_name(f"{stem}__q_{digest}{ext}")
def safe_write_text(path: Path, content: str) -> None:
    """Write UTF-8 text to *path*, creating missing parent directories first."""
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
def safe_write_bytes(path: Path, content: bytes) -> None:
    """Write raw bytes to *path*, creating missing parent directories first."""
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)
def fetch_url(url: str, timeout: int = 30) -> bytes:
    """Download *url* with the crawler User-Agent and return the raw body."""
    request = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(request, timeout=timeout) as response:
        body = response.read()
    return body
def discover_asset_urls(html: str, page_url: str) -> set[str]:
    """Collect absolute http(s) URLs from href/src/poster attributes in *html*.

    Pure-fragment links (the regex rejects a leading "#") and non-network
    schemes (javascript:, data:, ...) are skipped; relative references are
    resolved against *page_url*.
    """
    pattern = re.compile(
        r"""(?P<attr>href|src|poster)=["'](?P<url>[^"'#][^"']*)["']""",
        re.IGNORECASE,
    )
    skipped_schemes = ("javascript:", "data:", "mailto:", "tel:", "blob:")
    found: set[str] = set()
    for hit in pattern.finditer(html):
        value = hit.group("url").strip()
        if value.lower().startswith(skipped_schemes):
            continue
        resolved = urljoin(page_url, value)
        if urlparse(resolved).scheme in ("http", "https"):
            found.add(resolved)
    return found
def build_virtual_route_url(host: str, route: str) -> str:
    """Build a hash-router URL for *route* on *host* (empty route → dashboard)."""
    trimmed = route.lstrip("/") or "dashboard"
    return f"https://{host}/#/{trimmed}"
def resolve_navigation_absolute_url(raw_href: str, current_url: str) -> str | None:
    """Resolve an anchor href to an absolute mirror URL, or None if off-site.

    Hash routes are normalized through build_virtual_route_url; self links
    ("#", ".", "./") resolve to *current_url*; hosts outside
    KNOWN_MIRROR_HOSTS are rejected; a bare host root maps to the dashboard.
    """
    href = unescape(raw_href).strip()
    if not href:
        return None

    host_of_current = urlparse(current_url).netloc
    if href in {"#", "./", "."}:
        return current_url
    if href.startswith("#/"):
        return build_virtual_route_url(host_of_current, href[2:])

    resolved = urljoin(current_url, href)
    resolved_parts = urlparse(resolved)
    if resolved_parts.netloc not in KNOWN_MIRROR_HOSTS:
        return None
    if resolved_parts.fragment.startswith("/"):
        return build_virtual_route_url(resolved_parts.netloc, resolved_parts.fragment[1:])
    if not resolved_parts.query and resolved_parts.path in ("", "/"):
        return build_virtual_route_url(resolved_parts.netloc, "dashboard")
    return resolved
def resolve_local_navigation_target(
    raw_href: str,
    current_url: str,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> Path | None:
    """Resolve an anchor href to the local mirror directory it should link to.

    The route_map (built from previously captured pages) wins over a freshly
    synthesized directory; a special case maps ETMS's "#/goToProject" switcher
    onto the POS dashboard mirror.
    """
    href = unescape(raw_href).strip()
    host = urlparse(current_url).netloc

    # Cross-app jump: the ETMS project switcher lands on the POS dashboard.
    if host == "hc-etms.sqygj.cn" and href == "#/goToProject":
        return build_page_dir(mirror_root, "https://hc-pos.sqygj.cn/#/dashboard")

    nav_url = resolve_navigation_absolute_url(raw_href, current_url)
    if not nav_url:
        return None

    if route_map:
        mapped = route_map.get(nav_url)
        if mapped is not None:
            return mapped

    return build_page_dir(mirror_root, nav_url)
def rewrite_html_navigation_urls(
    html: str,
    current_url: str,
    page_dir: Path,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> str:
    """Rewrite <a href> values in *html* into relative local mirror paths.

    Anchors that cannot be mapped (off-site, unsupported scheme) are left
    untouched.  Rewritten links always end with "/" so the browser resolves
    them to the directory's index.html.
    """

    def _to_relative_dir(target_dir: Path) -> str:
        rel = os.path.relpath(target_dir, page_dir).replace(os.sep, "/")
        if rel == ".":
            return "./"
        return rel if rel.endswith("/") else f"{rel}/"

    def _swap(match: re.Match[str]) -> str:
        target = resolve_local_navigation_target(
            match.group("url"), current_url, mirror_root, route_map
        )
        if not target:
            return match.group(0)
        quote = match.group("quote")
        return f'{match.group("prefix")}{quote}{_to_relative_dir(target)}{quote}'

    return re.sub(
        r"""(?P<prefix><a\b[^>]*\shref=)(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
        _swap,
        html,
        flags=re.IGNORECASE,
    )
def inject_static_helper_tags(html: str) -> str:
    """Ensure the mirror helper CSS/JS tags appear exactly once in *html*.

    The stylesheet link goes just before </head> (or is prepended when no
    head closes); the script goes just before </body> (or is appended).
    """
    css_tag = f'<link rel="stylesheet" href="{STATIC_HELPER_CSS}">'
    js_tag = f'<script src="{STATIC_HELPER_JS}"></script>'

    if css_tag not in html:
        html = (
            html.replace("</head>", f"{css_tag}</head>", 1)
            if "</head>" in html
            else f"{css_tag}{html}"
        )
    if js_tag not in html:
        html = (
            html.replace("</body>", f"{js_tag}</body>", 1)
            if "</body>" in html
            else f"{html}{js_tag}"
        )
    return html
def rewrite_html_asset_urls(
    html: str,
    page_url: str,
    page_dir: Path,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
) -> str:
    """Produce the final static HTML: local assets, no scripts, helper tags.

    href/src/poster attributes pointing at downloaded assets are rewritten to
    relative paths; every <script> element is stripped so the snapshot cannot
    re-run the SPA; navigation links are localized and helper tags injected.
    """

    def _swap_attr(match: re.Match[str]) -> str:
        absolute = urljoin(page_url, unescape(match.group("url")))
        local = downloaded_assets.get(absolute)
        if local is None:
            return match.group(0)
        rel = os.path.relpath(local, page_dir).replace(os.sep, "/")
        quote = match.group("quote")
        return f'{match.group("attr")}={quote}{rel}{quote}'

    html = re.sub(
        r"""(?P<attr>href|src|poster)=(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
        _swap_attr,
        html,
        flags=re.IGNORECASE,
    )
    # A purely static snapshot keeps no scripts, so the SPA can't re-take the DOM.
    html = re.sub(
        r"<script\b[^>]*>.*?</script>",
        "",
        html,
        flags=re.IGNORECASE | re.DOTALL,
    )
    html = rewrite_html_navigation_urls(html, page_url, page_dir, mirror_root)
    return inject_static_helper_tags(html)
def discover_css_urls(css_text: str, css_url: str) -> set[str]:
    """Collect absolute http(s) URLs referenced via url(...) in *css_text*.

    data: URIs and non-network schemes are skipped; relative references are
    resolved against *css_url*.
    """
    urls: set[str] = set()
    for token in re.findall(r"url\(([^)]+)\)", css_text, flags=re.IGNORECASE):
        reference = token.strip().strip("\"'")
        if not reference or reference.startswith("data:"):
            continue
        resolved = urljoin(css_url, reference)
        if urlparse(resolved).scheme in ("http", "https"):
            urls.add(resolved)
    return urls
def rewrite_css_urls(css_text: str, css_url: str, css_path: Path, downloaded_assets: dict[str, Path]) -> str:
    """Rewrite url(...) references in CSS to paths relative to *css_path*.

    References that were never downloaded are left untouched; rewritten ones
    are emitted single-quoted.
    """

    def _swap(match: re.Match[str]) -> str:
        reference = match.group(1).strip().strip("\"'")
        local = downloaded_assets.get(urljoin(css_url, reference))
        if local is None:
            return match.group(0)
        rel = os.path.relpath(local, css_path.parent).replace(os.sep, "/")
        return f"url('{rel}')"

    return re.sub(r"url\(([^)]+)\)", _swap, css_text, flags=re.IGNORECASE)
def wait_for_page(cdp_port: int, expected_url: str, timeout: int) -> dict[str, Any]:
    """Poll the tab until the app looks rendered and idle, or timeout elapses.

    "Settled" means the #app subtree holds more than 500 characters of HTML,
    no ".el-loading-mask" overlays are visible, and the HTML length repeated
    between two consecutive 1-second polls.  Returns the last snapshot either
    way — callers cannot assume the page actually settled.

    NOTE(review): *expected_url* is currently unused — the wait keys off
    rendering progress only, not off reaching a particular URL; confirm intent.
    """
    start = time.time()
    last_len = -1
    stable_ticks = 0
    state: dict[str, Any] = {}
    while time.time() - start < timeout:
        time.sleep(1)
        state = current_page_snapshot(cdp_port)
        html_len = int(state.get("appHtmlLen", 0))
        loading_masks = int(state.get("loadingMasks", 0))
        if html_len > 500 and loading_masks == 0:
            # Require one repeat of the same length before declaring stability.
            stable_ticks = stable_ticks + 1 if html_len == last_len else 0
            if stable_ticks >= 1:
                return state
        last_len = html_len
    return state
def capture_page(
    cdp_port: int,
    url: str,
    allowed_host: str,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
    asset_failures: list[dict[str, str]],
    timeout: int,
) -> tuple[dict[str, Any], set[str]]:
    """Navigate to *url*, snapshot the rendered page, and download its assets.

    Side effects: writes the page's index.html and all newly downloaded assets
    under *mirror_root*; appends download errors to *asset_failures*; extends
    the shared *downloaded_assets* cache (absolute URL → local path).

    Returns (page_record, next_urls): a manifest entry for the page and the
    set of normalized same-host URLs discovered in its anchors.
    """
    run_agent_browser(cdp_port, "open", url)
    state = wait_for_page(cdp_port, url, timeout)
    html = get_rendered_html(cdp_port)
    # The SPA may redirect; mirror under the URL that was actually reached.
    current_url = state.get("href") or url
    page_dir = build_page_dir(mirror_root, current_url)
    page_dir.mkdir(parents=True, exist_ok=True)

    discovered_assets = discover_asset_urls(html, current_url)
    pending_assets = deque(sorted(discovered_assets))
    local_asset_map: dict[str, Path] = {}

    # Breadth-first asset download; CSS files enqueue their url(...) references.
    while pending_assets:
        asset_url = pending_assets.popleft()
        if asset_url in downloaded_assets:
            local_asset_map[asset_url] = downloaded_assets[asset_url]
            continue

        try:
            payload = fetch_url(asset_url)
        except Exception as exc:  # noqa: BLE001
            # Best effort: record the failure and keep mirroring the page.
            asset_failures.append({"url": asset_url, "error": str(exc)})
            continue

        local_path = build_asset_path(mirror_root, asset_url)
        safe_write_bytes(local_path, payload)
        downloaded_assets[asset_url] = local_path
        local_asset_map[asset_url] = local_path

        # File-extension heuristic: only .css files are scanned for nested refs.
        content_type = local_path.suffix.lower()
        if content_type == ".css":
            css_text = payload.decode("utf-8", errors="ignore")
            nested_assets = discover_css_urls(css_text, asset_url)
            for nested in sorted(nested_assets):
                if nested not in downloaded_assets:
                    pending_assets.append(nested)

    # Second pass: rewrite url(...) in CSS once every referenced asset is local.
    for asset_url, local_path in list(local_asset_map.items()):
        if local_path.suffix.lower() != ".css":
            continue
        css_text = local_path.read_text(encoding="utf-8", errors="ignore")
        rewritten = rewrite_css_urls(css_text, asset_url, local_path, downloaded_assets)
        safe_write_text(local_path, rewritten)

    rewritten_html = rewrite_html_asset_urls(html, current_url, page_dir, mirror_root, downloaded_assets)
    html_path = page_dir / "index.html"
    safe_write_text(html_path, rewritten_html)

    # Candidate URLs for further crawling, normalized to the allowed host.
    next_urls: set[str] = set()
    for anchor in state.get("anchors", []):
        next_url = normalize_internal_url(anchor.get("href"), current_url, allowed_host)
        if next_url:
            next_urls.add(next_url)

    page_record = {
        "source_url": url,
        "final_url": current_url,
        "title": state.get("title", ""),
        "html_path": str(html_path.relative_to(mirror_root)),
        "anchor_count": len(state.get("anchors", [])),
        "app_html_len": state.get("appHtmlLen", 0),
    }
    return page_record, next_urls
def crawl_site(args: argparse.Namespace) -> int:
    """BFS-crawl the target site through the live browser tab and mirror it.

    Requires the current tab to already be on args.host (logged in).  Seeds
    come from --seed-url when given; otherwise from the current page and the
    anchors visible on it.  Writes pages and assets under args.output_dir plus
    a JSON manifest.  Always returns 0; failures are recorded per page/asset.
    """
    mirror_root = Path(args.output_dir).resolve()
    mirror_root.mkdir(parents=True, exist_ok=True)

    # Let the current tab settle, then verify it is on the target site.
    wait_for_page(args.cdp_port, "", args.timeout)
    seed = current_page_snapshot(args.cdp_port)
    start_url = seed.get("href")
    if not isinstance(start_url, str) or args.host not in start_url:
        raise RuntimeError("当前浏览器标签页不在目标站点上,请先切到已登录页面。")

    discovered: deque[str] = deque()
    seen: set[str] = set()
    if args.seed_url:
        for seed_url in args.seed_url:
            normalized = normalize_internal_url(seed_url, seed_url, args.host) or seed_url
            if normalized not in seen:
                discovered.append(normalized)
    else:
        start_normalized = normalize_internal_url(start_url, start_url, args.host) or start_url
        discovered.append(start_normalized)

        # Also queue every link already visible on the seed page.
        # NOTE(review): indentation reconstructed — assumed to run only when no
        # explicit seeds were given (matches --no-discover's "seeds only" intent).
        for anchor in seed.get("anchors", []):
            normalized = normalize_internal_url(anchor.get("href"), start_url, args.host)
            if normalized and normalized not in seen:
                discovered.append(normalized)

    downloaded_assets: dict[str, Path] = {}
    asset_failures: list[dict[str, str]] = []
    page_failures: list[dict[str, str]] = []
    pages: list[dict[str, Any]] = []

    # BFS over discovered URLs, bounded by --max-pages.
    while discovered and len(seen) < args.max_pages:
        target = discovered.popleft()
        if target in seen:
            continue
        seen.add(target)

        print(f"[页面] {len(seen):03d} {target}", flush=True)
        try:
            page_record, next_urls = capture_page(
                args.cdp_port,
                target,
                args.host,
                mirror_root,
                downloaded_assets,
                asset_failures,
                args.timeout,
            )
            pages.append(page_record)
            if not args.no_discover:
                for next_url in sorted(next_urls):
                    if next_url not in seen:
                        discovered.append(next_url)
        except Exception as exc:  # noqa: BLE001
            # One broken page must not abort the whole crawl.
            page_failures.append({"url": target, "error": str(exc)})
            print(f"[失败] {target} -> {exc}", flush=True)

    manifest = {
        "host": args.host,
        "captured_pages": len(pages),
        "downloaded_assets": len(downloaded_assets),
        "page_failures": page_failures,
        "asset_failures": asset_failures,
        "pages": pages,
    }
    manifest_name = f"mirror-manifest-{args.host}.json"
    safe_write_text(mirror_root / manifest_name, json.dumps(manifest, ensure_ascii=False, indent=2))

    print(
        f"[完成] 页面 {len(pages)} 个,资源 {len(downloaded_assets)} 个,"
        f"页面失败 {len(page_failures)} 个,资源失败 {len(asset_failures)} 个",
        flush=True,
    )
    return 0
def reconstruct_page_url_from_path(html_path: Path, mirror_root: Path) -> str | None:
    """Infer the hash-route URL a mirrored index.html was captured from.

    Returns None when the file lies outside *mirror_root* or under an unknown
    host directory.  A trailing "__query_*" directory is dropped — the query
    string is not recoverable from its digest.
    """
    try:
        relative = html_path.relative_to(mirror_root)
    except ValueError:
        return None
    parts = relative.parts
    if not parts or parts[0] not in KNOWN_MIRROR_HOSTS:
        return None

    segments = list(parts[1:-1])
    if segments and segments[-1].startswith("__query_"):
        segments.pop()
    return build_virtual_route_url(parts[0], "/".join(segments).strip("/"))
def load_route_map_from_manifests(mirror_root: Path) -> dict[str, Path]:
    """Build a URL → page-directory mapping from all saved mirror manifests.

    Every page's source_url maps to its directory; its final_url maps too
    unless a previous page already claimed it.  One known alias (the POS
    r2cockpit route) is added when its target snapshot exists on disk.
    """
    route_map: dict[str, Path] = {}
    for manifest_path in sorted(mirror_root.glob("mirror-manifest-*.json")):
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
        for page in manifest.get("pages", []):
            page_dir = (mirror_root / page["html_path"]).parent
            source_url = page.get("source_url")
            final_url = page.get("final_url")
            if source_url:
                route_map[source_url] = page_dir
            if final_url:
                route_map.setdefault(final_url, page_dir)
    hc_pos_dashboard = mirror_root / "hc-pos.sqygj.cn" / "dataPlatform" / "home"
    if (hc_pos_dashboard / "index.html").exists():
        route_map["https://hc-pos.sqygj.cn/#/r2cockpit"] = hc_pos_dashboard
    return route_map
def synthesize_route_url_from_target_dir(target_dir: Path, mirror_root: Path) -> str | None:
    """Turn a mirror directory back into its hash-route URL, or None."""
    try:
        relative = target_dir.relative_to(mirror_root)
    except ValueError:
        return None
    parts = relative.parts
    if not parts or parts[0] not in KNOWN_MIRROR_HOSTS:
        return None
    return build_virtual_route_url(parts[0], "/".join(parts[1:]))
def sanitize_relative_anchor_links(
    html: str,
    html_path: Path,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> str:
    """Repair relative <a href> links that point at directories with no snapshot.

    For each "./" or "../" link whose target lacks an index.html: first try to
    remap it through *route_map* (keyed by the synthesized route URL); failing
    that, point it at the host's /404 snapshot if one exists; otherwise leave
    the link untouched.
    """
    try:
        rel = html_path.relative_to(mirror_root)
    except ValueError:
        # File outside the mirror tree: nothing to reason about.
        return html
    if not rel.parts:
        return html

    host = rel.parts[0]
    fallback_dir = mirror_root / host / "404"
    fallback_exists = (fallback_dir / "index.html").exists()

    page_dir = html_path.parent

    def replace_anchor_href(match: re.Match[str]) -> str:
        prefix = match.group("prefix")
        quote = match.group("quote")
        href = match.group("url")
        # Only relative directory links are candidates for repair.
        if not href.startswith(("./", "../")):
            return match.group(0)

        target_dir = (page_dir / href).resolve()
        target_file = target_dir / "index.html"
        if target_file.exists():
            # Link already resolves to a captured page — keep it.
            return match.group(0)

        if route_map:
            synthetic_url = synthesize_route_url_from_target_dir(target_dir, mirror_root)
            mapped_dir = route_map.get(synthetic_url) if synthetic_url else None
            if mapped_dir and (mapped_dir / "index.html").exists():
                relative = os.path.relpath(mapped_dir, page_dir).replace(os.sep, "/")
                if relative == ".":
                    relative = "./"
                elif not relative.endswith("/"):
                    relative = f"{relative}/"
                return f"{prefix}{quote}{relative}{quote}"

        if fallback_exists:
            # Last resort: send the dead link to the host's 404 snapshot.
            relative = os.path.relpath(fallback_dir, page_dir).replace(os.sep, "/")
            if relative == ".":
                relative = "./"
            elif not relative.endswith("/"):
                relative = f"{relative}/"
            return f"{prefix}{quote}{relative}{quote}"
        return match.group(0)

    return re.sub(
        r"""(?P<prefix><a\b[^>]*\shref=)(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
        replace_anchor_href,
        html,
        flags=re.IGNORECASE,
    )
def rewrite_existing_html_links(mirror_root: Path) -> int:
    """Re-run link localization over every captured index.html under *mirror_root*.

    Returns the number of files whose content actually changed.
    """
    route_map = load_route_map_from_manifests(mirror_root)
    changed = 0
    for html_path in mirror_root.rglob("index.html"):
        page_url = reconstruct_page_url_from_path(html_path, mirror_root)
        if not page_url:
            continue
        before = html_path.read_text(encoding="utf-8", errors="ignore")
        after = rewrite_html_navigation_urls(before, page_url, html_path.parent, mirror_root, route_map)
        after = sanitize_relative_anchor_links(after, html_path, mirror_root, route_map)
        after = inject_static_helper_tags(after)
        if after != before:
            safe_write_text(html_path, after)
            changed += 1
    return changed
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the mirroring tool."""
    parser = argparse.ArgumentParser(description="复用已登录 Chrome 会话镜像站点")
    add = parser.add_argument
    add("--host", default=DEFAULT_HOST, help="限制抓取的主机名")
    add("--cdp-port", type=int, default=DEFAULT_CDP_PORT, help="Chrome 远程调试端口")
    add("--output-dir", default=".", help="抓取结果输出目录")
    add("--max-pages", type=int, default=140, help="最多抓取页面数")
    add("--timeout", type=int, default=20, help="单页等待秒数")
    add("--seed-url", action="append", default=[], help="指定起始 URL,可重复传入")
    add("--seed-file", help="从文本文件读取起始 URL,每行一个")
    add("--no-discover", action="store_true", help="只抓取种子 URL,不继续从页面内发现新链接")
    add("--rewrite-links-only", action="store_true", help="仅重写现有 HTML 的本地导航链接")
    return parser
def main() -> int:
    """CLI entry point: collect seeds, then rewrite links or crawl the site.

    Returns a process exit code: 0 on success, 1 on any caught failure.
    """
    args = build_parser().parse_args()
    try:
        if args.seed_file:
            # Merge non-empty, non-comment lines from the seed file.
            for line in Path(args.seed_file).read_text(encoding="utf-8").splitlines():
                stripped = line.strip()
                if stripped and not stripped.startswith("#"):
                    args.seed_url.append(stripped)
        if args.rewrite_links_only:
            count = rewrite_existing_html_links(Path(args.output_dir).resolve())
            print(f"[完成] 已重写 {count} 个 HTML 文件的本地导航链接")
            return 0
        return crawl_site(args)
    except Exception as exc:  # noqa: BLE001
        print(f"[错误] {exc}", file=sys.stderr)
        return 1
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())