#!/usr/bin/env python3
"""复用已登录 Chrome 会话抓取站内页面并按路径落盘。"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
import time
from collections import deque
from html import unescape
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen
DEFAULT_HOST = "hc-etms.sqygj.cn"
DEFAULT_CDP_PORT = 9222
KNOWN_MIRROR_HOSTS = {"hc-etms.sqygj.cn", "hc-pos.sqygj.cn"}
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
)
STATIC_HELPER_CSS = "/__mirror/static-mirror.css"
STATIC_HELPER_JS = "/__mirror/static-mirror.js"


def run_agent_browser(cdp_port: int, *args: str) -> str:
    if not args:
        raise RuntimeError("missing CDP command")
    node_script = r"""
const [, mode, portRaw, payload = ""] = process.argv;
const cdpPort = Number(portRaw);

function fail(message) {
  console.error(message);
  process.exit(1);
}

async function choosePageTarget() {
  const response = await fetch(`http://127.0.0.1:${cdpPort}/json`);
  if (!response.ok) {
    throw new Error(`failed to query CDP targets: ${response.status}`);
  }
  const targets = await response.json();
  const pages = targets.filter((target) => target.type === "page");
  const preferred = pages.find((target) => {
    try {
      const current = new URL(target.url);
      return ["hc-pos.sqygj.cn", "hc-etms.sqygj.cn"].includes(current.host);
    } catch (_error) {
      return false;
    }
  });
  return preferred || pages[0] || null;
}

async function main() {
  const target = await choosePageTarget();
  if (!target || !target.webSocketDebuggerUrl) {
    throw new Error("no debuggable page target found");
  }
  const ws = new WebSocket(target.webSocketDebuggerUrl);
  let seq = 0;
  const pending = new Map();
  ws.addEventListener("message", (event) => {
    const message = JSON.parse(event.data);
    if (!message.id || !pending.has(message.id)) {
      return;
    }
    const current = pending.get(message.id);
    pending.delete(message.id);
    if (message.error) {
      current.reject(new Error(JSON.stringify(message.error)));
      return;
    }
    current.resolve(message.result || {});
  });
  await new Promise((resolve, reject) => {
    ws.addEventListener("open", () => resolve(), { once: true });
    ws.addEventListener("error", (event) => reject(event.error || new Error("websocket error")), { once: true });
  });
  const send = (method, params = {}) => new Promise((resolve, reject) => {
    const id = ++seq;
    pending.set(id, { resolve, reject });
    ws.send(JSON.stringify({ id, method, params }));
  });
  if (mode === "open") {
    await send("Page.enable");
    await send("Page.navigate", { url: payload });
    console.log(JSON.stringify({ ok: true, url: payload }));
    ws.close();
    return;
  }
  if (mode === "eval") {
    await send("Runtime.enable");
    const result = await send("Runtime.evaluate", {
      expression: payload,
      awaitPromise: true,
      returnByValue: true,
    });
    const value = result.result ? result.result.value : null;
    console.log(typeof value === "string" ? value : JSON.stringify(value));
    ws.close();
    return;
  }
  throw new Error(`unsupported CDP command: ${mode}`);
}

main().catch((error) => fail(error && error.message ? error.message : String(error)));
"""
    cmd = ["node", "-e", node_script, args[0], str(cdp_port), args[1] if len(args) > 1 else ""]
    proc = subprocess.run(
        cmd,
        text=True,
        encoding="utf-8",
        errors="replace",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "agent-browser execution failed")
    return proc.stdout.strip()
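
# The embedded Node helper uses the global fetch and WebSocket APIs, so it assumes
# a recent Node.js runtime (the global WebSocket landed around Node 22; older
# versions would need a package such as ws). Chrome must also be running with
# remote debugging enabled, e.g. launched with --remote-debugging-port=9222.
# Illustrative call:
#   run_agent_browser(9222, "eval", "JSON.stringify(location.href)")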


def parse_json_output(raw: str) -> Any:
    value: Any = raw.strip()
    for _ in range(2):
        try:
            value = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            break
    return value


def eval_json(cdp_port: int, script: str) -> Any:
    raw = run_agent_browser(cdp_port, "eval", script)
    parsed = parse_json_output(raw)
    if isinstance(parsed, (dict, list)):
        return parsed
    if isinstance(parsed, str):
        return json.loads(parsed)
    raise RuntimeError(f"could not parse the browser's response: {raw[:200]}")


def current_page_snapshot(cdp_port: int) -> dict[str, Any]:
    script = r"""
(() => {
  const app = document.querySelector("#app");
  const anchors = [...document.querySelectorAll("a[href]")]
    .map((a) => ({
      href: a.getAttribute("href"),
      text: (a.innerText || "").trim(),
    }))
    .filter((item) => item.href || item.text);
  return JSON.stringify({
    href: location.href,
    title: document.title,
    readyState: document.readyState,
    appHtmlLen: app ? app.innerHTML.length : 0,
    appTextLen: app ? (app.innerText || "").trim().length : 0,
    loadingMasks: document.querySelectorAll(".el-loading-mask").length,
    anchors,
  });
})()
"""
    return eval_json(cdp_port, script)


def get_rendered_html(cdp_port: int) -> str:
    script = r"JSON.stringify(document.documentElement.outerHTML)"
    raw = run_agent_browser(cdp_port, "eval", script)
    html = parse_json_output(raw)
    if not isinstance(html, str):
        raise RuntimeError("could not retrieve the page HTML")
    return html


def normalize_internal_url(raw_url: str | None, current_url: str, allowed_host: str) -> str | None:
    if not raw_url:
        return None
    candidate = raw_url.strip()
    if not candidate:
        return None
    lowered = candidate.lower()
    if lowered.startswith(("javascript:", "mailto:", "tel:", "data:", "blob:")):
        return None
    if candidate.startswith("#/"):
        return urljoin(f"https://{allowed_host}/", candidate)
    if candidate.startswith("#"):
        return None
    absolute = urljoin(current_url, candidate)
    parsed = urlparse(absolute)
    if parsed.netloc != allowed_host:
        return None
    if parsed.fragment.startswith("/"):
        fragment = parsed.fragment
    elif parsed.fragment:
        return None
    else:
        fragment = ""
    clean = f"{parsed.scheme}://{parsed.netloc}{parsed.path or '/'}"
    if parsed.query:
        clean += f"?{parsed.query}"
    if fragment:
        clean += f"#{fragment}"
    return clean
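
# Behaviour sketch (hypothetical routes, not exhaustive):
#   normalize_internal_url("#/waybill/list", "https://hc-etms.sqygj.cn/#/home", "hc-etms.sqygj.cn")
#     -> "https://hc-etms.sqygj.cn/#/waybill/list"   (hash route pinned to the allowed host)
#   normalize_internal_url("#top", ..., ...)
#     -> None                                        (in-page anchor, not a route)
#   normalize_internal_url("https://other.example/x", ..., "hc-etms.sqygj.cn")
#     -> None                                        (foreign host)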


def build_page_dir(root: Path, page_url: str) -> Path:
    parsed = urlparse(page_url)
    host_dir = root / parsed.netloc
    fragment = parsed.fragment
    route = ""
    route_query = ""
    if fragment.startswith("/"):
        route = fragment[1:]
        if "?" in route:
            route, route_query = route.split("?", 1)
    elif parsed.path not in ("", "/"):
        route = parsed.path.lstrip("/")
        if parsed.query:
            route_query = parsed.query
    target = host_dir
    if route:
        target = host_dir.joinpath(*[part for part in route.split("/") if part])
    if route_query:
        digest = hashlib.sha1(route_query.encode("utf-8")).hexdigest()[:10]
        target = target / f"__query_{digest}"
    return target
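
# Example mapping for a hypothetical route (digest shown symbolically; it is the
# first 10 hex chars of the SHA-1 of the query string):
#   https://hc-etms.sqygj.cn/#/waybill/list?page=2
#     -> <root>/hc-etms.sqygj.cn/waybill/list/__query_<sha1("page=2")[:10]>/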


def build_asset_path(root: Path, asset_url: str) -> Path:
    parsed = urlparse(asset_url)
    host_dir = root / parsed.netloc
    path = parsed.path or "/"
    if path.endswith("/"):
        path = f"{path}index"
    local_path = host_dir / path.lstrip("/")
    if parsed.query:
        digest = hashlib.sha1(parsed.query.encode("utf-8")).hexdigest()[:10]
        suffix = local_path.suffix
        if suffix:
            local_path = local_path.with_name(f"{local_path.stem}__q_{digest}{suffix}")
        else:
            local_path = local_path.with_name(f"{local_path.name}__q_{digest}")
    return local_path
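
# Example for a hypothetical asset (digest again symbolic):
#   https://hc-etms.sqygj.cn/static/js/app.js?v=3
#     -> <root>/hc-etms.sqygj.cn/static/js/app__q_<sha1("v=3")[:10]>.js
# Keeping the extension in place means extension-based content-type detection
# still works when the mirror is served by a plain static file server.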


def safe_write_text(path: Path, content: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def safe_write_bytes(path: Path, content: bytes) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)


def fetch_url(url: str, timeout: int = 30) -> bytes:
    req = Request(url, headers={"User-Agent": USER_AGENT})
    with urlopen(req, timeout=timeout) as response:
        return response.read()


def discover_asset_urls(html: str, page_url: str) -> set[str]:
    asset_urls: set[str] = set()
    attr_pattern = re.compile(
        r"""(?P<attr>href|src|poster)=["'](?P<url>[^"'#][^"']*)["']""",
        re.IGNORECASE,
    )
    for match in attr_pattern.finditer(html):
        candidate = match.group("url").strip()
        if candidate.lower().startswith(("javascript:", "data:", "mailto:", "tel:", "blob:")):
            continue
        absolute = urljoin(page_url, candidate)
        parsed = urlparse(absolute)
        if parsed.scheme not in ("http", "https"):
            continue
        asset_urls.add(absolute)
    return asset_urls


def build_virtual_route_url(host: str, route: str) -> str:
    clean_route = route.lstrip("/")
    if not clean_route:
        clean_route = "dashboard"
    return f"https://{host}/#/{clean_route}"


def resolve_navigation_absolute_url(raw_href: str, current_url: str) -> str | None:
    href = unescape(raw_href).strip()
    if not href:
        return None
    current_host = urlparse(current_url).netloc
    if href in {"#", "./", "."}:
        return current_url
    if href.startswith("#/"):
        return build_virtual_route_url(current_host, href[2:])
    absolute = urljoin(current_url, href)
    parsed = urlparse(absolute)
    if parsed.netloc not in KNOWN_MIRROR_HOSTS:
        return None
    if parsed.fragment.startswith("/"):
        return build_virtual_route_url(parsed.netloc, parsed.fragment[1:])
    if parsed.path in ("", "/") and not parsed.query:
        return build_virtual_route_url(parsed.netloc, "dashboard")
    return absolute
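
# Example resolutions against current_url = "https://hc-pos.sqygj.cn/#/dashboard"
# (routes are hypothetical):
#   "#/order/detail"        -> "https://hc-pos.sqygj.cn/#/order/detail"
#   "/"                     -> "https://hc-pos.sqygj.cn/#/dashboard"  (bare root maps to the dashboard route)
#   "https://example.com/a" -> None                                   (host outside KNOWN_MIRROR_HOSTS)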


def resolve_local_navigation_target(
    raw_href: str,
    current_url: str,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> Path | None:
    href = unescape(raw_href).strip()
    current_host = urlparse(current_url).netloc
    if current_host == "hc-etms.sqygj.cn" and href == "#/goToProject":
        return build_page_dir(mirror_root, "https://hc-pos.sqygj.cn/#/dashboard")
    nav_url = resolve_navigation_absolute_url(raw_href, current_url)
    if not nav_url:
        return None
    if route_map and nav_url in route_map:
        return route_map[nav_url]
    return build_page_dir(mirror_root, nav_url)


def rewrite_html_navigation_urls(
    html: str,
    current_url: str,
    page_dir: Path,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> str:
    def replace_anchor_href(match: re.Match[str]) -> str:
        prefix = match.group("prefix")
        quote = match.group("quote")
        original = match.group("url")
        target_dir = resolve_local_navigation_target(original, current_url, mirror_root, route_map)
        if not target_dir:
            return match.group(0)
        relative = os.path.relpath(target_dir, page_dir).replace(os.sep, "/")
        if relative == ".":
            relative = "./"
        elif not relative.endswith("/"):
            relative = f"{relative}/"
        return f"{prefix}{quote}{relative}{quote}"

    return re.sub(
        r"""(?P<prefix><a\b[^>]*\shref=)(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
        replace_anchor_href,
        html,
        flags=re.IGNORECASE,
    )


def inject_static_helper_tags(html: str) -> str:
    css_tag = f'<link rel="stylesheet" href="{STATIC_HELPER_CSS}">'
    js_tag = f'<script src="{STATIC_HELPER_JS}"></script>'
    if css_tag not in html:
        if "</head>" in html:
            html = html.replace("</head>", f"{css_tag}</head>", 1)
        else:
            html = f"{css_tag}{html}"
    if js_tag not in html:
        if "</body>" in html:
            html = html.replace("</body>", f"{js_tag}</body>", 1)
        else:
            html = f"{html}{js_tag}"
    return html
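
# The injection is idempotent: the exact tag text is checked before inserting,
# so re-running the rewriter does not duplicate the helpers. The tags added are:
#   <link rel="stylesheet" href="/__mirror/static-mirror.css">   (before </head>)
#   <script src="/__mirror/static-mirror.js"></script>           (before </body>)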


def rewrite_html_asset_urls(
    html: str,
    page_url: str,
    page_dir: Path,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
) -> str:
    def replace_attr(match: re.Match[str]) -> str:
        attr = match.group("attr")
        original = unescape(match.group("url"))
        absolute = urljoin(page_url, original)
        local_path = downloaded_assets.get(absolute)
        if not local_path:
            return match.group(0)
        relative = os.path.relpath(local_path, page_dir).replace(os.sep, "/")
        quote = match.group("quote")
        return f"{attr}={quote}{relative}{quote}"

    html = re.sub(
        r"""(?P<attr>href|src|poster)=(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
        replace_attr,
        html,
        flags=re.IGNORECASE,
    )
    # The purely static snapshot keeps no scripts, so that opening it locally
    # cannot re-bootstrap the SPA and take over the DOM again.
    html = re.sub(
        r"<script\b[^>]*>.*?</script>",
        "",
        html,
        flags=re.IGNORECASE | re.DOTALL,
    )
    html = rewrite_html_navigation_urls(html, page_url, page_dir, mirror_root)
    html = inject_static_helper_tags(html)
    return html


def discover_css_urls(css_text: str, css_url: str) -> set[str]:
    found: set[str] = set()
    for raw in re.findall(r"url\(([^)]+)\)", css_text, flags=re.IGNORECASE):
        candidate = raw.strip().strip("\"'")
        if not candidate or candidate.startswith("data:"):
            continue
        absolute = urljoin(css_url, candidate)
        parsed = urlparse(absolute)
        if parsed.scheme not in ("http", "https"):
            continue
        found.add(absolute)
    return found


def rewrite_css_urls(css_text: str, css_url: str, css_path: Path, downloaded_assets: dict[str, Path]) -> str:
    def replace_url(match: re.Match[str]) -> str:
        raw = match.group(1).strip()
        candidate = raw.strip("\"'")
        absolute = urljoin(css_url, candidate)
        local_path = downloaded_assets.get(absolute)
        if not local_path:
            return match.group(0)
        relative = os.path.relpath(local_path, css_path.parent).replace(os.sep, "/")
        return f"url('{relative}')"

    return re.sub(r"url\(([^)]+)\)", replace_url, css_text, flags=re.IGNORECASE)


def wait_for_page(cdp_port: int, expected_url: str, timeout: int) -> dict[str, Any]:
    # NOTE: expected_url is currently unused; the wait is purely heuristic.
    start = time.time()
    last_len = -1
    stable_ticks = 0
    state: dict[str, Any] = {}
    while time.time() - start < timeout:
        time.sleep(1)
        state = current_page_snapshot(cdp_port)
        html_len = int(state.get("appHtmlLen", 0))
        loading_masks = int(state.get("loadingMasks", 0))
        if html_len > 500 and loading_masks == 0:
            stable_ticks = stable_ticks + 1 if html_len == last_len else 0
            if stable_ticks >= 1:
                return state
        last_len = html_len
    return state
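
# Readiness heuristic: the page counts as settled once #app holds more than 500
# characters of HTML, no Element UI loading mask (.el-loading-mask) is visible,
# and the HTML length is unchanged across two consecutive 1-second polls. The
# 500-character floor guards against capturing an empty app shell; on timeout
# the last snapshot is returned as a best effort rather than raising.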


def capture_page(
    cdp_port: int,
    url: str,
    allowed_host: str,
    mirror_root: Path,
    downloaded_assets: dict[str, Path],
    asset_failures: list[dict[str, str]],
    timeout: int,
) -> tuple[dict[str, Any], set[str]]:
    run_agent_browser(cdp_port, "open", url)
    state = wait_for_page(cdp_port, url, timeout)
    html = get_rendered_html(cdp_port)
    current_url = state.get("href") or url
    page_dir = build_page_dir(mirror_root, current_url)
    page_dir.mkdir(parents=True, exist_ok=True)
    discovered_assets = discover_asset_urls(html, current_url)
    pending_assets = deque(sorted(discovered_assets))
    local_asset_map: dict[str, Path] = {}
    while pending_assets:
        asset_url = pending_assets.popleft()
        if asset_url in downloaded_assets:
            local_asset_map[asset_url] = downloaded_assets[asset_url]
            continue
        try:
            payload = fetch_url(asset_url)
        except Exception as exc:  # noqa: BLE001
            asset_failures.append({"url": asset_url, "error": str(exc)})
            continue
        local_path = build_asset_path(mirror_root, asset_url)
        safe_write_bytes(local_path, payload)
        downloaded_assets[asset_url] = local_path
        local_asset_map[asset_url] = local_path
        suffix = local_path.suffix.lower()
        if suffix == ".css":
            css_text = payload.decode("utf-8", errors="ignore")
            nested_assets = discover_css_urls(css_text, asset_url)
            for nested in sorted(nested_assets):
                if nested not in downloaded_assets:
                    pending_assets.append(nested)
    for asset_url, local_path in list(local_asset_map.items()):
        if local_path.suffix.lower() != ".css":
            continue
        css_text = local_path.read_text(encoding="utf-8", errors="ignore")
        rewritten = rewrite_css_urls(css_text, asset_url, local_path, downloaded_assets)
        safe_write_text(local_path, rewritten)
    rewritten_html = rewrite_html_asset_urls(html, current_url, page_dir, mirror_root, downloaded_assets)
    html_path = page_dir / "index.html"
    safe_write_text(html_path, rewritten_html)
    next_urls: set[str] = set()
    for anchor in state.get("anchors", []):
        next_url = normalize_internal_url(anchor.get("href"), current_url, allowed_host)
        if next_url:
            next_urls.add(next_url)
    page_record = {
        "source_url": url,
        "final_url": current_url,
        "title": state.get("title", ""),
        "html_path": str(html_path.relative_to(mirror_root)),
        "anchor_count": len(state.get("anchors", [])),
        "app_html_len": state.get("appHtmlLen", 0),
    }
    return page_record, next_urls


def crawl_site(args: argparse.Namespace) -> int:
    mirror_root = Path(args.output_dir).resolve()
    mirror_root.mkdir(parents=True, exist_ok=True)
    wait_for_page(args.cdp_port, "", args.timeout)
    seed = current_page_snapshot(args.cdp_port)
    start_url = seed.get("href")
    if not isinstance(start_url, str) or args.host not in start_url:
        raise RuntimeError("The current browser tab is not on the target site; switch to a logged-in page first.")
    discovered: deque[str] = deque()
    seen: set[str] = set()
    if args.seed_url:
        for seed_url in args.seed_url:
            normalized = normalize_internal_url(seed_url, seed_url, args.host) or seed_url
            if normalized not in seen:
                discovered.append(normalized)
    else:
        start_normalized = normalize_internal_url(start_url, start_url, args.host) or start_url
        discovered.append(start_normalized)
        for anchor in seed.get("anchors", []):
            normalized = normalize_internal_url(anchor.get("href"), start_url, args.host)
            if normalized and normalized not in seen:
                discovered.append(normalized)
    downloaded_assets: dict[str, Path] = {}
    asset_failures: list[dict[str, str]] = []
    page_failures: list[dict[str, str]] = []
    pages: list[dict[str, Any]] = []
    while discovered and len(seen) < args.max_pages:
        target = discovered.popleft()
        if target in seen:
            continue
        seen.add(target)
        print(f"[page] {len(seen):03d} {target}", flush=True)
        try:
            page_record, next_urls = capture_page(
                args.cdp_port,
                target,
                args.host,
                mirror_root,
                downloaded_assets,
                asset_failures,
                args.timeout,
            )
            pages.append(page_record)
            if not args.no_discover:
                for next_url in sorted(next_urls):
                    if next_url not in seen:
                        discovered.append(next_url)
        except Exception as exc:  # noqa: BLE001
            page_failures.append({"url": target, "error": str(exc)})
            print(f"[failed] {target} -> {exc}", flush=True)
    manifest = {
        "host": args.host,
        "captured_pages": len(pages),
        "downloaded_assets": len(downloaded_assets),
        "page_failures": page_failures,
        "asset_failures": asset_failures,
        "pages": pages,
    }
    manifest_name = f"mirror-manifest-{args.host}.json"
    safe_write_text(mirror_root / manifest_name, json.dumps(manifest, ensure_ascii=False, indent=2))
    print(
        f"[done] {len(pages)} pages, {len(downloaded_assets)} assets, "
        f"{len(page_failures)} page failures, {len(asset_failures)} asset failures",
        flush=True,
    )
    return 0


def reconstruct_page_url_from_path(html_path: Path, mirror_root: Path) -> str | None:
    try:
        rel = html_path.relative_to(mirror_root)
    except ValueError:
        return None
    if not rel.parts or rel.parts[0] not in KNOWN_MIRROR_HOSTS:
        return None
    host = rel.parts[0]
    route_parts = list(rel.parts[1:-1])
    if route_parts and route_parts[-1].startswith("__query_"):
        route_parts = route_parts[:-1]
    route = "/".join(route_parts).strip("/")
    return build_virtual_route_url(host, route)


def load_route_map_from_manifests(mirror_root: Path) -> dict[str, Path]:
    route_map: dict[str, Path] = {}
    for manifest_path in sorted(mirror_root.glob("mirror-manifest-*.json")):
        data = json.loads(manifest_path.read_text(encoding="utf-8"))
        for item in data.get("pages", []):
            html_path = mirror_root / item["html_path"]
            page_dir = html_path.parent
            source_url = item.get("source_url")
            final_url = item.get("final_url")
            if source_url:
                route_map[source_url] = page_dir
            if final_url and final_url not in route_map:
                route_map[final_url] = page_dir
    hc_pos_dashboard = mirror_root / "hc-pos.sqygj.cn" / "dataPlatform" / "home"
    if (hc_pos_dashboard / "index.html").exists():
        route_map["https://hc-pos.sqygj.cn/#/r2cockpit"] = hc_pos_dashboard
    return route_map


def synthesize_route_url_from_target_dir(target_dir: Path, mirror_root: Path) -> str | None:
    try:
        rel = target_dir.relative_to(mirror_root)
    except ValueError:
        return None
    if not rel.parts or rel.parts[0] not in KNOWN_MIRROR_HOSTS:
        return None
    host = rel.parts[0]
    route = "/".join(rel.parts[1:])
    return build_virtual_route_url(host, route)


def sanitize_relative_anchor_links(
    html: str,
    html_path: Path,
    mirror_root: Path,
    route_map: dict[str, Path] | None = None,
) -> str:
    try:
        rel = html_path.relative_to(mirror_root)
    except ValueError:
        return html
    if not rel.parts:
        return html
    host = rel.parts[0]
    fallback_dir = mirror_root / host / "404"
    fallback_exists = (fallback_dir / "index.html").exists()
    page_dir = html_path.parent

    def replace_anchor_href(match: re.Match[str]) -> str:
        prefix = match.group("prefix")
        quote = match.group("quote")
        href = match.group("url")
        if not href.startswith(("./", "../")):
            return match.group(0)
        target_dir = (page_dir / href).resolve()
        target_file = target_dir / "index.html"
        if target_file.exists():
            return match.group(0)
        if route_map:
            synthetic_url = synthesize_route_url_from_target_dir(target_dir, mirror_root)
            mapped_dir = route_map.get(synthetic_url) if synthetic_url else None
            if mapped_dir and (mapped_dir / "index.html").exists():
                relative = os.path.relpath(mapped_dir, page_dir).replace(os.sep, "/")
                if relative == ".":
                    relative = "./"
                elif not relative.endswith("/"):
                    relative = f"{relative}/"
                return f"{prefix}{quote}{relative}{quote}"
        if fallback_exists:
            relative = os.path.relpath(fallback_dir, page_dir).replace(os.sep, "/")
            if relative == ".":
                relative = "./"
            elif not relative.endswith("/"):
                relative = f"{relative}/"
            return f"{prefix}{quote}{relative}{quote}"
        return match.group(0)

    return re.sub(
        r"""(?P<prefix><a\b[^>]*\shref=)(?P<quote>["'])(?P<url>[^"']+)(?P=quote)""",
        replace_anchor_href,
        html,
        flags=re.IGNORECASE,
    )


def rewrite_existing_html_links(mirror_root: Path) -> int:
    route_map = load_route_map_from_manifests(mirror_root)
    rewritten_count = 0
    for html_path in mirror_root.rglob("index.html"):
        page_url = reconstruct_page_url_from_path(html_path, mirror_root)
        if not page_url:
            continue
        original = html_path.read_text(encoding="utf-8", errors="ignore")
        updated = rewrite_html_navigation_urls(original, page_url, html_path.parent, mirror_root, route_map)
        updated = sanitize_relative_anchor_links(updated, html_path, mirror_root, route_map)
        updated = inject_static_helper_tags(updated)
        if updated != original:
            safe_write_text(html_path, updated)
            rewritten_count += 1
    return rewritten_count


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Mirror a site by reusing a logged-in Chrome session")
    parser.add_argument("--host", default=DEFAULT_HOST, help="restrict crawling to this host")
    parser.add_argument("--cdp-port", type=int, default=DEFAULT_CDP_PORT, help="Chrome remote debugging port")
    parser.add_argument("--output-dir", default=".", help="output directory for the mirror")
    parser.add_argument("--max-pages", type=int, default=140, help="maximum number of pages to capture")
    parser.add_argument("--timeout", type=int, default=20, help="per-page wait in seconds")
    parser.add_argument("--seed-url", action="append", default=[], help="starting URL (repeatable)")
    parser.add_argument("--seed-file", help="read seed URLs from a text file, one per line")
    parser.add_argument("--no-discover", action="store_true", help="capture only the seed URLs; do not follow links found on pages")
    parser.add_argument("--rewrite-links-only", action="store_true", help="only rewrite local navigation links in existing HTML")
    return parser
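
# Illustrative invocations (paths are examples, not baked-in defaults):
#   python3 mirror_logged_in_site.py --output-dir mirror
#   python3 mirror_logged_in_site.py --seed-file seeds.txt --no-discover --output-dir mirror
#   python3 mirror_logged_in_site.py --rewrite-links-only --output-dir mirror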


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    try:
        if args.seed_file:
            seed_path = Path(args.seed_file)
            seed_urls = [
                line.strip()
                for line in seed_path.read_text(encoding="utf-8").splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
            args.seed_url.extend(seed_urls)
        if args.rewrite_links_only:
            count = rewrite_existing_html_links(Path(args.output_dir).resolve())
            print(f"[done] rewrote local navigation links in {count} HTML files")
            return 0
        return crawl_site(args)
    except Exception as exc:  # noqa: BLE001
        print(f"[error] {exc}", file=sys.stderr)
        return 1
if __name__ == "__main__":
raise SystemExit(main())