feat(pipeline): add legacy KS4 backfill (2015/16–2018/19)
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 12s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 52s
Build and Push Docker Images / Trigger Portainer Update (push) Has been cancelled
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Has been cancelled
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 12s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 52s
Build and Push Docker Images / Trigger Portainer Update (push) Has been cancelled
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Has been cancelled
Mirrors the existing legacy KS2 pattern to fill the gap before EES hosted KS4 data. Four files changed: - tap-uk-ees: LegacyKS4Stream downloads each year's DfE Compare School Performance ZIP, extracts england_ks4final.csv, maps 416 legacy columns to Singer fields, strips % suffixes. Registered in discover_streams(). TapUKEES.config_jsonschema gains legacy_ks4_urls setting. - stg_legacy_ks4.sql: safe_numeric casts + NULL placeholders for columns not present in legacy format (ebacc_avg_score, gcse_grade_91_pct, prior_attainment_avg, sen_pct). - int_ks4_with_lineage.sql: adds all_ks4 CTE unioning stg_ees_ks4 and stg_legacy_ks4, matching the int_ks2_with_lineage pattern. - _stg_sources.yml + meltano.yml: source declaration and setting definition for legacy_ks4. URLs configured per-year once provided. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -712,6 +712,138 @@ class LegacyKS2Stream(Stream):
|
||||
yield record
|
||||
|
||||
|
||||
# ── Legacy KS4 (pre-EES wide format from DfE performance tables) ──────────────
|
||||
# The DfE "Compare School Performance" ZIPs include england_ks4final.csv in a
|
||||
# wide format (one row per school, ~416 columns, uppercase abbreviated names).
|
||||
# EES only hosts 2 years of KS4 data; this stream backfills 2015-16 to 2018-19.
|
||||
# Column mapping: old DfE CSV column → Singer field name (matches stg output).
|
||||
|
||||
_LEGACY_KS4_COLUMN_MAP = {
|
||||
"URN": "urn",
|
||||
"TPUP": "total_pupils",
|
||||
# Attainment 8
|
||||
"ATT8SCR": "attainment_8_score",
|
||||
# Progress 8
|
||||
"P8MEA": "progress_8_score",
|
||||
"P8CILOW": "progress_8_lower_ci",
|
||||
"P8CIUPP": "progress_8_upper_ci",
|
||||
"P8MEAENG": "progress_8_english",
|
||||
"P8MEAMAT": "progress_8_maths",
|
||||
"P8MEAEBAC": "progress_8_ebacc",
|
||||
"P8MEAOPEN": "progress_8_open",
|
||||
# English & Maths pass rates (% suffix stripped at extract time)
|
||||
"PTL2BASICS_95": "english_maths_strong_pass_pct",
|
||||
"PTL2BASICS_94": "english_maths_standard_pass_pct",
|
||||
# EBacc
|
||||
"PTEBACC_E_PTQ_EE": "ebacc_entry_pct",
|
||||
"PTEBACC_95": "ebacc_strong_pass_pct",
|
||||
"PTEBACC_94": "ebacc_standard_pass_pct",
|
||||
# Context
|
||||
"PSENSE4": "sen_ehcp_pct",
|
||||
"PSENAPK4": "sen_support_pct",
|
||||
}
|
||||
|
||||
|
||||
class LegacyKS4Stream(Stream):
|
||||
"""Stream for pre-EES KS4 data from DfE 'Compare School Performance' ZIPs.
|
||||
|
||||
Downloads ZIPs from URLs configured in legacy_ks4_urls (a mapping of
|
||||
6-digit year code → download URL), extracts england_ks4final.csv from each,
|
||||
maps old DfE column names to match stg_ees_ks4 output schema, and emits
|
||||
one record per school per year. The % suffix present on percentage columns
|
||||
(e.g. "39.60%") is stripped here so safe_numeric in dbt can cast cleanly.
|
||||
"""
|
||||
|
||||
name = "legacy_ks4"
|
||||
primary_keys = ["urn", "year"]
|
||||
replication_key = None
|
||||
|
||||
schema = th.PropertiesList(
|
||||
th.Property("urn", th.StringType, required=True),
|
||||
th.Property("year", th.StringType, required=True),
|
||||
th.Property("total_pupils", th.StringType),
|
||||
th.Property("attainment_8_score", th.StringType),
|
||||
th.Property("progress_8_score", th.StringType),
|
||||
th.Property("progress_8_lower_ci", th.StringType),
|
||||
th.Property("progress_8_upper_ci", th.StringType),
|
||||
th.Property("progress_8_english", th.StringType),
|
||||
th.Property("progress_8_maths", th.StringType),
|
||||
th.Property("progress_8_ebacc", th.StringType),
|
||||
th.Property("progress_8_open", th.StringType),
|
||||
th.Property("english_maths_strong_pass_pct", th.StringType),
|
||||
th.Property("english_maths_standard_pass_pct", th.StringType),
|
||||
th.Property("ebacc_entry_pct", th.StringType),
|
||||
th.Property("ebacc_strong_pass_pct", th.StringType),
|
||||
th.Property("ebacc_standard_pass_pct", th.StringType),
|
||||
th.Property("sen_ehcp_pct", th.StringType),
|
||||
th.Property("sen_support_pct", th.StringType),
|
||||
).to_dict()
|
||||
|
||||
def get_records(self, context):
|
||||
import pandas as pd
|
||||
|
||||
url_map = self.config.get("legacy_ks4_urls", {})
|
||||
if not url_map:
|
||||
self.logger.warning("legacy_ks4_urls not configured, skipping legacy KS4")
|
||||
return
|
||||
|
||||
self.logger.info("Loading legacy KS4 for %d year(s)", len(url_map))
|
||||
|
||||
for year_code, url in url_map.items():
|
||||
self.logger.info("Downloading %s for %s", url, year_code)
|
||||
try:
|
||||
resp = requests.get(url, timeout=120)
|
||||
resp.raise_for_status()
|
||||
except Exception as e:
|
||||
self.logger.warning("Failed to download %s: %s", url, e)
|
||||
continue
|
||||
|
||||
try:
|
||||
zf = zipfile.ZipFile(io.BytesIO(resp.content))
|
||||
except zipfile.BadZipFile as e:
|
||||
self.logger.warning("Not a ZIP for %s: %s", year_code, e)
|
||||
continue
|
||||
|
||||
# Find england_ks4final.csv inside the ZIP
|
||||
target = next(
|
||||
(n for n in zf.namelist() if "ks4final" in n.lower() and n.endswith(".csv")),
|
||||
None,
|
||||
)
|
||||
if not target:
|
||||
self.logger.warning("england_ks4final.csv not found in ZIP for %s", year_code)
|
||||
continue
|
||||
|
||||
with zf.open(target) as f:
|
||||
df = pd.read_csv(
|
||||
f,
|
||||
dtype=str,
|
||||
keep_default_na=False,
|
||||
encoding="latin-1",
|
||||
)
|
||||
|
||||
# Strip BOM from first column name
|
||||
cols = list(df.columns)
|
||||
if cols:
|
||||
cols[0] = cols[0].lstrip("\ufeff").lstrip("")
|
||||
df.columns = cols
|
||||
|
||||
# Filter to school-level rows: URN must be a plain integer
|
||||
if "URN" in df.columns:
|
||||
df = df[df["URN"].str.match(r"^\d+$", na=False)]
|
||||
|
||||
self.logger.info("Emitting %d schools for %s", len(df), year_code)
|
||||
|
||||
for _, row in df.iterrows():
|
||||
record = {"year": year_code}
|
||||
for old_col, new_col in _LEGACY_KS4_COLUMN_MAP.items():
|
||||
val = row.get(old_col, "")
|
||||
# Strip % suffix — legacy DfE CSVs use "39.60%" not "39.60"
|
||||
if isinstance(val, str) and val.endswith("%"):
|
||||
val = val[:-1]
|
||||
record[new_col] = val
|
||||
yield record
|
||||
|
||||
|
||||
class TapUKEES(Tap):
|
||||
"""Singer tap for UK Explore Education Statistics."""
|
||||
|
||||
@@ -730,6 +862,11 @@ class TapUKEES(Tap):
|
||||
th.ObjectType(),
|
||||
description="Mapping of 6-digit year code to download URL for legacy KS2 CSVs (e.g. {\"201819\": \"https://...\"})",
|
||||
),
|
||||
th.Property(
|
||||
"legacy_ks4_urls",
|
||||
th.ObjectType(),
|
||||
description="Mapping of 6-digit year code to download URL for legacy KS4 ZIPs (e.g. {\"201819\": \"https://...\"})",
|
||||
),
|
||||
).to_dict()
|
||||
|
||||
def discover_streams(self):
|
||||
@@ -741,6 +878,7 @@ class TapUKEES(Tap):
|
||||
EESCensusStream(self),
|
||||
EESAdmissionsStream(self),
|
||||
LegacyKS2Stream(self),
|
||||
LegacyKS4Stream(self),
|
||||
EESKs2NationalStream(self),
|
||||
]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user