feat(legacy-ks2): add stream for pre-COVID KS2 data (2015-2019)
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 46s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m17s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 2m26s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s

- Add LegacyKS2Stream to tap-uk-ees: downloads old DfE england_ks2final.csv
  files from a configurable base URL, maps 318-column wide format to the
  same schema as stg_ees_ks2 output
- Add stg_legacy_ks2.sql staging model with safe_numeric casts
- Add legacy_ks2 source to _stg_sources.yml
- Update int_ks2_with_lineage.sql to union EES + legacy data
- Configurable via legacy_ks2_base_url and legacy_ks2_years tap settings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Tudor Sitaru
2026-03-31 14:36:41 +01:00
parent fc011c6547
commit 6d4962639c
4 changed files with 235 additions and 4 deletions

View File

@@ -452,6 +452,159 @@ class EESAdmissionsStream(EESDatasetStream):
# on EES. Only national and LA-level files are published.
# ── Legacy KS2 (pre-COVID wide format from DfE performance tables) ────────────
# The DfE "Compare School Performance" site published school-level KS2 CSVs
# in a wide format (one row per school, ~300 columns). EES only has school-level
# data from 2022-23 onwards. This stream loads the 2015-16 to 2018-19 CSVs
# from a configurable base URL and maps the old column names to match the
# staging output schema so dbt can union them with the EES data.
# Column mapping: old DfE CSV column → Singer field name (matches stg output)
_LEGACY_KS2_COLUMN_MAP = {
"URN": "urn",
"TOTPUPS": "total_pupils",
"TELIG": "eligible_pupils",
"PTRWM_EXP": "rwm_expected_pct",
"PTRWM_HIGH": "rwm_high_pct",
"PTREAD_EXP": "reading_expected_pct",
"PTREAD_HIGH": "reading_high_pct",
"READ_AVERAGE": "reading_avg_score",
"READPROG": "reading_progress",
"PTWRITTA_EXP": "writing_expected_pct",
"PTWRITTA_HIGH": "writing_high_pct",
"WRITPROG": "writing_progress",
"PTMAT_EXP": "maths_expected_pct",
"PTMAT_HIGH": "maths_high_pct",
"MAT_AVERAGE": "maths_avg_score",
"MATPROG": "maths_progress",
"PTGPS_EXP": "gps_expected_pct",
"PTGPS_HIGH": "gps_high_pct",
"GPS_AVERAGE": "gps_avg_score",
# Absence / unable to access
"PTREAD_AT": "reading_absence_pct",
"PTMAT_AT": "maths_absence_pct",
"PTGPS_AT": "gps_absence_pct",
# Gender breakdown for RWM
"PTRWM_EXP_B": "rwm_expected_boys_pct",
"PTRWM_HIGH_B": "rwm_high_boys_pct",
"PTRWM_EXP_G": "rwm_expected_girls_pct",
"PTRWM_HIGH_G": "rwm_high_girls_pct",
# Disadvantaged breakdown for RWM
"PTRWM_EXP_FSM6CLA1A": "rwm_expected_disadvantaged_pct",
"PTRWM_EXP_NotFSM6CLA1A": "rwm_expected_non_disadvantaged_pct",
"DIFFN_RWM_EXP": "disadvantaged_gap",
# Context
"PTFSM6CLA1A": "disadvantaged_pct",
"PTEALGRP2": "eal_pct",
"PSENELK": "sen_support_pct",
"PSENELE": "sen_ehcp_pct",
"PTMOBN": "stability_pct",
}
# Default years to load (new-curriculum KS2 only — 2014-15 used Level 4+ metrics)
_LEGACY_KS2_DEFAULT_YEARS = ["201516", "201617", "201718", "201819"]
class LegacyKS2Stream(Stream):
"""Stream for pre-COVID KS2 data from DfE performance tables CSVs.
Downloads england_ks2final.csv for each configured year from a base URL,
maps old DfE column names to match the stg_ees_ks2 output schema, and
emits one record per school per year.
"""
name = "legacy_ks2"
primary_keys = ["urn", "year"]
replication_key = None
schema = th.PropertiesList(
th.Property("urn", th.StringType, required=True),
th.Property("year", th.StringType, required=True),
th.Property("total_pupils", th.StringType),
th.Property("eligible_pupils", th.StringType),
th.Property("rwm_expected_pct", th.StringType),
th.Property("rwm_high_pct", th.StringType),
th.Property("reading_expected_pct", th.StringType),
th.Property("reading_high_pct", th.StringType),
th.Property("reading_avg_score", th.StringType),
th.Property("reading_progress", th.StringType),
th.Property("writing_expected_pct", th.StringType),
th.Property("writing_high_pct", th.StringType),
th.Property("writing_progress", th.StringType),
th.Property("maths_expected_pct", th.StringType),
th.Property("maths_high_pct", th.StringType),
th.Property("maths_avg_score", th.StringType),
th.Property("maths_progress", th.StringType),
th.Property("gps_expected_pct", th.StringType),
th.Property("gps_high_pct", th.StringType),
th.Property("gps_avg_score", th.StringType),
th.Property("reading_absence_pct", th.StringType),
th.Property("maths_absence_pct", th.StringType),
th.Property("gps_absence_pct", th.StringType),
th.Property("rwm_expected_boys_pct", th.StringType),
th.Property("rwm_high_boys_pct", th.StringType),
th.Property("rwm_expected_girls_pct", th.StringType),
th.Property("rwm_high_girls_pct", th.StringType),
th.Property("rwm_expected_disadvantaged_pct", th.StringType),
th.Property("rwm_expected_non_disadvantaged_pct", th.StringType),
th.Property("disadvantaged_gap", th.StringType),
th.Property("disadvantaged_pct", th.StringType),
th.Property("eal_pct", th.StringType),
th.Property("sen_support_pct", th.StringType),
th.Property("sen_ehcp_pct", th.StringType),
th.Property("stability_pct", th.StringType),
).to_dict()
def get_records(self, context):
import pandas as pd
base_url = self.config.get("legacy_ks2_base_url", "")
if not base_url:
self.logger.warning("legacy_ks2_base_url not configured, skipping legacy KS2")
return
years = self.config.get("legacy_ks2_years", _LEGACY_KS2_DEFAULT_YEARS)
self.logger.info("Loading legacy KS2 for years: %s from %s", years, base_url)
for year_code in years:
# Convert 6-digit code to folder name: "201819" → "2018-2019"
folder = f"20{year_code[2:4]}-20{year_code[4:6]}"
url = f"{base_url}/{folder}/england_ks2final.csv"
self.logger.info("Downloading %s", url)
try:
resp = requests.get(url, timeout=120)
resp.raise_for_status()
except Exception as e:
self.logger.warning("Failed to download %s: %s", url, e)
continue
df = pd.read_csv(
io.BytesIO(resp.content),
dtype=str,
keep_default_na=False,
encoding="latin-1",
)
# Strip BOM
cols = list(df.columns)
if cols:
cols[0] = cols[0].lstrip("\ufeff").lstrip("")
df.columns = cols
# Filter to valid URNs
if "URN" in df.columns:
df = df[df["URN"].str.match(r"^\d+$", na=False)]
self.logger.info("Emitting %d schools for %s", len(df), year_code)
for _, row in df.iterrows():
record = {"year": year_code}
for old_col, new_col in _LEGACY_KS2_COLUMN_MAP.items():
record[new_col] = row.get(old_col, "")
yield record
class TapUKEES(Tap):
"""Singer tap for UK Explore Education Statistics."""
@@ -465,6 +618,17 @@ class TapUKEES(Tap):
description="Only fetch the latest release per publication (default: False — fetches all historical releases)",
default=False,
),
th.Property(
"legacy_ks2_base_url",
th.StringType,
description="Base URL for legacy KS2 CSVs (e.g. https://example.com/data). Files expected at {base_url}/{year}/england_ks2final.csv",
),
th.Property(
"legacy_ks2_years",
th.ArrayType(th.StringType),
description="Legacy KS2 year codes to load (default: 201516-201819)",
default=_LEGACY_KS2_DEFAULT_YEARS,
),
).to_dict()
def discover_streams(self):
@@ -475,6 +639,7 @@ class TapUKEES(Tap):
EESKS4InfoStream(self),
EESCensusStream(self),
EESAdmissionsStream(self),
LegacyKS2Stream(self),
]