# Files
# hvm-docs/scrape/runner.py
#
# 340 lines
# 13 KiB
# Python

"""Scrape HVM doc bundles into corpus/<slug>/<page_id>.{md,json}.

Reads bundles.json (produced by scrape.bundles), then for each bundle:

- mode="toc": walks the TOC tree, fetches each page via the render
  endpoint, converts page_html to markdown, and writes
  <page_id>.md + <page_id>.json sidecar.
- mode="single": fetches /document/{docId} directly and treats the whole
  body as one page with page_id = doc_id.
- mode="html-file": delegates to scrape.quickspecs.scrape_quickspecs.

After all bundles are on disk, runs a finalize pass that synthesizes
topic_cluster.clustered_topics for each page by looking up the same
GUID in sibling bundles (HPE GUIDs are stable across versions — see
reference_hpe_docs_portal_api.md). Single-document pages have no GUID
and are peered by their bundle's `product` field instead.

Usage:
    python -m scrape.runner --all
    python -m scrape.runner --bundle hvm_user_manual_8_1_2
    python -m scrape.runner --all --force      # re-download already-on-disk pages
    python -m scrape.runner --finalize-only    # only redo the topic_cluster pass
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
# Base URL of the document API. Callers in this module append /{doc_id},
# /{doc_id}/toc or /{doc_id}/render?page=<page_id>.html to it.
API = "https://support.hpe.com/hpesc/public/api/document"
# Viewer URL templates; the formatted value is stored as `source_url`
# in each page's JSON sidecar.
DOC_URL = "https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}&page={page_id}.html"
DOC_URL_SINGLE = "https://support.hpe.com/hpesc/public/docDisplay?docId={doc_id}"
# User-Agent header installed on every session built by _session().
UA = "hvm-docs-mcp/0.1 (+https://git.jpaul.io/justin/hvm-docs; admin@jpaul.io)"
# Directory one level above the directory that contains this file.
ROOT = Path(__file__).resolve().parent.parent
# Output tree: one sub-directory per bundle slug.
CORPUS = ROOT / "corpus"
# Bundle catalogue loaded by main().
BUNDLES_JSON = ROOT / "bundles.json"
# Captures the page GUID from a TOC node's topicLink. Only upper-case hex
# digits and dashes are accepted after the "GUID-" prefix.
GUID_RE = re.compile(r"page=(GUID-[A-F0-9-]+)\.html")
@dataclass
class TocEntry:
    """One fetchable page found while flattening a bundle's TOC tree."""

    # GUID captured from the node's topicLink by GUID_RE.
    page_id: str
    # The node's topicName ("" when the node has none).
    title: str
    # 1-based position of the page in depth-first TOC order.
    ordinal: int
    # Title of the nearest ancestor node that has one; None at the top level.
    parent_title: str | None
def _session() -> requests.Session:
    """Create a requests session pre-loaded with the scraper's headers."""
    session = requests.Session()
    # Identify ourselves and state that both JSON and HTML bodies are accepted.
    session.headers["User-Agent"] = UA
    session.headers["Accept"] = "application/json, text/html"
    return session
def _get(s: requests.Session, url: str, expect_json: bool = False, retries: int = 4) -> Any:
delay = 1.0
for attempt in range(retries):
r = s.get(url, timeout=30)
if r.status_code == 200:
return r.json() if expect_json else r.text
if r.status_code == 404:
return None
if r.status_code in (429, 500, 502, 503, 504):
time.sleep(delay)
delay *= 2
continue
r.raise_for_status()
raise RuntimeError(f"GET failed after {retries} retries: {url}")
def _flatten_toc(toc: list[dict]) -> list[TocEntry]:
    """Flatten the nested TOC into depth-first ordered page entries.

    Only nodes whose ``topicLink`` contains a page GUID become entries;
    nodes without one are still descended into. Ordinals start at 1.
    """

    def iter_pages(nodes: list[dict] | None, parent_title: str | None):
        # Pre-order walk yielding (page_id, title, parent_title) triples.
        for node in nodes or []:
            title = node.get("topicName") or ""
            match = GUID_RE.search(node.get("topicLink") or "")
            if match:
                yield match.group(1), title, parent_title
            # An untitled node passes its own parent's title down to its children.
            yield from iter_pages(node.get("children"), title or parent_title)

    return [
        TocEntry(page_id=page_id, title=title, ordinal=position, parent_title=parent)
        for position, (page_id, title, parent) in enumerate(iter_pages(toc, None), start=1)
    ]
def _strip_dita_wrappers(html: str) -> str:
    """Drop repeated boilerplate sections and return the page's <main> markup.

    Sections headed "Notices", "Acknowledgments" or "Abstract" are removed:
    the same legal and trademark text appears on every document, so leaving
    it in would make every page chunk carry it and pollute retrieval. The
    result is the serialized <main> element when the page has one, otherwise
    the whole cleaned document.
    """
    boilerplate_titles = {"Notices", "Acknowledgments", "Abstract"}
    heading_tags = ["h1", "h2"]
    body_class_markers = ("desc", "body", "notices")

    def is_boilerplate(heading) -> bool:
        return heading.get_text(strip=True) in boilerplate_titles

    soup = BeautifulSoup(html, "html.parser")

    # Wrapped form: a container whose direct h1/h2 child names a boilerplate
    # section is removed together with everything inside it.
    for container in soup.select("article, section, div"):
        heading = container.find(heading_tags, recursive=False)
        if heading and is_boilerplate(heading):
            container.decompose()

    # Unwrapped form: a bare boilerplate heading, optionally followed by a
    # div/section sibling holding its body.
    for heading in soup.find_all(heading_tags):
        if not is_boilerplate(heading):
            continue
        follower = heading.find_next_sibling()
        if follower and follower.name in {"div", "section"}:
            class_attr = " ".join(follower.get("class", []) or [])
            if any(marker in class_attr for marker in body_class_markers):
                follower.decompose()
        heading.decompose()

    root = soup.find("main")
    return str(root) if root else str(soup)
def html_to_md(page_html: str) -> str:
    """Convert a page's HTML to markdown that ends in exactly one newline."""
    markdown = md(_strip_dita_wrappers(page_html), heading_style="ATX", bullets="-")
    # Squeeze any run of three or more newlines down to a single blank line.
    squeezed = re.sub(r"\n{3,}", "\n\n", markdown)
    return f"{squeezed.strip()}\n"
def fetch_toc_page(s: requests.Session, doc_id: str, page_id: str) -> str:
    """Fetch one TOC page via the render endpoint.

    Returns the payload's ``page_html`` value, or "" when the request
    yields nothing or the field is missing/empty.
    """
    render_url = f"{API}/{doc_id}/render?page={page_id}.html"
    payload = _get(s, render_url, expect_json=True)
    return (payload.get("page_html") or "") if payload else ""
def fetch_single_doc(s: requests.Session, doc_id: str) -> tuple[str, str]:
    """Fetch a single-document bundle and return ``(page_html, title)``.

    Both values are "" when nothing could be fetched. The title comes from
    the document's ``h1.title.topictitle1`` heading; when the page has no
    such heading, *doc_id* is used instead.
    """
    body = _get(s, f"{API}/{doc_id}")
    if not body:
        return "", ""
    heading = BeautifulSoup(body, "html.parser").select_one("h1.title.topictitle1")
    if heading is None:
        return body, doc_id
    return body, heading.get_text(" ", strip=True)
def write_page(bundle_dir: Path, page_id: str, body_md: str, sidecar: dict[str, Any], force: bool) -> bool:
    """Write ``<page_id>.md`` and its ``<page_id>.json`` sidecar into *bundle_dir*.

    Args:
        bundle_dir: Target directory; created (with parents) if missing.
        page_id: Basename shared by the markdown file and the sidecar.
        body_md: Markdown body to store.
        sidecar: JSON-serializable metadata for the page.
        force: Overwrite files that already exist.

    Returns:
        True when the files were written, False when both already existed
        and *force* was not set.
    """
    bundle_dir.mkdir(parents=True, exist_ok=True)
    md_path = bundle_dir / f"{page_id}.md"
    json_path = bundle_dir / f"{page_id}.json"
    # A page counts as present only when BOTH files exist, so a run that was
    # interrupted between the two writes is repaired on the next pass.
    if not force and md_path.exists() and json_path.exists():
        return False
    # Explicit UTF-8: the markdown routinely contains non-ASCII characters
    # and the platform default encoding is not guaranteed to handle them.
    md_path.write_text(body_md, encoding="utf-8")
    json_path.write_text(json.dumps(sidecar, indent=2) + "\n", encoding="utf-8")
    return True
def scrape_toc_bundle(s: requests.Session, bundle: dict, force: bool, concurrency: int) -> int:
    """Scrape every page listed in a TOC-mode bundle.

    Args:
        s: Session shared by all worker threads.
        bundle: Bundle record; ``doc_id`` and ``slug`` are required,
            ``version`` and ``product`` are optional.
        force: Re-download and overwrite pages that are already on disk.
        concurrency: Number of worker threads (values below 1 are treated
            as 1).

    Returns:
        The number of pages written during this call.
    """
    doc_id = bundle["doc_id"]
    slug = bundle["slug"]
    bundle_dir = CORPUS / slug
    toc = _get(s, f"{API}/{doc_id}/toc", expect_json=True) or []
    entries = _flatten_toc(toc)
    print(f" {slug}: {len(entries)} pages", file=sys.stderr)

    def already_on_disk(page_id: str) -> bool:
        # Same "both files present" rule that write_page applies.
        return (bundle_dir / f"{page_id}.md").exists() and (bundle_dir / f"{page_id}.json").exists()

    def do_one(entry: TocEntry) -> bool:
        # Check BEFORE fetching: without --force an existing page must not
        # cost a network round-trip and an HTML conversion.
        if not force and already_on_disk(entry.page_id):
            return False
        page_html = fetch_toc_page(s, doc_id, entry.page_id)
        if not page_html:
            return False
        body_md = html_to_md(page_html)
        sidecar = {
            "bundle_id": slug,
            "page_id": entry.page_id,
            "title": entry.title,
            "ordinal": entry.ordinal,
            "parent_title": entry.parent_title,
            "doc_id": doc_id,
            "version": bundle.get("version"),
            "product": bundle.get("product"),
            "source_url": DOC_URL.format(doc_id=doc_id, page_id=entry.page_id),
            # topic_cluster filled in by finalize()
        }
        return write_page(bundle_dir, entry.page_id, body_md, sidecar, force)

    if not entries:
        return 0
    # ThreadPoolExecutor rejects max_workers < 1, so clamp the user's value.
    with ThreadPoolExecutor(max_workers=max(1, concurrency)) as pool:
        futures = [pool.submit(do_one, entry) for entry in entries]
        return sum(1 for fut in as_completed(futures) if fut.result())
def scrape_single_bundle(s: requests.Session, bundle: dict, force: bool) -> int:
    """Scrape a single-document bundle as one page whose page_id is the doc_id.

    Args:
        s: Session used for the download.
        bundle: Bundle record; ``doc_id`` and ``slug`` are required,
            ``title``, ``version`` and ``product`` are optional.
        force: Re-download and overwrite a page that is already on disk.

    Returns:
        1 when the page was written, otherwise 0.
    """
    doc_id = bundle["doc_id"]
    slug = bundle["slug"]
    bundle_dir = CORPUS / slug
    # Check BEFORE fetching: without --force an existing page must not be
    # downloaded again only to be discarded by write_page.
    on_disk = (bundle_dir / f"{doc_id}.md").exists() and (bundle_dir / f"{doc_id}.json").exists()
    if not force and on_disk:
        print(f" {slug}: already on disk (single-doc)", file=sys.stderr)
        return 0
    html, title = fetch_single_doc(s, doc_id)
    if not html:
        print(f" ! {slug}: empty body", file=sys.stderr)
        return 0
    # fetch_single_doc substitutes doc_id when the page has no title heading,
    # which would otherwise make the catalogue-title fallback unreachable.
    if not title or title == doc_id:
        title = bundle.get("title") or doc_id
    body_md = html_to_md(html)
    sidecar = {
        "bundle_id": slug,
        "page_id": doc_id,
        "title": title,
        "ordinal": 1,
        "parent_title": None,
        "doc_id": doc_id,
        "version": bundle.get("version"),
        "product": bundle.get("product"),
        "source_url": DOC_URL_SINGLE.format(doc_id=doc_id),
    }
    print(f" {slug}: 1 page (single-doc)", file=sys.stderr)
    return 1 if write_page(bundle_dir, doc_id, body_md, sidecar, force) else 0
def finalize_clusters(bundles: list[dict]) -> int:
    """Cross-link sibling pages across bundles via a ``topic_cluster`` field.

    Pages whose page_id starts with "GUID-" are grouped by that GUID: the
    same GUID in two bundles is treated as the same underlying topic. All
    other pages (single-document bundles, where page_id == doc_id) are
    grouped by their bundle's ``product`` value. Every page in a group of
    two or more gets its sidecar rewritten with the list of its peers from
    other bundles.

    Args:
        bundles: Bundle records; each needs ``slug``, and ``product`` is
            consulted for non-GUID pages.

    Returns:
        The number of sidecar files rewritten.
    """
    # key -> list[(slug, sidecar_path, sidecar_dict)]
    guid_to_pages: dict[str, list[tuple[str, Path, dict]]] = {}
    product_to_pages: dict[str, list[tuple[str, Path, dict]]] = {}

    for b in bundles:
        slug = b["slug"]
        bundle_dir = CORPUS / slug
        if not bundle_dir.exists():
            continue
        # Scrapers treat `product` as optional, so this pass must as well.
        product = b.get("product")
        # Sorted so peer lists come out in a reproducible order.
        for jp in sorted(bundle_dir.glob("*.json")):
            data = json.loads(jp.read_text(encoding="utf-8"))
            pid = data.get("page_id") if isinstance(data, dict) else None
            if not isinstance(pid, str):
                # Not one of our sidecars; leave it alone rather than crash.
                print(f" ! {jp}: no page_id, skipped", file=sys.stderr)
                continue
            if pid.startswith("GUID-"):
                guid_to_pages.setdefault(pid, []).append((slug, jp, data))
            elif product is not None:
                # Without a product there is nothing meaningful to peer on.
                product_to_pages.setdefault(product, []).append((slug, jp, data))

    def link_peers(groups: dict[str, list[tuple[str, Path, dict]]]) -> int:
        # Write topic_cluster into every member of each multi-page group.
        rewritten = 0
        for peers in groups.values():
            if len(peers) < 2:
                continue
            for slug, jp, data in peers:
                others = [
                    {"bundle_id": s2, "page_id": d2["page_id"], "clustering_title": d2.get("title", "")}
                    for s2, _, d2 in peers
                    if s2 != slug
                ]
                data["topic_cluster"] = {
                    "clustering_title": data.get("title", ""),
                    "clustered_topics": others,
                }
                jp.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
                rewritten += 1
        return rewritten

    # TOC pages cluster by GUID; single-doc pages cluster by product
    # (e.g. Release Notes 8.1.0/.1/.2).
    return link_peers(guid_to_pages) + link_peers(product_to_pages)
def main() -> int:
    """Command-line entry point: scrape the selected bundles, then finalize.

    Returns:
        0 on success; 2 when bundles.json is missing, no bundle was
        selected, or no requested slug matched.
    """
    p = argparse.ArgumentParser(description="Scrape HVM bundles into corpus/.")
    p.add_argument("--all", action="store_true", help="scrape every bundle in bundles.json")
    p.add_argument("--bundle", action="append", help="scrape one bundle by slug (repeatable)")
    p.add_argument("--force", action="store_true", help="re-fetch pages already on disk")
    p.add_argument("--concurrency", type=int, default=6)
    p.add_argument("--finalize-only", action="store_true", help="only rebuild topic_cluster sidecar fields")
    args = p.parse_args()

    if not BUNDLES_JSON.exists():
        print("bundles.json missing — run `python -m scrape.bundles` first", file=sys.stderr)
        return 2
    # Parsed once: the full list is needed again for the finalize pass even
    # when only a subset of bundles is scraped.
    all_bundles = json.loads(BUNDLES_JSON.read_text(encoding="utf-8"))

    if args.finalize_only:
        n = finalize_clusters(all_bundles)
        print(f"finalize: updated topic_cluster on {n} sidecars", file=sys.stderr)
        return 0

    if args.bundle:
        wanted = set(args.bundle)
        bundles = [b for b in all_bundles if b["slug"] in wanted]
        if not bundles:
            print(f"no bundles matched: {args.bundle}", file=sys.stderr)
            return 2
    elif args.all:
        bundles = all_bundles
    else:
        print("specify --all or --bundle <slug>", file=sys.stderr)
        return 2

    if args.concurrency < 1:
        # Report a usage error instead of a traceback from the thread pool.
        p.error("--concurrency must be at least 1")

    total = 0
    # Context manager so pooled connections are released even if a scrape raises.
    with _session() as s:
        for b in bundles:
            mode = b.get("mode")
            if mode == "single":
                total += scrape_single_bundle(s, b, args.force)
            elif mode == "html-file":
                # Live-scrape HPE collateral (QuickSpecs) via curl_cffi; falls back
                # to scrape/quickspecs/<doc_id>.html fixture if the edge blocks us.
                from scrape.quickspecs import scrape_quickspecs

                ok = scrape_quickspecs(
                    doc_id=b["doc_id"],
                    bundle_id=b["slug"],
                    title=b.get("title", b["doc_id"]),
                    version=b.get("version"),
                    product=b.get("product", "QuickSpecs"),
                    source_url=b.get("source_url"),
                    force=args.force,
                )
                total += 1 if ok else 0
            else:
                # Any other mode value (including a missing one) is scraped as a TOC bundle.
                total += scrape_toc_bundle(s, b, args.force, args.concurrency)
    print(f"scraped {total} new/updated pages", file=sys.stderr)

    # Always finalize over ALL bundles after a scrape so sidecars are consistent.
    n = finalize_clusters(all_bundles)
    print(f"finalize: updated topic_cluster on {n} sidecars", file=sys.stderr)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())