# Provenance: pipeline/scripts/diagnose_ees_ks4.py, introduced by the patch
# "chore(pipeline): add EES KS4 tap diagnostic script" (commit 3e787b3).
"""Diagnostic script — run inside the pipeline container to test EES API access.

Usage:
    python scripts/diagnose_ees_ks4.py

Prints:
  - Whether the EES API is reachable and returns a valid release ID
  - Whether the ZIP download works
  - The list of CSV files inside the ZIP (so you can verify the filename match)
  - Row count and column names from the target CSV

The script exits with status 1 at the first step that fails.
"""

import io
import sys
import zipfile

# NOTE: the third-party packages (requests, pandas) are imported inside the
# functions that use them, as the script already did for pandas. This keeps
# the pure helpers importable on a machine that lacks those packages.

CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api"
PUBLICATION_SLUG = "key-stage-4-performance"
TARGET_FILENAME = "performance_tables_schools"
TIMEOUT = 120
# The ZIP download is given a longer timeout than the metadata request.
ZIP_TIMEOUT = 300

# (column, required value) pairs applied by the step-6 "dbt filter" summary.
DBT_FILTER = (
    ("breakdown_topic", "All pupils"),
    ("breakdown", "Total"),
    ("sex", "Total"),
)


def _is_csv(name):
    """Return True when *name* ends in ``.csv``, ignoring letter case."""
    return name.lower().endswith(".csv")


def _find_target_csv(names, needle):
    """Return the first CSV entry of *names* containing *needle*, or None."""
    return next((n for n in names if needle in n and _is_csv(n)), None)


def _apply_dbt_filter(df, filters=DBT_FILTER):
    """Narrow *df* by every ``(column, value)`` pair whose column exists.

    Returns:
        A ``(filtered_frame, skipped_columns)`` tuple. ``skipped_columns``
        lists the filter columns absent from *df*; they are reported to the
        caller instead of being silently ignored.
    """
    skipped = []
    for col, val in filters:
        if col in df.columns:
            df = df[df[col] == val]
        else:
            skipped.append(col)
    return df, skipped


def _fetch_release_id():
    """Step 1: ask the content API for the latest release and return its ID."""
    import requests

    url = f"{CONTENT_API_BASE}/publications/{PUBLICATION_SLUG}/releases/latest"
    print(f"\n[1] GET {url}")
    try:
        resp = requests.get(url, timeout=TIMEOUT)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f" FAILED: {e}")
        sys.exit(1)

    # A 200 reply may still carry a non-JSON body (e.g. an HTML error page);
    # show what came back instead of dying with a traceback.
    try:
        data = resp.json()
    except ValueError as e:
        print(f" FAILED — response is not valid JSON: {e}")
        print(f" First 200 bytes of response: {resp.content[:200]!r}")
        sys.exit(1)
    if not isinstance(data, dict):
        print(f" FAILED — expected a JSON object, got {type(data).__name__}")
        sys.exit(1)

    release_id = data.get("id")
    release_title = data.get("title", "?")
    print(f" OK — release_id={release_id!r} title={release_title!r}")

    if not release_id:
        print(" ERROR: no 'id' field in response. Response keys:", list(data.keys()))
        sys.exit(1)
    return release_id


def _download_zip(release_id):
    """Step 2: download the release's file bundle and return its raw bytes."""
    import requests

    zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
    print(f"\n[2] GET {zip_url} (ZIP download, may be slow...)")
    try:
        # The body is buffered whole (ZipFile needs random access), so the
        # request is not streamed.
        resp = requests.get(zip_url, timeout=ZIP_TIMEOUT)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f" FAILED: {e}")
        sys.exit(1)

    content = resp.content
    content_type = resp.headers.get("Content-Type", "?")
    print(f" OK — Content-Type={content_type!r} size={len(content):,} bytes")
    return content


def _locate_target(zf):
    """Steps 3b/4: list the archive's CSVs and return the target member name."""
    all_files = zf.namelist()
    csv_files = [n for n in all_files if _is_csv(n)]
    print(f" OK — {len(all_files)} files total, {len(csv_files)} CSVs")
    print(" CSV files:")
    for name in csv_files:
        print(f" {name}")

    print(f"\n[4] Looking for file matching '{TARGET_FILENAME}'...")
    target = _find_target_csv(all_files, TARGET_FILENAME)
    if not target:
        print(f" NOT FOUND. No CSV contains '{TARGET_FILENAME}'.")
        print(" All CSV names listed above — update _target_filename in tap.py to match.")
        sys.exit(1)

    print(f" Found: {target!r}")
    return target


def _report_csv(zf, target):
    """Steps 5/6: print the CSV's columns, row counts and filter summary."""
    import pandas as pd

    # A 5-row read prints the column list before the full file is loaded.
    print("\n[5] Reading CSV...")
    with zf.open(target) as f:
        df = pd.read_csv(f, dtype=str, keep_default_na=False, nrows=5)

    print(f" Columns ({len(df.columns)}):")
    for col in df.columns:
        print(f" {col!r}")

    print("\n[6] Filtering to school level and checking breakdown values...")
    with zf.open(target) as f:
        df_full = pd.read_csv(f, dtype=str, keep_default_na=False)

    if "geographic_level" in df_full.columns:
        school_df = df_full[df_full["geographic_level"] == "School"]
        print(f" School-level rows: {len(school_df):,} / {len(df_full):,} total")
    else:
        school_df = df_full
        print(f" No geographic_level column — total rows: {len(df_full):,}")

    # Show the ten most frequent values of each filter column that exists.
    for col, _ in DBT_FILTER:
        if col in school_df.columns:
            vals = school_df[col].value_counts().head(10)
            print(f" {col} values: {vals.index.tolist()}")

    matching, skipped = _apply_dbt_filter(school_df)
    if skipped:
        # Without this line the count below would claim filters that were
        # never applied.
        print(f" WARNING — filter column(s) not in CSV, skipped: {skipped}")
    print(f"\n Rows matching breakdown_topic='All pupils', breakdown='Total', sex='Total': {len(matching):,}")

    if len(matching) > 0:
        print(" LOOKS GOOD — data should flow through dbt once tap is fixed.")
    else:
        print(" WARNING — 0 rows match the dbt filter. Check breakdown values above.")


def main():
    """Run every diagnostic step in order, exiting with status 1 on failure."""
    release_id = _fetch_release_id()
    content = _download_zip(release_id)

    # Step 3: open as ZIP
    print("\n[3] Opening as ZIP...")
    try:
        zf = zipfile.ZipFile(io.BytesIO(content))
    except zipfile.BadZipFile as e:
        print(f" FAILED — not a valid ZIP: {e}")
        print(f" First 200 bytes of response: {content[:200]!r}")
        sys.exit(1)

    # The context manager closes the archive even when a later step exits.
    with zf:
        target = _locate_target(zf)
        _report_csv(zf, target)

    print("\nDone.")


if __name__ == "__main__":
    main()