From 6d4962639cca36563fbc96fc3d1a99a541c6117b Mon Sep 17 00:00:00 2001 From: Tudor Sitaru Date: Tue, 31 Mar 2026 14:36:41 +0100 Subject: [PATCH] feat(legacy-ks2): add stream for pre-COVID KS2 data (2015-2019) - Add LegacyKS2Stream to tap-uk-ees: downloads old DfE england_ks2final.csv files from a configurable base URL, maps 318-column wide format to the same schema as stg_ees_ks2 output - Add stg_legacy_ks2.sql staging model with safe_numeric casts - Add legacy_ks2 source to _stg_sources.yml - Update int_ks2_with_lineage.sql to union EES + legacy data - Configurable via legacy_ks2_base_url and legacy_ks2_years tap settings Co-Authored-By: Claude Opus 4.6 --- .../extractors/tap-uk-ees/tap_uk_ees/tap.py | 165 ++++++++++++++++++ .../intermediate/int_ks2_with_lineage.sql | 15 +- .../transform/models/staging/_stg_sources.yml | 3 + .../models/staging/stg_legacy_ks2.sql | 56 ++++++ 4 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 pipeline/transform/models/staging/stg_legacy_ks2.sql diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py index 2097d17..16517d1 100644 --- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py +++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py @@ -452,6 +452,159 @@ class EESAdmissionsStream(EESDatasetStream): # on EES. Only national and LA-level files are published. +# ── Legacy KS2 (pre-COVID wide format from DfE performance tables) ──────────── +# The DfE "Compare School Performance" site published school-level KS2 CSVs +# in a wide format (one row per school, ~300 columns). EES only has school-level +# data from 2022-23 onwards. This stream loads the 2015-16 to 2018-19 CSVs +# from a configurable base URL and maps the old column names to match the +# staging output schema so dbt can union them with the EES data. + +# Column mapping: old DfE CSV column → Singer field name (matches stg output) +_LEGACY_KS2_COLUMN_MAP = { + "URN": "urn", + "TOTPUPS": "total_pupils", + "TELIG": "eligible_pupils", + "PTRWM_EXP": "rwm_expected_pct", + "PTRWM_HIGH": "rwm_high_pct", + "PTREAD_EXP": "reading_expected_pct", + "PTREAD_HIGH": "reading_high_pct", + "READ_AVERAGE": "reading_avg_score", + "READPROG": "reading_progress", + "PTWRITTA_EXP": "writing_expected_pct", + "PTWRITTA_HIGH": "writing_high_pct", + "WRITPROG": "writing_progress", + "PTMAT_EXP": "maths_expected_pct", + "PTMAT_HIGH": "maths_high_pct", + "MAT_AVERAGE": "maths_avg_score", + "MATPROG": "maths_progress", + "PTGPS_EXP": "gps_expected_pct", + "PTGPS_HIGH": "gps_high_pct", + "GPS_AVERAGE": "gps_avg_score", + # Absence / unable to access + "PTREAD_AT": "reading_absence_pct", + "PTMAT_AT": "maths_absence_pct", + "PTGPS_AT": "gps_absence_pct", + # Gender breakdown for RWM + "PTRWM_EXP_B": "rwm_expected_boys_pct", + "PTRWM_HIGH_B": "rwm_high_boys_pct", + "PTRWM_EXP_G": "rwm_expected_girls_pct", + "PTRWM_HIGH_G": "rwm_high_girls_pct", + # Disadvantaged breakdown for RWM + "PTRWM_EXP_FSM6CLA1A": "rwm_expected_disadvantaged_pct", + "PTRWM_EXP_NotFSM6CLA1A": "rwm_expected_non_disadvantaged_pct", + "DIFFN_RWM_EXP": "disadvantaged_gap", + # Context + "PTFSM6CLA1A": "disadvantaged_pct", + "PTEALGRP2": "eal_pct", + "PSENELK": "sen_support_pct", + "PSENELE": "sen_ehcp_pct", + "PTMOBN": "stability_pct", +} + +# Default years to load (new-curriculum KS2 only — 2014-15 used Level 4+ metrics) +_LEGACY_KS2_DEFAULT_YEARS = ["201516", "201617", "201718", "201819"] + + +class LegacyKS2Stream(Stream): + """Stream for pre-COVID KS2 data from DfE performance tables CSVs. + + Downloads england_ks2final.csv for each configured year from a base URL, + maps old DfE column names to match the stg_ees_ks2 output schema, and + emits one record per school per year. + """ + + name = "legacy_ks2" + primary_keys = ["urn", "year"] + replication_key = None + + schema = th.PropertiesList( + th.Property("urn", th.StringType, required=True), + th.Property("year", th.StringType, required=True), + th.Property("total_pupils", th.StringType), + th.Property("eligible_pupils", th.StringType), + th.Property("rwm_expected_pct", th.StringType), + th.Property("rwm_high_pct", th.StringType), + th.Property("reading_expected_pct", th.StringType), + th.Property("reading_high_pct", th.StringType), + th.Property("reading_avg_score", th.StringType), + th.Property("reading_progress", th.StringType), + th.Property("writing_expected_pct", th.StringType), + th.Property("writing_high_pct", th.StringType), + th.Property("writing_progress", th.StringType), + th.Property("maths_expected_pct", th.StringType), + th.Property("maths_high_pct", th.StringType), + th.Property("maths_avg_score", th.StringType), + th.Property("maths_progress", th.StringType), + th.Property("gps_expected_pct", th.StringType), + th.Property("gps_high_pct", th.StringType), + th.Property("gps_avg_score", th.StringType), + th.Property("reading_absence_pct", th.StringType), + th.Property("maths_absence_pct", th.StringType), + th.Property("gps_absence_pct", th.StringType), + th.Property("rwm_expected_boys_pct", th.StringType), + th.Property("rwm_high_boys_pct", th.StringType), + th.Property("rwm_expected_girls_pct", th.StringType), + th.Property("rwm_high_girls_pct", th.StringType), + th.Property("rwm_expected_disadvantaged_pct", th.StringType), + th.Property("rwm_expected_non_disadvantaged_pct", th.StringType), + th.Property("disadvantaged_gap", th.StringType), + th.Property("disadvantaged_pct", th.StringType), + th.Property("eal_pct", th.StringType), + th.Property("sen_support_pct", th.StringType), + th.Property("sen_ehcp_pct", th.StringType), + th.Property("stability_pct", th.StringType), + ).to_dict() + + def get_records(self, context): + import pandas as pd + + base_url = self.config.get("legacy_ks2_base_url", "") + if not base_url: + self.logger.warning("legacy_ks2_base_url not configured, skipping legacy KS2") + return + + years = self.config.get("legacy_ks2_years", _LEGACY_KS2_DEFAULT_YEARS) + self.logger.info("Loading legacy KS2 for years: %s from %s", years, base_url) + + for year_code in years: + # Convert 6-digit code to folder name: "201819" → "2018-2019" + folder = f"20{year_code[2:4]}-20{year_code[4:6]}" + url = f"{base_url}/{folder}/england_ks2final.csv" + + self.logger.info("Downloading %s", url) + try: + resp = requests.get(url, timeout=120) + resp.raise_for_status() + except Exception as e: + self.logger.warning("Failed to download %s: %s", url, e) + continue + + df = pd.read_csv( + io.BytesIO(resp.content), + dtype=str, + keep_default_na=False, + encoding="latin-1", + ) + + # Strip BOM + cols = list(df.columns) + if cols: + cols[0] = cols[0].lstrip("\ufeff").lstrip("") + df.columns = cols + + # Filter to valid URNs + if "URN" in df.columns: + df = df[df["URN"].str.match(r"^\d+$", na=False)] + + self.logger.info("Emitting %d schools for %s", len(df), year_code) + + for _, row in df.iterrows(): + record = {"year": year_code} + for old_col, new_col in _LEGACY_KS2_COLUMN_MAP.items(): + record[new_col] = row.get(old_col, "") + yield record + + class TapUKEES(Tap): """Singer tap for UK Explore Education Statistics.""" @@ -465,6 +618,17 @@ class TapUKEES(Tap): description="Only fetch the latest release per publication (default: False — fetches all historical releases)", default=False, ), + th.Property( + "legacy_ks2_base_url", + th.StringType, + description="Base URL for legacy KS2 CSVs (e.g. https://example.com/data). Files expected at {base_url}/{year}/england_ks2final.csv", + ), + th.Property( + "legacy_ks2_years", + th.ArrayType(th.StringType), + description="Legacy KS2 year codes to load (default: 201516-201819)", + default=_LEGACY_KS2_DEFAULT_YEARS, + ), ).to_dict() def discover_streams(self): @@ -475,6 +639,7 @@ class TapUKEES(Tap): EESKS4InfoStream(self), EESCensusStream(self), EESAdmissionsStream(self), + LegacyKS2Stream(self), ] diff --git a/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql b/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql index 68dc193..0fa7a72 100644 --- a/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql +++ b/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql @@ -1,7 +1,14 @@ -- Intermediate model: KS2 data chained across academy conversions -- Maps predecessor URN data to the current active URN +-- Unions EES (2022+) and legacy (2015-2019) school-level data -with current_ks2 as ( +with all_ks2 as ( + select * from {{ ref('stg_ees_ks2') }} + union all + select * from {{ ref('stg_legacy_ks2') }} +), + +current_ks2 as ( select urn as current_urn, urn as source_urn, @@ -15,7 +22,7 @@ with current_ks2 as ( rwm_expected_boys_pct, rwm_high_boys_pct, rwm_expected_girls_pct, rwm_high_girls_pct, rwm_expected_disadvantaged_pct, rwm_expected_non_disadvantaged_pct, disadvantaged_gap, disadvantaged_pct, eal_pct, sen_support_pct, sen_ehcp_pct, stability_pct - from {{ ref('stg_ees_ks2') }} + from all_ks2 ), predecessor_ks2 as ( @@ -33,11 +40,11 @@ predecessor_ks2 as ( ks2.rwm_expected_boys_pct, ks2.rwm_high_boys_pct, ks2.rwm_expected_girls_pct, ks2.rwm_high_girls_pct, ks2.rwm_expected_disadvantaged_pct, ks2.rwm_expected_non_disadvantaged_pct, ks2.disadvantaged_gap, ks2.disadvantaged_pct, ks2.eal_pct, ks2.sen_support_pct, ks2.sen_ehcp_pct, ks2.stability_pct - from {{ ref('stg_ees_ks2') }} ks2 + from all_ks2 ks2 inner join {{ ref('int_school_lineage') }} lin on ks2.urn = lin.predecessor_urn where not exists ( - select 1 from {{ ref('stg_ees_ks2') }} curr + select 1 from all_ks2 curr where curr.urn = lin.current_urn and curr.year = ks2.year ) diff --git a/pipeline/transform/models/staging/_stg_sources.yml b/pipeline/transform/models/staging/_stg_sources.yml index f4b28d4..f3976fd 100644 --- a/pipeline/transform/models/staging/_stg_sources.yml +++ b/pipeline/transform/models/staging/_stg_sources.yml @@ -30,6 +30,9 @@ sources: - name: ees_ks2_info description: KS2 school information (wide format — context/demographics per school) + - name: legacy_ks2 + description: Pre-COVID KS2 school-level data (2015-16 to 2018-19) from DfE performance tables + - name: ees_ks4_performance description: KS4 performance tables (long format — one row per school × breakdown × sex) diff --git a/pipeline/transform/models/staging/stg_legacy_ks2.sql b/pipeline/transform/models/staging/stg_legacy_ks2.sql new file mode 100644 index 0000000..c719f20 --- /dev/null +++ b/pipeline/transform/models/staging/stg_legacy_ks2.sql @@ -0,0 +1,56 @@ +{{ config(materialized='table') }} + +-- Staging model: Legacy KS2 data from pre-COVID DfE performance tables +-- The tap already maps old column names to match stg_ees_ks2 output; +-- this model just applies safe_numeric casts. + +select + cast(trim(urn) as integer) as urn, + cast(trim(year) as integer) as year, + + {{ safe_numeric('total_pupils') }}::integer as total_pupils, + {{ safe_numeric('eligible_pupils') }}::integer as eligible_pupils, + + {{ safe_numeric('rwm_expected_pct') }} as rwm_expected_pct, + {{ safe_numeric('rwm_high_pct') }} as rwm_high_pct, + {{ safe_numeric('reading_expected_pct') }} as reading_expected_pct, + {{ safe_numeric('reading_high_pct') }} as reading_high_pct, + {{ safe_numeric('reading_avg_score') }} as reading_avg_score, + {{ safe_numeric('reading_progress') }} as reading_progress, + {{ safe_numeric('writing_expected_pct') }} as writing_expected_pct, + {{ safe_numeric('writing_high_pct') }} as writing_high_pct, + {{ safe_numeric('writing_progress') }} as writing_progress, + {{ safe_numeric('maths_expected_pct') }} as maths_expected_pct, + {{ safe_numeric('maths_high_pct') }} as maths_high_pct, + {{ safe_numeric('maths_avg_score') }} as maths_avg_score, + {{ safe_numeric('maths_progress') }} as maths_progress, + {{ safe_numeric('gps_expected_pct') }} as gps_expected_pct, + {{ safe_numeric('gps_high_pct') }} as gps_high_pct, + {{ safe_numeric('gps_avg_score') }} as gps_avg_score, + null::numeric as science_expected_pct, + + {{ safe_numeric('reading_absence_pct') }} as reading_absence_pct, + null::numeric as writing_absence_pct, + {{ safe_numeric('maths_absence_pct') }} as maths_absence_pct, + {{ safe_numeric('gps_absence_pct') }} as gps_absence_pct, + null::numeric as science_absence_pct, + + {{ safe_numeric('rwm_expected_boys_pct') }} as rwm_expected_boys_pct, + {{ safe_numeric('rwm_high_boys_pct') }} as rwm_high_boys_pct, + {{ safe_numeric('rwm_expected_girls_pct') }} as rwm_expected_girls_pct, + {{ safe_numeric('rwm_high_girls_pct') }} as rwm_high_girls_pct, + + {{ safe_numeric('rwm_expected_disadvantaged_pct') }} as rwm_expected_disadvantaged_pct, + {{ safe_numeric('rwm_expected_non_disadvantaged_pct') }} as rwm_expected_non_disadvantaged_pct, + {{ safe_numeric('rwm_expected_disadvantaged_pct') }} + - {{ safe_numeric('rwm_expected_non_disadvantaged_pct') }} as disadvantaged_gap, + + {{ safe_numeric('disadvantaged_pct') }} as disadvantaged_pct, + {{ safe_numeric('eal_pct') }} as eal_pct, + {{ safe_numeric('sen_support_pct') }} as sen_support_pct, + {{ safe_numeric('sen_ehcp_pct') }} as sen_ehcp_pct, + {{ safe_numeric('stability_pct') }} as stability_pct + +from {{ source('raw', 'legacy_ks2') }} +where urn is not null + and urn ~ '^[0-9]+$'