#!/usr/bin/env python3 """Validation harness for the production ADI engine (family-v1). Checks, in order: 1. cross_process_reproducibility — runs compute_adi.py twice as separate processes and compares sha256 of both written artifacts. 2. engine_gates — every gate recorded in the written output passed. 3. seam_independent — recomputes the seam statistic from the published rows and matches it to the engine's recorded numbers. 4. gfc_independent — recomputes the worst 8-quarter window from the rows. 5. orientation — re-runs the perturbation gate in-process (sign read from each JSON's direction field), and asserts savings_rate is the only lower_is_worse input. 6. weights — registry-derived, uniform, sum to 1, zero hand-typed values. 7. hazen_parity — fixed tie-containing corpus through this engine's transform AND the sibling SDI / CDI implementations; all must agree. Until the SDI/CDI production ports land, the siblings are the committed prototypes under drafts/index_rebuild_2026-06-09/; the family parity test supersedes this check when those ports land. 8. coverage — published range contiguous; members_present bounds; the trailing unpublished quarters and which domains block them. The prototype-era analyses (old-engine overlay, rank-mean degeneracy, state panel cross-check) are recorded in the 2026-06-09 cold reviews and in the output's methodology block; they are point-in-time audit artifacts, not recurring validation, and do not run here. Run from the repo root: PYTHONPATH=. python3 scripts/indexes/validate_adi.py Writes data/audit/index_family/adi_validation.json. Exit code 0 only when every check passes. """ from __future__ import annotations import hashlib import importlib.util import json import logging import math import os import subprocess import sys import types from pathlib import Path from typing import Dict, List, Sequence, Tuple THIS_DIR = Path(__file__).resolve().parent REPO_ROOT = THIS_DIR.parents[1] if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) from scripts.indexes.family_normalization import hazen_percentiles_series # noqa: E402 from scripts.indexes import compute_adi # noqa: E402 logger = logging.getLogger("validate_adi") OUTPUT_JSON = REPO_ROOT / "data" / "indexes" / "adi.json" OUTPUT_CSV = REPO_ROOT / "data" / "indexes" / "adi.csv" REPORT_PATH = REPO_ROOT / "data" / "audit" / "index_family" / "adi_validation.json" SIBLING_PROTOTYPES = REPO_ROOT / "drafts" / "index_rebuild_2026-06-09" def sha256(path: Path) -> str: return hashlib.sha256(path.read_bytes()).hexdigest() def q_index(q: str) -> int: year, qq = q.split("-Q") return int(year) * 4 + int(qq) - 1 # --------------------------------------------------------------------------- def check_cross_process_reproducibility() -> Dict[str, object]: env = {**os.environ, "PYTHONPATH": "."} hashes: List[Dict[str, str]] = [] for run in (1, 2): proc = subprocess.run( [sys.executable, str(THIS_DIR / "compute_adi.py")], cwd=REPO_ROOT, env=env, capture_output=True, text=True, ) if proc.returncode != 0: return {"pass": False, "error": f"run {run} exited {proc.returncode}", "stderr": proc.stderr[-2000:]} hashes.append({p.name: sha256(p) for p in (OUTPUT_JSON, OUTPUT_CSV)}) return { "pass": hashes[0] == hashes[1], "run1": hashes[0], "run2": hashes[1], "rule": "two separate-process runs write byte-identical artifacts", } def check_engine_gates(output: Dict) -> Dict[str, object]: gates = {k: bool(v["pass"]) for k, v in output["validation"].items()} return {"pass": all(gates.values()), "gates": gates} def check_seam_independent(output: Dict) -> Dict[str, object]: rows = output["data"] comp = {r["quarter"]: r["composite"] for r in rows} qs = [r["quarter"] for r in rows] deltas = [abs(comp[b] - comp[a]) for a, b in zip(qs, qs[1:]) if q_index(b) - q_index(a) == 1] threshold = compute_adi.quantile_linear(deltas, 0.95) seam = abs(comp["2015-Q1"] - comp["2014-Q4"]) engine = output["validation"]["seam"] agree = (abs(seam - engine["seam_abs_delta_2014Q4_to_2015Q1"]) < 0.05 and abs(threshold - engine["threshold_p95_of_all_abs_deltas"]) < 0.05) return { "pass": seam <= threshold and agree, "seam_from_published_rows": round(seam, 4), "threshold_from_published_rows": round(threshold, 4), "matches_engine_numbers_within_rounding": agree, } def check_gfc_independent(output: Dict) -> Dict[str, object]: rows = output["data"] comp = {r["quarter"]: r["composite"] for r in rows} qs = [r["quarter"] for r in rows] window = 8 best_i, best_mean = 0, -1.0 for i in range(len(qs) - window + 1): m = sum(comp[q] for q in qs[i:i + window]) / window if m > best_mean: best_mean, best_i = m, i start = qs[best_i] in_gfc = q_index("2008-Q1") <= q_index(start) <= q_index("2010-Q4") return { "pass": in_gfc, "worst_8q_window": f"{start} to {qs[best_i + window - 1]}", "worst_8q_window_mean": round(best_mean, 2), "matches_engine_window": f"{start} to {qs[best_i + window - 1]}" == output["validation"]["gfc"]["worst_8q_window"], } def check_orientation() -> Dict[str, object]: loaded = compute_adi.load_inputs(REPO_ROOT) inversions = [iid for iid, li in loaded.items() if li.direction == "lower_is_worse"] baseline = compute_adi.build(loaded) gate = compute_adi.gate_orientation(loaded, baseline) only_savings = inversions == ["savings_rate"] return { "pass": bool(gate["pass"]) and only_savings, "inverted_inputs": inversions, "only_inversion_is_savings_rate": only_savings, "per_input": gate["per_input"], } def check_weights(output: Dict) -> Dict[str, object]: meth = output["methodology"] n_domains = len(meth["domains"]) ok = abs(meth["domain_weight"] - 1.0 / n_domains) < 1e-6 member_detail = {} total = 0.0 for d, members in meth["domains"].items(): w_m = 1.0 / len(members) member_detail[d] = {"n_members": len(members), "member_weight": round(w_m, 6)} total += meth["domain_weight"] * w_m * len(members) ok = ok and abs(total - 1.0) < 1e-6 return {"pass": ok, "domain_weight": meth["domain_weight"], "members": member_detail, "total_weight": round(total, 12)} def _import_sibling(name: str, path: Path) -> types.ModuleType: spec = importlib.util.spec_from_file_location(name, path) mod = importlib.util.module_from_spec(spec) sys.modules[name] = mod # dataclass processing requires the module registered spec.loader.exec_module(mod) return mod def check_hazen_parity() -> Dict[str, object]: corpora = { "ties_mixed": [3.2, 1.1, 3.2, 0.5, 9.9, 3.2, 1.1, 7.0], "all_equal": [2.0, 2.0, 2.0], "single": [7.7], "negatives": [-4.0, 0.0, -4.0, 12.5], } expected = { "ties_mixed": [56.25, 25.0, 56.25, 6.25, 93.75, 56.25, 25.0, 81.25], "all_equal": [50.0, 50.0, 50.0], "single": [50.0], "negatives": [25.0, 62.5, 25.0, 87.5], } detail: Dict[str, object] = {} ok = True for name, corpus in corpora.items(): mine = hazen_percentiles_series(corpus) match = all(abs(a - b) < 1e-9 for a, b in zip(mine, expected[name])) detail[f"adi_{name}"] = {"values": [round(v, 4) for v in mine], "matches_expected": match} ok = ok and match # Sibling engines for sib, fname, adapter in ( ("sdi", "compute_sdi.py", "dict"), ("cdi", "compute_cdi.py", "ndarray"), ): path = SIBLING_PROTOTYPES / sib / fname try: mod = _import_sibling(f"sibling_{sib}", path) fn = mod.hazen_percentiles corpus = corpora["ties_mixed"] if adapter == "dict": keyed = {f"k{i:02d}": v for i, v in enumerate(corpus)} got_map = fn(keyed, True) got = [got_map[f"k{i:02d}"] for i in range(len(corpus))] else: import numpy as np got = [float(x) for x in fn(np.array(corpus, dtype=float))] match = all(abs(a - b) < 1e-9 for a, b in zip(got, expected["ties_mixed"])) detail[sib] = {"values": [round(v, 4) for v in got], "matches_expected": match} ok = ok and match except Exception as exc: # noqa: BLE001 — parity must not crash the harness detail[sib] = {"skipped": f"{type(exc).__name__}: {exc}"} ok = False return {"pass": ok, "corpus": corpora["ties_mixed"], "expected": expected["ties_mixed"], "detail": detail, "rule": "one family transform: ADI, SDI, CDI implementations must agree on a tie-containing corpus"} def check_coverage(output: Dict) -> Dict[str, object]: rows = output["data"] qs = [r["quarter"] for r in rows] contiguous = all(q_index(b) - q_index(a) == 1 for a, b in zip(qs, qs[1:])) mp_bounds = {} for d in compute_adi.DOMAIN_IDS: counts = [r["domains"][d]["members_present"] for r in rows] mp_bounds[d] = {"min": min(counts), "max": max(counts), "registry_members": len(compute_adi.DOMAIN_MEMBERS[d])} # Trailing quarters with data but no publication, and the domains that block them loaded = compute_adi.load_inputs(REPO_ROOT) result = compute_adi.build(loaded) last_pub = q_index(qs[-1]) all_q = sorted({q for m in result.member_pct.values() for q in m}) trailing = [] for q in all_q: if q_index(q) <= last_pub: continue present = {d for d in compute_adi.DOMAIN_IDS if any(q in result.member_pct[m] for m in compute_adi.DOMAIN_MEMBERS[d])} trailing.append({"quarter": q, "domains_present": sorted(present), "domains_missing": sorted(set(compute_adi.DOMAIN_IDS) - present)}) return { "pass": contiguous and qs[0] == "2005-Q1", "published": f"{qs[0]} to {qs[-1]} ({len(qs)} quarters, contiguous={contiguous})", "members_present_bounds": mp_bounds, "trailing_unpublished": trailing, } # --------------------------------------------------------------------------- def main() -> int: logging.basicConfig(level=logging.INFO, format="%(message)s") checks: Dict[str, Dict[str, object]] = {} checks["cross_process_reproducibility"] = check_cross_process_reproducibility() output = json.loads(OUTPUT_JSON.read_text()) checks["engine_gates"] = check_engine_gates(output) checks["seam_independent"] = check_seam_independent(output) checks["gfc_independent"] = check_gfc_independent(output) checks["orientation"] = check_orientation() checks["weights"] = check_weights(output) checks["hazen_parity"] = check_hazen_parity() checks["coverage"] = check_coverage(output) REPORT_PATH.parent.mkdir(parents=True, exist_ok=True) REPORT_PATH.write_text(json.dumps(checks, indent=2, ensure_ascii=False) + "\n") all_pass = True for name, c in checks.items(): verdict = "PASS" if c["pass"] else "FAIL" logger.info("%-32s %s", name, verdict) if not c["pass"]: all_pass = False logger.info("report: %s", REPORT_PATH) return 0 if all_pass else 1 if __name__ == "__main__": raise SystemExit(main())