All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 2m28s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m11s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m28s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
131 lines
4.4 KiB
Python
131 lines
4.4 KiB
Python
"""Diagnostic script — run inside the pipeline container to test EES API access.

Usage:
    python scripts/diagnose_ees_ks4.py

Prints:
  - Whether the EES API is reachable and returns a valid release ID
  - Whether the ZIP download works
  - The list of CSV files inside the ZIP (so you can verify the filename match)
  - Row count and column names from the target CSV
  - How many school-level rows match the breakdown_topic / breakdown / sex filter
"""
|
|
|
|
import io
|
|
import sys
|
|
import zipfile
|
|
|
|
import requests
|
|
|
|
# Root of the EES content API; every request URL in this script is built from it.
CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api"
# Publication slug interpolated into the "latest release" URL.
PUBLICATION_SLUG = "key-stage-4-performance"
# Substring a CSV member name inside the ZIP must contain to be chosen as the target.
TARGET_FILENAME = "performance_tables_schools"
# Timeout, in seconds, passed to requests.get for the release-metadata request.
TIMEOUT = 120
|
|
|
|
|
|
def main():
    """Probe the EES content API end to end and print a step-by-step report.

    Each step is announced on stdout with a ``[n]`` prefix:

    1. Fetch the latest release of ``PUBLICATION_SLUG`` and read its ``id``.
    2. Download the files archive of that release.
    3. Open the payload as a ZIP and list its CSV members.
    4. Pick the first CSV whose name contains ``TARGET_FILENAME``.
    5. Print the column names of that CSV.
    6. Count school-level rows and the rows matching the
       ``breakdown_topic`` / ``breakdown`` / ``sex`` filter.

    Returns:
        None. Everything is reported via ``print``.

    Raises:
        SystemExit: with status 1 as soon as one of steps 1-4 fails.
    """
    # Step 1: get release ID
    url = f"{CONTENT_API_BASE}/publications/{PUBLICATION_SLUG}/releases/latest"
    print(f"\n[1] GET {url}")
    try:
        resp = requests.get(url, timeout=TIMEOUT)
        resp.raise_for_status()
        # Decoded inside the try: a 2xx response with a non-JSON body raises
        # ValueError, which should be reported as a failed step, not a traceback.
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        print(f" FAILED: {e}")
        sys.exit(1)

    if not isinstance(data, dict):
        # .get()/.keys() below need a JSON object.
        print(f" FAILED: expected a JSON object, got {type(data).__name__}")
        sys.exit(1)

    release_id = data.get("id")
    if not release_id:
        print(" ERROR: no 'id' field in response. Response keys:", list(data.keys()))
        sys.exit(1)

    # Only report success once the id is known to be present.
    release_title = data.get("title", "?")
    print(f" OK — release_id={release_id!r} title={release_title!r}")

    # Step 2: download the ZIP
    zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
    print(f"\n[2] GET {zip_url} (ZIP download, may be slow...)")
    try:
        # The context manager releases the connection even when the status
        # check or the body download fails part-way.
        with requests.get(zip_url, timeout=300, stream=True) as resp2:
            resp2.raise_for_status()
            content_type = resp2.headers.get("Content-Type", "?")
            content = resp2.content
    except requests.RequestException as e:
        print(f" FAILED: {e}")
        sys.exit(1)
    print(f" OK — Content-Type={content_type!r} size={len(content):,} bytes")

    # Step 3: open as ZIP
    print("\n[3] Opening as ZIP...")
    try:
        zf = zipfile.ZipFile(io.BytesIO(content))
    except zipfile.BadZipFile as e:
        print(f" FAILED — not a valid ZIP: {e}")
        # Show the start of the payload so the actual response is visible.
        print(f" First 200 bytes of response: {content[:200]!r}")
        sys.exit(1)

    # `with` closes the archive on every exit path, including sys.exit below.
    with zf:
        all_files = zf.namelist()
        csv_files = [n for n in all_files if n.endswith(".csv")]
        print(f" OK — {len(all_files)} files total, {len(csv_files)} CSVs")
        print(" CSV files:")
        for member in csv_files:
            print(f" {member}")

        # Step 4: find target file
        print(f"\n[4] Looking for file matching '{TARGET_FILENAME}'...")
        target = next((n for n in csv_files if TARGET_FILENAME in n), None)
        if target is None:
            print(f" NOT FOUND. No CSV contains '{TARGET_FILENAME}'.")
            print(" All CSV names listed above — update _target_filename in tap.py to match.")
            sys.exit(1)
        print(f" Found: {target!r}")

        # Steps 5 and 6: read and report
        _report_csv(zf, target)

    print("\nDone.")


def _report_csv(zf, target):
    """Print the columns of ZIP member ``target`` and its filter match counts."""
    # Imported here rather than at module top, so steps 1-4 do not need pandas.
    import pandas as pd

    print("\n[5] Reading CSV...")
    # Preview read: only the first rows are parsed, enough to list the columns.
    with zf.open(target) as f:
        preview = pd.read_csv(f, dtype=str, keep_default_na=False, nrows=5)

    print(f" Columns ({len(preview.columns)}):")
    for col in preview.columns:
        print(f" {col!r}")

    print("\n[6] Filtering to school level and checking breakdown values...")
    # Everything is read as str with no NA conversion, so the comparisons
    # below are plain string equality on the raw cell text.
    with zf.open(target) as f:
        df_full = pd.read_csv(f, dtype=str, keep_default_na=False)

    if "geographic_level" in df_full.columns:
        school_df = df_full[df_full["geographic_level"] == "School"]
        print(f" School-level rows: {len(school_df):,} / {len(df_full):,} total")
    else:
        school_df = df_full
        print(f" No geographic_level column — total rows: {len(df_full):,}")

    expected = (
        ("breakdown_topic", "All pupils"),
        ("breakdown", "Total"),
        ("sex", "Total"),
    )

    # Show the ten most frequent values of each filter column that exists.
    for col, _ in expected:
        if col in school_df.columns:
            vals = school_df[col].value_counts().head(10)
            print(f" {col} values: {vals.index.tolist()}")

    # Apply each filter in turn; columns absent from the CSV are skipped.
    matching = school_df
    for col, val in expected:
        if col in matching.columns:
            matching = matching[matching[col] == val]
    print(f"\n Rows matching breakdown_topic='All pupils', breakdown='Total', sex='Total': {len(matching):,}")

    if not matching.empty:
        print(" LOOKS GOOD — data should flow through dbt once tap is fixed.")
    else:
        print(" WARNING — 0 rows match the dbt filter. Check breakdown values above.")
|
|
|
|
|
|
# Run the diagnostic only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|