#!/usr/bin/env python3
"""通过 Chrome DevTools Protocol 抓取页面接口响应。"""
from __future__ import annotations
import argparse
import asyncio
import base64
import json
import re
import urllib.request
from pathlib import Path
from typing import Any
import websockets
def load_pages() -> list[dict[str, Any]]:
    """Fetch the list of open tabs from the local DevTools HTTP endpoint."""
    endpoint = "http://127.0.0.1:9222/json/list"
    with urllib.request.urlopen(endpoint) as resp:
        raw = resp.read()
    return json.loads(raw.decode("utf-8"))
def select_page(pages: list[dict[str, Any]], page_prefix: str) -> dict[str, Any]:
    """Return the first tab of type "page" whose URL starts with *page_prefix*.

    Raises:
        RuntimeError: when no open tab matches the prefix.
    """
    candidates = (tab for tab in pages if tab.get("type") == "page")
    for tab in candidates:
        if tab.get("url", "").startswith(page_prefix):
            return tab
    raise RuntimeError(f"未找到匹配页面前缀的标签页: {page_prefix}")
def sanitize_filename(value: str) -> str:
    """Collapse filesystem-unsafe character runs to underscores; never return empty."""
    cleaned = re.sub(r"[^a-zA-Z0-9._-]+", "_", value).strip("._")
    return cleaned if cleaned else "capture"
async def capture_page_apis(
    ws_url: str,
    navigate_url: str,
    api_host_filter: str,
    duration: int,
    reload_page: bool,
) -> list[dict[str, Any]]:
    """Listen on a CDP websocket and collect API responses matching a host filter.

    Args:
        ws_url: ``webSocketDebuggerUrl`` of the target tab.
        navigate_url: page URL to navigate to (ignored when ``reload_page`` is True).
        api_host_filter: substring a response URL must contain to be captured.
        duration: how many seconds to keep listening after navigation/reload.
        reload_page: when True, reload the current tab instead of navigating.

    Returns:
        One dict per finished matching request: URL/status/MIME metadata plus
        the decoded response body (left empty for Preflight requests).
    """
    # requestId -> metadata for responses observed but not yet fully loaded.
    pending: dict[str, dict[str, Any]] = {}
    # requestId -> metadata for requests whose body is ready to fetch.
    finished: dict[str, dict[str, Any]] = {}
    next_id = 1
    async with websockets.connect(ws_url, max_size=50_000_000) as ws:
        async def send(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
            """Issue one CDP command and wait for its reply.

            Messages received while waiting that are not our reply are async
            events; they are forwarded to handle_event so none are dropped.
            """
            nonlocal next_id
            msg_id = next_id
            next_id += 1
            await ws.send(json.dumps({"id": msg_id, "method": method, "params": params or {}}))
            while True:
                msg = json.loads(await ws.recv())
                if msg.get("id") == msg_id:
                    return msg
                handle_event(msg)
        def handle_event(msg: dict[str, Any]) -> None:
            """Track Network.* events, maintaining the pending/finished maps."""
            method = msg.get("method")
            params = msg.get("params", {})
            if method == "Network.responseReceived":
                response = params.get("response", {})
                url = response.get("url", "")
                if api_host_filter not in url:
                    return
                resource_type = params.get("type")
                # Keep only API-style traffic; skip documents, images, scripts, etc.
                if resource_type not in ("XHR", "Fetch", "Preflight"):
                    return
                pending[params["requestId"]] = {
                    "url": url,
                    "status": response.get("status"),
                    "mime_type": response.get("mimeType"),
                    "resource_type": resource_type,
                }
            elif method == "Network.loadingFinished":
                request_id = params.get("requestId")
                if request_id in pending:
                    finished[request_id] = pending[request_id]
            elif method == "Network.loadingFailed":
                # Failed loads never get a body; forget them.
                request_id = params.get("requestId")
                pending.pop(request_id, None)
        await send("Network.enable", {"maxTotalBufferSize": 100000000, "maxResourceBufferSize": 50000000})
        await send("Page.enable")
        await send("Runtime.enable")
        if reload_page:
            await send("Page.reload", {"ignoreCache": True})
        else:
            await send("Page.navigate", {"url": navigate_url})
        # Pump events for the requested window; the 1s recv timeout keeps the
        # deadline check responsive even when the page goes quiet.
        end_time = asyncio.get_event_loop().time() + duration
        while asyncio.get_event_loop().time() < end_time:
            try:
                msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=1))
            except asyncio.TimeoutError:
                continue
            handle_event(msg)
        # Fetch bodies while the websocket session is still open.
        results: list[dict[str, Any]] = []
        for request_id, meta in finished.items():
            body = ""
            if meta["resource_type"] in ("XHR", "Fetch"):
                reply = await send("Network.getResponseBody", {"requestId": request_id})
                payload = reply.get("result", {})
                body = payload.get("body", "")
                if payload.get("base64Encoded"):
                    try:
                        body = base64.b64decode(body).decode("utf-8", errors="ignore")
                    except Exception:  # noqa: BLE001
                        # Undecodable body: record the request with an empty body.
                        body = ""
            results.append(
                {
                    **meta,
                    "body_length": len(body),
                    "body_preview": body[:1000],
                    "body": body,
                }
            )
    return results
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for this capture tool."""
    parser = argparse.ArgumentParser(description="抓取页面接口响应")
    add = parser.add_argument
    add("--page-prefix", required=True, help="已打开标签页 URL 前缀")
    add("--navigate-url", required=True, help="要导航到的真实页面 URL")
    add("--api-host-filter", required=True, help="接口域名关键字,例如 app-project-be.sqygj.cn")
    add("--duration", type=int, default=10, help="导航后监听秒数")
    add("--output", required=True, help="输出 JSON 文件路径")
    add("--reload", action="store_true", help="不导航,直接对当前标签页执行 reload")
    return parser
def main() -> int:
    """CLI entry point: capture matching API responses and dump them as JSON.

    Returns 0 on success, 1 on any failure (printed, not raised).
    """
    args = build_parser().parse_args()
    try:
        target = select_page(load_pages(), args.page_prefix)
        captured = asyncio.run(
            capture_page_apis(
                ws_url=target["webSocketDebuggerUrl"],
                navigate_url=args.navigate_url,
                api_host_filter=args.api_host_filter,
                duration=args.duration,
                reload_page=args.reload,
            )
        )
        destination = Path(args.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(captured, ensure_ascii=False, indent=2)
        destination.write_text(serialized, encoding="utf-8")
        print(f"已捕获 {len(captured)} 条接口响应")
        # Show at most the first 20 hits as a quick on-screen summary.
        for entry in captured[:20]:
            print(f"{entry['status']} {entry['resource_type']} {entry['url']}")
    except Exception as exc:  # noqa: BLE001
        print(f"抓取失败: {exc}")
        return 1
    return 0
if __name__ == "__main__":
    # SystemExit carries main()'s int return value as the process exit status.
    raise SystemExit(main())