feat(tap-uk-ees): fetch all historical releases, not just latest
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m9s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m42s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m9s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m42s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
Add get_all_release_ids() to paginate /publications/{slug}/releases and
iterate over every release in get_records(). Add latest_only config flag
(default false) to restore single-release behaviour for daily runs.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -31,6 +31,24 @@ def get_content_release_id(publication_slug: str) -> str:
|
|||||||
return resp.json()["id"]
|
return resp.json()["id"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_all_release_ids(publication_slug: str, *, page_size: int = 20) -> list[str]:
    """Return all release IDs for a publication, newest first.

    Pages through ``/publications/{slug}/releases`` until the last page
    reported by the API's ``paging.totalPages`` field has been consumed.

    Args:
        publication_slug: URL slug of the EES publication.
        page_size: Results requested per page (keyword-only; default 20,
            matching the previously hard-coded value).

    Returns:
        Release IDs in the order the API returns them
        (newest first — presumably; confirm against the API contract).

    Raises:
        requests.HTTPError: If any page request fails.
    """
    url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases"
    ids: list[str] = []
    page = 1
    while True:
        resp = requests.get(
            url, params={"page": page, "pageSize": page_size}, timeout=TIMEOUT
        )
        resp.raise_for_status()
        data = resp.json()
        ids.extend(r["id"] for r in data.get("results", []))
        # paging.totalPages defaults to 1, so a missing/empty paging
        # object terminates the loop instead of spinning forever.
        if page >= data.get("paging", {}).get("totalPages", 1):
            break
        page += 1
    return ids
|
||||||
|
|
||||||
|
|
||||||
def download_release_zip(release_id: str) -> zipfile.ZipFile:
|
def download_release_zip(release_id: str) -> zipfile.ZipFile:
|
||||||
"""Download all data files for a release as a ZIP."""
|
"""Download all data files for a release as a ZIP."""
|
||||||
url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
|
url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
|
||||||
@@ -58,63 +76,72 @@ class EESDatasetStream(Stream):
|
|||||||
def get_records(self, context):
    """Yield one record per school-level CSV row across the selected releases.

    Honours the ``latest_only`` config flag: when true, only the latest
    release is fetched; otherwise every historical release is processed.
    A release whose ZIP lacks the target file is logged and skipped rather
    than aborting the whole stream.

    Args:
        context: Singer stream partition context (unused here).

    Yields:
        dict: One cleaned record per school-level row, with subclass column
        renames applied and the URN column normalised to ``school_urn``.
    """
    import pandas as pd

    latest_only = self.config.get("latest_only", False)
    if latest_only:
        release_ids = [get_content_release_id(self._publication_slug)]
    else:
        release_ids = get_all_release_ids(self._publication_slug)

    self.logger.info(
        "Found %d release(s) for %s", len(release_ids), self._publication_slug
    )

    for release_id in release_ids:
        self.logger.info(
            "Downloading release %s for %s", release_id, self._publication_slug
        )
        zf = download_release_zip(release_id)

        # Find the target file (substring match)
        all_files = zf.namelist()
        target = None
        for name in all_files:
            if self._target_filename in name and name.endswith(".csv"):
                target = name
                break

        if not target:
            # One release missing the file should not abort the stream —
            # warn and continue with the next release.
            self.logger.warning(
                "File matching '%s' not found in release %s. Available: %s",
                self._target_filename,
                release_id,
                [n for n in all_files if n.endswith(".csv")],
            )
            continue

        self.logger.info("Reading %s from ZIP", target)
        with zf.open(target) as f:
            df = pd.read_csv(
                f, dtype=str, keep_default_na=False, encoding=self._encoding
            )

        # Strip BOM from first column name — handles both:
        # - UTF-8 BOM decoded as Unicode (\ufeff) when read with utf-8/utf-8-sig
        # - UTF-8 BOM bytes decoded as Latin-1 ("\u00ef\u00bb\u00bf") when read
        #   with latin-1
        # BUGFIX: the second lstrip previously stripped the empty string —
        # a no-op — so the Latin-1 case the comment promised was never handled.
        cols = list(df.columns)
        if cols:
            cols[0] = cols[0].lstrip("\ufeff").lstrip("\u00ef\u00bb\u00bf")
            df.columns = cols

        # Filter to school-level data if the column exists
        if "geographic_level" in df.columns:
            df = df[df["geographic_level"] == "School"]

        # Drop rows with no URN (LA/category aggregates that slip through the level filter)
        urn_col = self._urn_column
        if urn_col in df.columns:
            df = df[df[urn_col].notna() & (df[urn_col] != "")]

        self.logger.info(
            "Emitting %d school-level rows from release %s", len(df), release_id
        )

        for _, row in df.iterrows():
            record = row.to_dict()
            # Apply subclass column renames (messy CSV names → clean Singer fields)
            for old, new in self._column_renames.items():
                if old in record:
                    record[new] = record.pop(old)
            # Normalise URN column to 'school_urn' for consistency
            if self._urn_column in record and self._urn_column != "school_urn":
                record["school_urn"] = record.pop(self._urn_column)
            yield record
|
||||||
|
|
||||||
|
|
||||||
# ── KS2 Attainment (long format: one row per school × subject × breakdown) ──
|
# ── KS2 Attainment (long format: one row per school × subject × breakdown) ──
|
||||||
@@ -414,6 +441,12 @@ class TapUKEES(Tap):
|
|||||||
|
|
||||||
# Tap-level settings schema. ``latest_only`` defaults to False so a full
# historical backfill is the default; daily runs set it to True to fetch
# only the newest release per publication.
config_jsonschema = th.PropertiesList(
    th.Property("base_url", th.StringType, description="EES API base URL"),
    th.Property(
        "latest_only",
        th.BooleanType,
        description="Only fetch the latest release per publication (default: False — fetches all historical releases)",
        default=False,
    ),
).to_dict()
|
||||||
|
|
||||||
def discover_streams(self):
|
def discover_streams(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user