fix(tap-uk-ees): inject time_period from release slug when absent in CSV
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m8s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m37s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m8s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m37s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
Older census (and other) files don't include a time_period column. Derive it from the release slug (e.g. '2022-23' → '202223') and inject it into records so the required Singer schema field is always present.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -31,15 +31,27 @@ def get_content_release_id(publication_slug: str) -> str:
|
|||||||
return resp.json()["id"]
|
return resp.json()["id"]
|
||||||
|
|
||||||
|
|
||||||
def get_all_release_ids(publication_slug: str) -> list[str]:
|
def _slug_to_time_period(slug: str) -> str | None:
|
||||||
"""Return all release IDs for a publication, newest first."""
|
"""Convert a release slug like '2022-23' to a time_period like '202223'."""
|
||||||
|
parts = slug.split("-")
|
||||||
|
if len(parts) == 2 and len(parts[0]) == 4 and len(parts[1]) == 2:
|
||||||
|
return parts[0] + parts[1]
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_all_releases(publication_slug: str) -> list[dict]:
    """Return all releases for a publication as dicts with 'id' and 'time_period'.

    'time_period' is derived from each release's slug via
    _slug_to_time_period and may be None when the slug is not an
    academic-year range.
    """
    url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases"
    resp = requests.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    data = resp.json()
    # API returns either a plain list or a paginated object with a "results" key
    releases = data if isinstance(data, list) else data.get("results", [])
    return [
        {"id": r["id"], "time_period": _slug_to_time_period(r.get("slug", ""))}
        for r in releases
    ]
def download_release_zip(release_id: str) -> zipfile.ZipFile:
|
def download_release_zip(release_id: str) -> zipfile.ZipFile:
|
||||||
@@ -71,15 +83,18 @@ class EESDatasetStream(Stream):
|
|||||||
|
|
||||||
latest_only = self.config.get("latest_only", False)
|
latest_only = self.config.get("latest_only", False)
|
||||||
if latest_only:
|
if latest_only:
|
||||||
release_ids = [get_content_release_id(self._publication_slug)]
|
release_id = get_content_release_id(self._publication_slug)
|
||||||
|
releases = [{"id": release_id, "time_period": None}]
|
||||||
else:
|
else:
|
||||||
release_ids = get_all_release_ids(self._publication_slug)
|
releases = get_all_releases(self._publication_slug)
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"Found %d release(s) for %s", len(release_ids), self._publication_slug
|
"Found %d release(s) for %s", len(releases), self._publication_slug
|
||||||
)
|
)
|
||||||
|
|
||||||
for release_id in release_ids:
|
for release in releases:
|
||||||
|
release_id = release["id"]
|
||||||
|
release_time_period = release["time_period"]
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"Downloading release %s for %s", release_id, self._publication_slug
|
"Downloading release %s for %s", release_id, self._publication_slug
|
||||||
)
|
)
|
||||||
@@ -134,6 +149,10 @@ class EESDatasetStream(Stream):
|
|||||||
# Normalise URN column to 'school_urn' for consistency
|
# Normalise URN column to 'school_urn' for consistency
|
||||||
if self._urn_column in record and self._urn_column != "school_urn":
|
if self._urn_column in record and self._urn_column != "school_urn":
|
||||||
record["school_urn"] = record.pop(self._urn_column)
|
record["school_urn"] = record.pop(self._urn_column)
|
||||||
|
# Inject time_period from release metadata if absent (older files
|
||||||
|
# don't include this column — it's only in the filename/release slug)
|
||||||
|
if "time_period" not in record and release_time_period:
|
||||||
|
record["time_period"] = release_time_period
|
||||||
yield record
|
yield record
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user