chore(pipeline): add EES KS4 tap diagnostic script
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 2m28s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m11s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m28s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-28 18:26:15 +00:00
parent 3d1c4c61c9
commit 3e787b395f

View File

@@ -0,0 +1,130 @@
"""Diagnostic script — run inside the pipeline container to test EES API access.
Usage:
python scripts/diagnose_ees_ks4.py
Prints:
- Whether the EES API is reachable and returns a valid release ID
- Whether the ZIP download works
- The list of CSV files inside the ZIP (so you can verify the filename match)
- Row count and column names from the target CSV
"""
import io
import sys
import zipfile
import requests
CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api"
PUBLICATION_SLUG = "key-stage-4-performance"
TARGET_FILENAME = "performance_tables_schools"
TIMEOUT = 120
def main():
# Step 1: get release ID
url = f"{CONTENT_API_BASE}/publications/{PUBLICATION_SLUG}/releases/latest"
print(f"\n[1] GET {url}")
try:
resp = requests.get(url, timeout=TIMEOUT)
resp.raise_for_status()
except Exception as e:
print(f" FAILED: {e}")
sys.exit(1)
data = resp.json()
release_id = data.get("id")
release_title = data.get("title", "?")
print(f" OK — release_id={release_id!r} title={release_title!r}")
if not release_id:
print(" ERROR: no 'id' field in response. Response keys:", list(data.keys()))
sys.exit(1)
# Step 2: download the ZIP
zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
print(f"\n[2] GET {zip_url} (ZIP download, may be slow...)")
try:
resp2 = requests.get(zip_url, timeout=300, stream=True)
resp2.raise_for_status()
content_type = resp2.headers.get("Content-Type", "?")
content = resp2.content
print(f" OK — Content-Type={content_type!r} size={len(content):,} bytes")
except Exception as e:
print(f" FAILED: {e}")
sys.exit(1)
# Step 3: open as ZIP
print(f"\n[3] Opening as ZIP...")
try:
zf = zipfile.ZipFile(io.BytesIO(content))
except zipfile.BadZipFile as e:
print(f" FAILED — not a valid ZIP: {e}")
print(f" First 200 bytes of response: {content[:200]!r}")
sys.exit(1)
all_files = zf.namelist()
csv_files = [n for n in all_files if n.endswith(".csv")]
print(f" OK — {len(all_files)} files total, {len(csv_files)} CSVs")
print(" CSV files:")
for f in csv_files:
print(f" {f}")
# Step 4: find target file
print(f"\n[4] Looking for file matching '{TARGET_FILENAME}'...")
target = None
for name in all_files:
if TARGET_FILENAME in name and name.endswith(".csv"):
target = name
break
if not target:
print(f" NOT FOUND. No CSV contains '{TARGET_FILENAME}'.")
print(" All CSV names listed above — update _target_filename in tap.py to match.")
sys.exit(1)
print(f" Found: {target!r}")
# Step 5: read and report
print(f"\n[5] Reading CSV...")
import pandas as pd
with zf.open(target) as f:
df = pd.read_csv(f, dtype=str, keep_default_na=False, nrows=5)
print(f" Columns ({len(df.columns)}):")
for col in df.columns:
print(f" {col!r}")
print(f"\n[6] Filtering to school level and checking breakdown values...")
with zf.open(target) as f:
df_full = pd.read_csv(f, dtype=str, keep_default_na=False)
if "geographic_level" in df_full.columns:
school_df = df_full[df_full["geographic_level"] == "School"]
print(f" School-level rows: {len(school_df):,} / {len(df_full):,} total")
else:
school_df = df_full
print(f" No geographic_level column — total rows: {len(df_full):,}")
for col in ["breakdown_topic", "breakdown", "sex"]:
if col in school_df.columns:
vals = school_df[col].value_counts().head(10)
print(f" {col} values: {vals.index.tolist()[:10]}")
matching = school_df
for col, val in [("breakdown_topic", "All pupils"), ("breakdown", "Total"), ("sex", "Total")]:
if col in matching.columns:
matching = matching[matching[col] == val]
print(f"\n Rows matching breakdown_topic='All pupils', breakdown='Total', sex='Total': {len(matching):,}")
if len(matching) > 0:
print(" LOOKS GOOD — data should flow through dbt once tap is fixed.")
else:
print(" WARNING — 0 rows match the dbt filter. Check breakdown values above.")
print("\nDone.")
if __name__ == "__main__":
main()