"""Diagnostic script — run inside the pipeline container to test EES API access. Usage: python scripts/diagnose_ees_ks4.py Prints: - Whether the EES API is reachable and returns a valid release ID - Whether the ZIP download works - The list of CSV files inside the ZIP (so you can verify the filename match) - Row count and column names from the target CSV """ import io import sys import zipfile import requests CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api" PUBLICATION_SLUG = "key-stage-4-performance" TARGET_FILENAME = "performance_tables_schools" TIMEOUT = 120 def main(): # Step 1: get release ID url = f"{CONTENT_API_BASE}/publications/{PUBLICATION_SLUG}/releases/latest" print(f"\n[1] GET {url}") try: resp = requests.get(url, timeout=TIMEOUT) resp.raise_for_status() except Exception as e: print(f" FAILED: {e}") sys.exit(1) data = resp.json() release_id = data.get("id") release_title = data.get("title", "?") print(f" OK — release_id={release_id!r} title={release_title!r}") if not release_id: print(" ERROR: no 'id' field in response. Response keys:", list(data.keys())) sys.exit(1) # Step 2: download the ZIP zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files" print(f"\n[2] GET {zip_url} (ZIP download, may be slow...)") try: resp2 = requests.get(zip_url, timeout=300, stream=True) resp2.raise_for_status() content_type = resp2.headers.get("Content-Type", "?") content = resp2.content print(f" OK — Content-Type={content_type!r} size={len(content):,} bytes") except Exception as e: print(f" FAILED: {e}") sys.exit(1) # Step 3: open as ZIP print(f"\n[3] Opening as ZIP...") try: zf = zipfile.ZipFile(io.BytesIO(content)) except zipfile.BadZipFile as e: print(f" FAILED — not a valid ZIP: {e}") print(f" First 200 bytes of response: {content[:200]!r}") sys.exit(1) all_files = zf.namelist() csv_files = [n for n in all_files if n.endswith(".csv")] print(f" OK — {len(all_files)} files total, {len(csv_files)} CSVs") print(" CSV files:") for f in csv_files: print(f" {f}") # Step 4: find target file print(f"\n[4] Looking for file matching '{TARGET_FILENAME}'...") target = None for name in all_files: if TARGET_FILENAME in name and name.endswith(".csv"): target = name break if not target: print(f" NOT FOUND. No CSV contains '{TARGET_FILENAME}'.") print(" All CSV names listed above — update _target_filename in tap.py to match.") sys.exit(1) print(f" Found: {target!r}") # Step 5: read and report print(f"\n[5] Reading CSV...") import pandas as pd with zf.open(target) as f: df = pd.read_csv(f, dtype=str, keep_default_na=False, nrows=5) print(f" Columns ({len(df.columns)}):") for col in df.columns: print(f" {col!r}") print(f"\n[6] Filtering to school level and checking breakdown values...") with zf.open(target) as f: df_full = pd.read_csv(f, dtype=str, keep_default_na=False) if "geographic_level" in df_full.columns: school_df = df_full[df_full["geographic_level"] == "School"] print(f" School-level rows: {len(school_df):,} / {len(df_full):,} total") else: school_df = df_full print(f" No geographic_level column — total rows: {len(df_full):,}") for col in ["breakdown_topic", "breakdown", "sex"]: if col in school_df.columns: vals = school_df[col].value_counts().head(10) print(f" {col} values: {vals.index.tolist()[:10]}") matching = school_df for col, val in [("breakdown_topic", "All pupils"), ("breakdown", "Total"), ("sex", "Total")]: if col in matching.columns: matching = matching[matching[col] == val] print(f"\n Rows matching breakdown_topic='All pupils', breakdown='Total', sex='Total': {len(matching):,}") if len(matching) > 0: print(" LOOKS GOOD — data should flow through dbt once tap is fixed.") else: print(" WARNING — 0 rows match the dbt filter. Check breakdown values above.") print("\nDone.") if __name__ == "__main__": main()