From 7e6ded29e2f99f6ccd766627559a6e873a779a1a Mon Sep 17 00:00:00 2001
From: Tudor Sitaru
Date: Thu, 16 Apr 2026 10:37:24 +0100
Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20add=20legacy=20KS4=20backfill?=
 =?UTF-8?q?=20(2015/16=E2=80=932018/19)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Mirrors the existing legacy KS2 pattern to fill the gap before EES hosted
KS4 data. Five files changed:

- tap-uk-ees: LegacyKS4Stream downloads each year's DfE Compare School
  Performance ZIP, extracts england_ks4final.csv, maps 18 of the ~416
  legacy columns to Singer fields, strips % suffixes. Registered in
  discover_streams(). TapUKEES.config_jsonschema gains legacy_ks4_urls
  setting.
- stg_legacy_ks4.sql: safe_numeric casts + NULL placeholders for columns
  not present in legacy format (ebacc_avg_score, gcse_grade_91_pct,
  prior_attainment_avg, sen_pct).
- int_ks4_with_lineage.sql: adds all_ks4 CTE unioning stg_ees_ks4 and
  stg_legacy_ks4, matching the int_ks2_with_lineage pattern.
- _stg_sources.yml + meltano.yml: source declaration and setting
  definition for legacy_ks4. URLs configured per-year once provided.

Co-Authored-By: Claude Sonnet 4.6
---
 pipeline/meltano.yml                          |   3 +
 .../extractors/tap-uk-ees/tap_uk_ees/tap.py   | 152 ++++++++++++++++++
 .../intermediate/int_ks4_with_lineage.sql     |  19 ++-
 .../transform/models/staging/_stg_sources.yml |   3 +
 .../models/staging/stg_legacy_ks4.sql         |  52 +++++++
 5 files changed, 223 insertions(+), 6 deletions(-)
 create mode 100644 pipeline/transform/models/staging/stg_legacy_ks4.sql

diff --git a/pipeline/meltano.yml b/pipeline/meltano.yml
index 324a596..8b6425e 100644
--- a/pipeline/meltano.yml
+++ b/pipeline/meltano.yml
@@ -26,6 +26,9 @@ plugins:
         - name: legacy_ks2_urls
           kind: object
           description: "Year code → URL mapping for legacy KS2 CSVs"
+        - name: legacy_ks4_urls
+          kind: object
+          description: "Year code → URL mapping for legacy KS4 ZIPs (england_ks4final.csv inside)"
       config:
         legacy_ks2_urls:
           "201516": "http://10.0.1.224:8081/filebrowser/api/public/dl/R9jjXFWa?inline=true"
diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
index b7557e4..4cddc9a 100644
--- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
+++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
@@ -712,6 +712,152 @@ class LegacyKS2Stream(Stream):
                 yield record
 
 
+# ── Legacy KS4 (pre-EES wide format from DfE performance tables) ──────────────
+# The DfE "Compare School Performance" ZIPs include england_ks4final.csv in a
+# wide format (one row per school, ~416 columns, uppercase abbreviated names).
+# EES only hosts 2 years of KS4 data; this stream backfills 2015-16 to 2018-19.
+# Column mapping: old DfE CSV column → Singer field name (matches stg output).
+
+_LEGACY_KS4_COLUMN_MAP = {
+    "URN": "urn",
+    "TPUP": "total_pupils",
+    # Attainment 8
+    "ATT8SCR": "attainment_8_score",
+    # Progress 8
+    "P8MEA": "progress_8_score",
+    "P8CILOW": "progress_8_lower_ci",
+    "P8CIUPP": "progress_8_upper_ci",
+    "P8MEAENG": "progress_8_english",
+    "P8MEAMAT": "progress_8_maths",
+    "P8MEAEBAC": "progress_8_ebacc",
+    "P8MEAOPEN": "progress_8_open",
+    # English & Maths pass rates (% suffix stripped at extract time)
+    "PTL2BASICS_95": "english_maths_strong_pass_pct",
+    "PTL2BASICS_94": "english_maths_standard_pass_pct",
+    # EBacc
+    "PTEBACC_E_PTQ_EE": "ebacc_entry_pct",
+    "PTEBACC_95": "ebacc_strong_pass_pct",
+    "PTEBACC_94": "ebacc_standard_pass_pct",
+    # Context
+    "PSENSE4": "sen_ehcp_pct",
+    "PSENAPK4": "sen_support_pct",
+}
+
+
+class LegacyKS4Stream(Stream):
+    """Stream for pre-EES KS4 data from DfE 'Compare School Performance' ZIPs.
+
+    Downloads ZIPs from URLs configured in legacy_ks4_urls (a mapping of
+    6-digit year code → download URL), extracts england_ks4final.csv from each,
+    maps old DfE column names to match stg_ees_ks4 output schema, and emits
+    one record per school per year. The % suffix present on percentage columns
+    (e.g. "39.60%") is stripped here so safe_numeric in dbt can cast cleanly.
+    """
+
+    name = "legacy_ks4"
+    primary_keys = ["urn", "year"]
+    replication_key = None
+
+    schema = th.PropertiesList(
+        th.Property("urn", th.StringType, required=True),
+        th.Property("year", th.StringType, required=True),
+        th.Property("total_pupils", th.StringType),
+        th.Property("attainment_8_score", th.StringType),
+        th.Property("progress_8_score", th.StringType),
+        th.Property("progress_8_lower_ci", th.StringType),
+        th.Property("progress_8_upper_ci", th.StringType),
+        th.Property("progress_8_english", th.StringType),
+        th.Property("progress_8_maths", th.StringType),
+        th.Property("progress_8_ebacc", th.StringType),
+        th.Property("progress_8_open", th.StringType),
+        th.Property("english_maths_strong_pass_pct", th.StringType),
+        th.Property("english_maths_standard_pass_pct", th.StringType),
+        th.Property("ebacc_entry_pct", th.StringType),
+        th.Property("ebacc_strong_pass_pct", th.StringType),
+        th.Property("ebacc_standard_pass_pct", th.StringType),
+        th.Property("sen_ehcp_pct", th.StringType),
+        th.Property("sen_support_pct", th.StringType),
+    ).to_dict()
+
+    def get_records(self, context):
+        """Yield one record per school for every configured legacy year.
+
+        A year is skipped with a warning (and the remaining years still load)
+        when its download fails, the payload is not a ZIP, the ZIP has no
+        ``*ks4final*.csv`` member, or the CSV has no ``URN`` column.
+        """
+        import pandas as pd
+
+        url_map = self.config.get("legacy_ks4_urls", {})
+        if not url_map:
+            self.logger.warning("legacy_ks4_urls not configured, skipping legacy KS4")
+            return
+
+        self.logger.info("Loading legacy KS4 for %d year(s)", len(url_map))
+
+        for year_code, url in url_map.items():
+            self.logger.info("Downloading %s for %s", url, year_code)
+            try:
+                resp = requests.get(url, timeout=120)
+                resp.raise_for_status()
+            except requests.RequestException as e:
+                self.logger.warning("Failed to download %s: %s", url, e)
+                continue
+
+            try:
+                zf = zipfile.ZipFile(io.BytesIO(resp.content))
+            except zipfile.BadZipFile as e:
+                self.logger.warning("Not a ZIP for %s: %s", year_code, e)
+                continue
+
+            # ``with zf`` closes the archive on every exit path, including ``continue``.
+            with zf:
+                # Find england_ks4final.csv inside the ZIP (any folder, any letter case)
+                target = next(
+                    (n for n in zf.namelist() if "ks4final" in n.lower() and n.lower().endswith(".csv")),
+                    None,
+                )
+                if not target:
+                    self.logger.warning("england_ks4final.csv not found in ZIP for %s", year_code)
+                    continue
+
+                with zf.open(target) as f:
+                    df = pd.read_csv(
+                        f,
+                        dtype=str,
+                        keep_default_na=False,
+                        encoding="latin-1",
+                    )
+
+            # Strip BOM from first column name. The file is decoded as latin-1,
+            # so a UTF-8 BOM arrives as the three characters "\xef\xbb\xbf"
+            # rather than "\ufeff"; strip both spellings.
+            cols = list(df.columns)
+            if cols:
+                cols[0] = cols[0].lstrip("\ufeff\xef\xbb\xbf")
+                df.columns = cols
+
+            # urn is a required primary-key field; without the column no row is usable.
+            if "URN" not in df.columns:
+                self.logger.warning("URN column missing in %s for %s", target, year_code)
+                continue
+
+            # Filter to school-level rows: URN must be a plain integer
+            df = df[df["URN"].str.match(r"^\d+$", na=False)]
+
+            self.logger.info("Emitting %d schools for %s", len(df), year_code)
+
+            for _, row in df.iterrows():
+                record = {"year": year_code}
+                for old_col, new_col in _LEGACY_KS4_COLUMN_MAP.items():
+                    val = row.get(old_col, "")
+                    # Strip % suffix — legacy DfE CSVs use "39.60%" not "39.60"
+                    if isinstance(val, str) and val.endswith("%"):
+                        val = val[:-1]
+                    record[new_col] = val
+                yield record
+
+
 class TapUKEES(Tap):
     """Singer tap for UK Explore Education Statistics."""
 
@@ -730,6 +876,11 @@ class TapUKEES(Tap):
             th.ObjectType(),
             description="Mapping of 6-digit year code to download URL for legacy KS2 CSVs (e.g. {\"201819\": \"https://...\"})",
         ),
+        th.Property(
+            "legacy_ks4_urls",
+            th.ObjectType(),
+            description="Mapping of 6-digit year code to download URL for legacy KS4 ZIPs (e.g. {\"201819\": \"https://...\"})",
+        ),
     ).to_dict()
 
     def discover_streams(self):
@@ -741,6 +892,7 @@ class TapUKEES(Tap):
             EESCensusStream(self),
             EESAdmissionsStream(self),
             LegacyKS2Stream(self),
+            LegacyKS4Stream(self),
             EESKs2NationalStream(self),
         ]
 
diff --git a/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql b/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql
index 533052a..4156d0e 100644
--- a/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql
+++ b/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql
@@ -1,6 +1,13 @@
 -- Intermediate model: KS4 data chained across academy conversions
+-- Unions EES (2023/24 onwards) and legacy (2015/16–2018/19) school-level data
 
-with current_ks4 as (
+with all_ks4 as (
+    select * from {{ ref('stg_ees_ks4') }}
+    union all
+    select * from {{ ref('stg_legacy_ks4') }}
+),
+
+current_ks4 as (
     select
         urn as current_urn,
         urn as source_urn,
@@ -11,8 +18,8 @@ with current_ks4 as (
         english_maths_strong_pass_pct, english_maths_standard_pass_pct,
         ebacc_entry_pct, ebacc_strong_pass_pct, ebacc_standard_pass_pct,
         ebacc_avg_score, gcse_grade_91_pct,
-        sen_pct, sen_ehcp_pct, sen_support_pct
-    from {{ ref('stg_ees_ks4') }}
+        sen_pct, sen_support_pct, sen_ehcp_pct
+    from all_ks4
 ),
 
 predecessor_ks4 as (
@@ -27,12 +34,12 @@ predecessor_ks4 as (
         ks4.english_maths_strong_pass_pct, ks4.english_maths_standard_pass_pct,
         ks4.ebacc_entry_pct, ks4.ebacc_strong_pass_pct, ks4.ebacc_standard_pass_pct,
         ks4.ebacc_avg_score, ks4.gcse_grade_91_pct,
-        ks4.sen_pct, ks4.sen_ehcp_pct, ks4.sen_support_pct
-    from {{ ref('stg_ees_ks4') }} ks4
+        ks4.sen_pct, ks4.sen_support_pct, ks4.sen_ehcp_pct
+    from all_ks4 ks4
     inner join {{ ref('int_school_lineage') }} lin
         on ks4.urn = lin.predecessor_urn
     where not exists (
-        select 1 from {{ ref('stg_ees_ks4') }} curr
+        select 1 from all_ks4 curr
         where curr.urn = lin.current_urn
           and curr.year = ks4.year
     )
diff --git a/pipeline/transform/models/staging/_stg_sources.yml b/pipeline/transform/models/staging/_stg_sources.yml
index b4b852e..491a27b 100644
--- a/pipeline/transform/models/staging/_stg_sources.yml
+++ b/pipeline/transform/models/staging/_stg_sources.yml
@@ -39,6 +39,9 @@ sources:
       - name: ees_ks4_info
         description: KS4 school information (wide format — context/demographics per school)
 
+      - name: legacy_ks4
+        description: Pre-EES KS4 school-level data (2015/16–2018/19) from DfE Compare School Performance ZIPs
+
       - name: ees_census
         description: School census pupil characteristics
 
diff --git a/pipeline/transform/models/staging/stg_legacy_ks4.sql b/pipeline/transform/models/staging/stg_legacy_ks4.sql
new file mode 100644
index 0000000..2254070
--- /dev/null
+++ b/pipeline/transform/models/staging/stg_legacy_ks4.sql
@@ -0,0 +1,52 @@
+{{ config(materialized='table') }}
+
+-- Staging model: Legacy KS4 data from pre-EES DfE performance tables
+-- Covers 2015/16 – 2018/19; EES provides 2023/24 onwards.
+-- The tap already maps old column names and strips % suffixes;
+-- this model just applies safe_numeric casts and adds NULL placeholders
+-- for columns not available in the legacy format.
+-- NOTE: int_ks4_with_lineage unions this model with stg_ees_ks4 via
+-- `select *`, and UNION ALL matches columns by position — keep the column
+-- order and types here aligned with stg_ees_ks4.
+
+select
+    cast(trim(urn) as integer)                              as urn,
+    cast(trim(year) as integer)                             as year,
+
+    {{ safe_numeric('total_pupils') }}::integer             as total_pupils,
+    {{ safe_numeric('total_pupils') }}::integer             as eligible_pupils,
+    null::numeric                                           as prior_attainment_avg,
+
+    -- Attainment 8
+    {{ safe_numeric('attainment_8_score') }}                as attainment_8_score,
+
+    -- Progress 8
+    {{ safe_numeric('progress_8_score') }}                  as progress_8_score,
+    {{ safe_numeric('progress_8_lower_ci') }}               as progress_8_lower_ci,
+    {{ safe_numeric('progress_8_upper_ci') }}               as progress_8_upper_ci,
+    {{ safe_numeric('progress_8_english') }}                as progress_8_english,
+    {{ safe_numeric('progress_8_maths') }}                  as progress_8_maths,
+    {{ safe_numeric('progress_8_ebacc') }}                  as progress_8_ebacc,
+    {{ safe_numeric('progress_8_open') }}                   as progress_8_open,
+
+    -- English & Maths pass rates
+    {{ safe_numeric('english_maths_strong_pass_pct') }}     as english_maths_strong_pass_pct,
+    {{ safe_numeric('english_maths_standard_pass_pct') }}   as english_maths_standard_pass_pct,
+
+    -- EBacc
+    {{ safe_numeric('ebacc_entry_pct') }}                   as ebacc_entry_pct,
+    {{ safe_numeric('ebacc_strong_pass_pct') }}             as ebacc_strong_pass_pct,
+    {{ safe_numeric('ebacc_standard_pass_pct') }}           as ebacc_standard_pass_pct,
+    null::numeric                                           as ebacc_avg_score,
+
+    -- GCSE grade 9-1 (not published in legacy format)
+    null::numeric                                           as gcse_grade_91_pct,
+
+    -- SEN
+    null::numeric                                           as sen_pct,
+    {{ safe_numeric('sen_support_pct') }}                   as sen_support_pct,
+    {{ safe_numeric('sen_ehcp_pct') }}                      as sen_ehcp_pct
+
+from {{ source('raw', 'legacy_ks4') }}
+where urn is not null
+  and urn ~ '^[0-9]+$'