"""
Shared EES (Explore Education Statistics) API client.

Two APIs are available:
  - Statistics API: https://api.education.gov.uk/statistics/v1
    (only ~13 publications)
  - Content API: https://content.explore-education-statistics.service.gov.uk/api
    Covers all publications; use this for admissions and other data not in the
    stats API. Download all files for a release as a ZIP from
    /api/releases/{id}/files.
"""

import io  # noqa: F401  (no longer used internally; kept for import compatibility)
import os
import shutil
import tempfile
import zipfile
from contextlib import contextmanager
from pathlib import Path
from typing import BinaryIO, Iterator, Optional

import requests

STATS_API_BASE = "https://api.education.gov.uk/statistics/v1"
CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api"

# Timeout (seconds) for small JSON metadata requests.
TIMEOUT = 60
# Timeout (seconds) for file downloads, which can be much larger.
DOWNLOAD_TIMEOUT = 300
# Number of bytes requested per chunk when streaming a response body.
_CHUNK_SIZE = 65536


def _get_json(url: str) -> dict:
    """GET *url* and return the decoded JSON body, raising on HTTP errors."""
    resp = requests.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    return resp.json()


def _stream_to_file(resp: "requests.Response", sink: BinaryIO) -> None:
    """Copy a streamed response body into *sink* chunk by chunk."""
    for chunk in resp.iter_content(chunk_size=_CHUNK_SIZE):
        sink.write(chunk)


@contextmanager
def _atomic_writer(dest_path: Path) -> Iterator[BinaryIO]:
    """
    Yield a binary file handle whose contents become dest_path only on success.

    Data is written to a sibling "<name>.part" file that is renamed over
    dest_path when the with-block finishes cleanly. If anything fails, the
    partial file is removed, so a truncated download is never mistaken for a
    finished one by the "already exists, skipping" checks below.
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    part_path = dest_path.with_name(dest_path.name + ".part")
    try:
        with open(part_path, "wb") as handle:
            yield handle
        os.replace(part_path, dest_path)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a half-written file behind.
        part_path.unlink(missing_ok=True)
        raise


def _pick_csv_member(members: list[str], keyword: str) -> Optional[str]:
    """Return the first .csv member whose path contains *keyword*, else None.

    Both the extension and the keyword are matched case-insensitively; an
    empty keyword matches every CSV member.
    """
    needle = keyword.lower()
    for member in members:
        lowered = member.lower()
        if lowered.endswith(".csv") and needle in lowered:
            return member
    return None


def get_publication_files(publication_slug: str) -> list[dict]:
    """Return list of data-set file descriptors for a publication (statistics API).

    Returns an empty list when the response has no "results" entry (or it is
    null). Raises requests.HTTPError on a non-2xx response.
    """
    url = f"{STATS_API_BASE}/publications/{publication_slug}/data-set-files"
    return _get_json(url).get("results") or []


def get_latest_csv_url(publication_slug: str, keyword: str = "") -> Optional[str]:
    """
    Find the most recent CSV download URL for a publication (statistics API).

    Optionally filter by a keyword in the file name (case-insensitive).
    The first entry that matches and carries a URL wins, so "most recent"
    relies on the API listing newest files first -- TODO confirm.
    Returns None when no entry matches.
    """
    needle = keyword.lower()
    for entry in get_publication_files(publication_slug):
        # "name" and "file" may be missing or null in the JSON payload.
        name = (entry.get("name") or "").lower()
        if needle and needle not in name:
            continue
        csv_url = entry.get("csvDownloadUrl") or (entry.get("file") or {}).get("url")
        if csv_url:
            return csv_url
    return None


def get_content_release_id(publication_slug: str) -> str:
    """Return the latest release ID for a publication via the content API.

    Raises requests.HTTPError on a non-2xx response and KeyError if the
    response carries no "id".
    """
    url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases/latest"
    return _get_json(url)["id"]


def download_release_zip_csv(
    publication_slug: str,
    dest_path: Path,
    zip_member_keyword: str = "",
) -> Path:
    """
    Download the full-release ZIP from the EES content API and extract one CSV.

    If zip_member_keyword is given, the first member whose path contains that
    keyword (case-insensitive) is extracted; otherwise the first .csv found is
    used. If dest_path already exists nothing is downloaded.

    Returns dest_path (the extracted CSV file).
    Raises ValueError when the ZIP holds no matching CSV, and
    requests.HTTPError on a non-2xx response.
    """
    if dest_path.exists():
        print(f" EES: {dest_path.name} already exists, skipping.")
        return dest_path

    release_id = get_content_release_id(publication_slug)
    zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
    print(f" EES: downloading release ZIP for '{publication_slug}' ...")

    # Spool the archive to an anonymous temp file instead of holding the whole
    # ZIP in memory; zipfile needs a seekable source.
    with tempfile.TemporaryFile() as spool:
        with requests.get(zip_url, timeout=DOWNLOAD_TIMEOUT, stream=True) as resp:
            resp.raise_for_status()
            _stream_to_file(resp, spool)
        spool.seek(0)

        with zipfile.ZipFile(spool) as archive:
            members = archive.namelist()
            target = _pick_csv_member(members, zip_member_keyword)
            if not target:
                raise ValueError(
                    f"No CSV matching '{zip_member_keyword}' in ZIP. Members: {members}"
                )
            print(f" EES: extracting '{target}' ...")
            with archive.open(target) as src, _atomic_writer(dest_path) as dst:
                shutil.copyfileobj(src, dst)

    print(f" EES: saved {dest_path} ({dest_path.stat().st_size // 1024} KB)")
    return dest_path


def download_csv(url: str, dest_path: Path) -> Path:
    """Download a CSV from EES to dest_path.

    If dest_path already exists nothing is downloaded. The file only appears
    at dest_path once the download has completed successfully.

    Returns dest_path. Raises requests.HTTPError on a non-2xx response.
    """
    if dest_path.exists():
        print(f" EES: {dest_path.name} already exists, skipping.")
        return dest_path

    print(f" EES: downloading {url} ...")
    # The context manager releases the streamed connection even on failure.
    with requests.get(url, timeout=DOWNLOAD_TIMEOUT, stream=True) as resp:
        resp.raise_for_status()
        with _atomic_writer(dest_path) as dst:
            _stream_to_file(resp, dst)

    print(f" EES: saved {dest_path} ({dest_path.stat().st_size // 1024} KB)")
    return dest_path