diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py index e95f19d..554824f 100644 --- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py +++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py @@ -31,6 +31,24 @@ def get_content_release_id(publication_slug: str) -> str: return resp.json()["id"] +def get_all_release_ids(publication_slug: str) -> list[str]: + """Return all release IDs for a publication, newest first.""" + url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases" + ids = [] + page = 1 + while True: + resp = requests.get(url, params={"page": page, "pageSize": 20}, timeout=TIMEOUT) + resp.raise_for_status() + data = resp.json() + for r in data.get("results", []): + ids.append(r["id"]) + paging = data.get("paging", {}) + if page >= paging.get("totalPages", 1): + break + page += 1 + return ids + + def download_release_zip(release_id: str) -> zipfile.ZipFile: """Download all data files for a release as a ZIP.""" url = f"{CONTENT_API_BASE}/releases/{release_id}/files" @@ -58,63 +76,72 @@ class EESDatasetStream(Stream): def get_records(self, context): import pandas as pd - release_id = get_content_release_id(self._publication_slug) + latest_only = self.config.get("latest_only", False) + if latest_only: + release_ids = [get_content_release_id(self._publication_slug)] + else: + release_ids = get_all_release_ids(self._publication_slug) + self.logger.info( - "Downloading release %s for %s", - release_id, - self._publication_slug, + "Found %d release(s) for %s", len(release_ids), self._publication_slug ) - zf = download_release_zip(release_id) - # Find the target file (substring match) - all_files = zf.namelist() - target = None - for name in all_files: - if self._target_filename in name and name.endswith(".csv"): - target = name - break - - if not target: - self.logger.error( - "File matching '%s' not found in ZIP. Available: %s", - self._target_filename, - [n for n in all_files if n.endswith(".csv")], + for release_id in release_ids: + self.logger.info( + "Downloading release %s for %s", release_id, self._publication_slug ) - return + zf = download_release_zip(release_id) - self.logger.info("Reading %s from ZIP", target) - with zf.open(target) as f: - df = pd.read_csv(f, dtype=str, keep_default_na=False, encoding=self._encoding) + # Find the target file (substring match) + all_files = zf.namelist() + target = None + for name in all_files: + if self._target_filename in name and name.endswith(".csv"): + target = name + break - # Strip BOM from first column name — handles both: - # - UTF-8 BOM decoded as Unicode (\ufeff) when read with utf-8/utf-8-sig - # - UTF-8 BOM bytes decoded as Latin-1 () when read with latin-1 - cols = list(df.columns) - if cols: - cols[0] = cols[0].lstrip("\ufeff").lstrip("") - df.columns = cols + if not target: + self.logger.warning( + "File matching '%s' not found in release %s. Available: %s", + self._target_filename, + release_id, + [n for n in all_files if n.endswith(".csv")], + ) + continue - # Filter to school-level data if the column exists - if "geographic_level" in df.columns: - df = df[df["geographic_level"] == "School"] + self.logger.info("Reading %s from ZIP", target) + with zf.open(target) as f: + df = pd.read_csv(f, dtype=str, keep_default_na=False, encoding=self._encoding) - # Drop rows with no URN (LA/category aggregates that slip through the level filter) - urn_col = self._urn_column - if urn_col in df.columns: - df = df[df[urn_col].notna() & (df[urn_col] != "")] + # Strip BOM from first column name — handles both: + # - UTF-8 BOM decoded as Unicode (\ufeff) when read with utf-8/utf-8-sig + # - UTF-8 BOM bytes decoded as Latin-1 () when read with latin-1 + cols = list(df.columns) + if cols: + cols[0] = cols[0].lstrip("\ufeff").lstrip("") + df.columns = cols - self.logger.info("Emitting %d school-level rows", len(df)) + # Filter to school-level data if the column exists + if "geographic_level" in df.columns: + df = df[df["geographic_level"] == "School"] - for _, row in df.iterrows(): - record = row.to_dict() - # Apply subclass column renames (messy CSV names → clean Singer fields) - for old, new in self._column_renames.items(): - if old in record: - record[new] = record.pop(old) - # Normalise URN column to 'school_urn' for consistency - if self._urn_column in record and self._urn_column != "school_urn": - record["school_urn"] = record.pop(self._urn_column) - yield record + # Drop rows with no URN (LA/category aggregates that slip through the level filter) + urn_col = self._urn_column + if urn_col in df.columns: + df = df[df[urn_col].notna() & (df[urn_col] != "")] + + self.logger.info("Emitting %d school-level rows from release %s", len(df), release_id) + + for _, row in df.iterrows(): + record = row.to_dict() + # Apply subclass column renames (messy CSV names → clean Singer fields) + for old, new in self._column_renames.items(): + if old in record: + record[new] = record.pop(old) + # Normalise URN column to 'school_urn' for consistency + if self._urn_column in record and self._urn_column != "school_urn": + record["school_urn"] = record.pop(self._urn_column) + yield record # ── KS2 Attainment (long format: one row per school × subject × breakdown) ── @@ -414,6 +441,12 @@ class TapUKEES(Tap): config_jsonschema = th.PropertiesList( th.Property("base_url", th.StringType, description="EES API base URL"), + th.Property( + "latest_only", + th.BooleanType, + description="Only fetch the latest release per publication (default: False — fetches all historical releases)", + default=False, + ), ).to_dict() def discover_streams(self):