diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
index 0718fc4..7e71b97 100644
--- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
+++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
@@ -31,15 +31,27 @@ def get_content_release_id(publication_slug: str) -> str:
     return resp.json()["id"]
 
 
-def get_all_release_ids(publication_slug: str) -> list[str]:
-    """Return all release IDs for a publication, newest first."""
+def _slug_to_time_period(slug: str) -> str | None:
+    """Convert a release slug like '2022-23' to a time_period like '202223'."""
+    parts = slug.split("-")
+    if len(parts) == 2 and len(parts[0]) == 4 and len(parts[1]) == 2:
+        return parts[0] + parts[1]
+    return None
+
+
+def get_all_releases(publication_slug: str) -> list[dict]:
+    """Return all releases for a publication as dicts with 'id' and 'time_period'."""
     url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases"
     resp = requests.get(url, timeout=TIMEOUT)
     resp.raise_for_status()
     data = resp.json()
     # API returns either a plain list or a paginated object with a "results" key
     releases = data if isinstance(data, list) else data.get("results", [])
-    return [r["id"] for r in releases]
+    result = []
+    for r in releases:
+        time_period = _slug_to_time_period(r.get("slug", ""))
+        result.append({"id": r["id"], "time_period": time_period})
+    return result
 
 
 def download_release_zip(release_id: str) -> zipfile.ZipFile:
@@ -71,15 +83,18 @@ class EESDatasetStream(Stream):
         latest_only = self.config.get("latest_only", False)
 
         if latest_only:
-            release_ids = [get_content_release_id(self._publication_slug)]
+            release_id = get_content_release_id(self._publication_slug)
+            releases = [{"id": release_id, "time_period": None}]
         else:
-            release_ids = get_all_release_ids(self._publication_slug)
+            releases = get_all_releases(self._publication_slug)
 
         self.logger.info(
-            "Found %d release(s) for %s", len(release_ids), self._publication_slug
+            "Found %d release(s) for %s", len(releases), self._publication_slug
         )
 
-        for release_id in release_ids:
+        for release in releases:
+            release_id = release["id"]
+            release_time_period = release["time_period"]
             self.logger.info(
                 "Downloading release %s for %s", release_id, self._publication_slug
             )
@@ -134,6 +149,10 @@ class EESDatasetStream(Stream):
                     # Normalise URN column to 'school_urn' for consistency
                     if self._urn_column in record and self._urn_column != "school_urn":
                         record["school_urn"] = record.pop(self._urn_column)
+                    # Inject time_period from release metadata if absent (older files
+                    # don't include this column — it's only in the filename/release slug)
+                    if "time_period" not in record and release_time_period:
+                        record["time_period"] = release_time_period
                     yield record