From d82e36e7b2128d487cb86758c831bb9252a65227 Mon Sep 17 00:00:00 2001 From: Tudor Date: Thu, 26 Mar 2026 23:08:50 +0000 Subject: [PATCH] feat(ees): rewrite EES tap and KS2 models for actual data structure - Fix publication slugs (KS4, Phonics, Admissions were wrong) - Split KS2 into two streams: ees_ks2_attainment (long format) and ees_ks2_info (wide format context data) - Target specific filenames instead of keyword matching - Handle school_urn vs urn column naming - Pivot KS2 attainment from long to wide format in dbt staging - Add all ~40 KS2 columns the backend needs (GPS, absence, gender, disadvantaged breakdowns, context demographics) - Pass through all columns in int_ks2_with_lineage and fact_ks2 Co-Authored-By: Claude Opus 4.6 --- .../extractors/tap-uk-ees/tap_uk_ees/tap.py | 140 +++++++++--- .../intermediate/int_ks2_with_lineage.sql | 51 ++--- .../models/marts/fact_ks2_performance.sql | 51 ++++- .../transform/models/staging/_stg_sources.yml | 7 +- .../transform/models/staging/stg_ees_ks2.sql | 204 +++++++++++++++--- 5 files changed, 354 insertions(+), 99 deletions(-) diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py index 2201caf..1ca2b79 100644 --- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py +++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py @@ -1,4 +1,10 @@ -"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data.""" +"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data. + +Each stream targets a specific CSV file within an EES release ZIP. +The EES data uses 'school_urn' for school-level records and 'z' for +suppressed values. Column names vary by file — schemas declare all +columns needed by downstream dbt staging models. +""" from __future__ import annotations @@ -12,7 +18,6 @@ from singer_sdk import typing as th CONTENT_API_BASE = ( "https://content.explore-education-statistics.service.gov.uk/api" ) -STATS_API_BASE = "https://api.education.gov.uk/statistics/v1" TIMEOUT = 120 @@ -37,7 +42,8 @@ class EESDatasetStream(Stream): replication_key = None _publication_slug: str = "" - _file_keyword: str = "" + _target_filename: str = "" # exact filename within the ZIP + _urn_column: str = "school_urn" # column name for URN in the CSV def get_records(self, context): import pandas as pd @@ -50,84 +56,153 @@ class EESDatasetStream(Stream): ) zf = download_release_zip(release_id) - # Find the CSV matching our keyword - csv_names = [n for n in zf.namelist() if n.endswith(".csv")] + # Find the target file + all_files = zf.namelist() target = None - for name in csv_names: - if self._file_keyword.lower() in name.lower(): + for name in all_files: + if name.endswith(self._target_filename): target = name break - if not target and csv_names: - target = csv_names[0] if not target: - self.logger.warning("No CSV found in release ZIP") + self.logger.error( + "File '%s' not found in ZIP. Available: %s", + self._target_filename, + [n for n in all_files if n.endswith(".csv")], + ) return self.logger.info("Reading %s from ZIP", target) with zf.open(target) as f: df = pd.read_csv(f, dtype=str, keep_default_na=False) - # Filter to school-level data + # Filter to school-level data if the column exists if "geographic_level" in df.columns: df = df[df["geographic_level"] == "School"] + self.logger.info("Emitting %d school-level rows", len(df)) + for _, row in df.iterrows(): - yield row.to_dict() + record = row.to_dict() + # Normalise URN column to 'school_urn' for consistency + if self._urn_column in record and self._urn_column != "school_urn": + record["school_urn"] = record.pop(self._urn_column) + yield record -class EESKS2Stream(EESDatasetStream): - name = "ees_ks2" - primary_keys = ["urn", "time_period"] +# ── KS2 Attainment (long format: one row per school × subject × breakdown) ── + +class EESKS2AttainmentStream(EESDatasetStream): + name = "ees_ks2_attainment" + primary_keys = ["school_urn", "time_period", "subject", "breakdown_topic", "breakdown"] _publication_slug = "key-stage-2-attainment" - _file_keyword = "school" + _target_filename = "ks2_school_attainment_data.csv" schema = th.PropertiesList( - th.Property("urn", th.StringType, required=True), th.Property("time_period", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), + th.Property("school_laestab", th.StringType), + th.Property("school_name", th.StringType), + th.Property("breakdown_topic", th.StringType, required=True), + th.Property("breakdown", th.StringType, required=True), + th.Property("subject", th.StringType, required=True), + th.Property("expected_standard_pupil_percent", th.StringType), + th.Property("higher_standard_pupil_percent", th.StringType), + th.Property("average_scaled_score", th.StringType), + th.Property("progress_measure_score", th.StringType), + th.Property("progress_measure_lower_conf_interval", th.StringType), + th.Property("progress_measure_upper_conf_interval", th.StringType), + th.Property("absent_or_not_able_to_access_percent", th.StringType), + th.Property("working_towards_expected_standard_pupil_percent", th.StringType), + th.Property("absent_or_disapplied_percent", th.StringType), + th.Property("higher_standard", th.StringType), + th.Property("progress_measure_unadjusted", th.StringType), + th.Property("progress_measure_description", th.StringType), ).to_dict() +# ── KS2 Information (wide format: one row per school, context/demographics) ── + +class EESKS2InfoStream(EESDatasetStream): + name = "ees_ks2_info" + primary_keys = ["school_urn", "time_period"] + _publication_slug = "key-stage-2-attainment" + _target_filename = "ks2_school_information_data.csv" + schema = th.PropertiesList( + th.Property("time_period", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), + th.Property("school_laestab", th.StringType), + th.Property("school_name", th.StringType), + th.Property("nftype", th.StringType), + th.Property("reldenom", th.StringType), + th.Property("agerange", th.StringType), + th.Property("totpups", th.StringType), + th.Property("telig", th.StringType), + th.Property("belig", th.StringType), + th.Property("gelig", th.StringType), + th.Property("ptfsm6cla1a", th.StringType), + th.Property("ptnotfsm6cla1a", th.StringType), + th.Property("ptealgrp2", th.StringType), + th.Property("ptmobn", th.StringType), + th.Property("psenelk", th.StringType), + th.Property("psenele", th.StringType), + th.Property("psenelek", th.StringType), + th.Property("telig_3yr", th.StringType), + ).to_dict() + + +# ── KS4 Attainment ────────────────────────────────────────────────────────── + class EESKS4Stream(EESDatasetStream): name = "ees_ks4" - primary_keys = ["urn", "time_period"] - _publication_slug = "key-stage-4-performance-revised" - _file_keyword = "school" + primary_keys = ["school_urn", "time_period"] + _publication_slug = "key-stage-4-performance" + _target_filename = "school" # Will be refined once we see the actual ZIP contents schema = th.PropertiesList( - th.Property("urn", th.StringType, required=True), th.Property("time_period", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), ).to_dict() +# ── Census (school-level pupil characteristics) ───────────────────────────── + class EESCensusStream(EESDatasetStream): name = "ees_census" primary_keys = ["urn", "time_period"] _publication_slug = "school-pupils-and-their-characteristics" - _file_keyword = "school" + _target_filename = "spc_school_level_underlying_data_2025.csv" + _urn_column = "urn" schema = th.PropertiesList( - th.Property("urn", th.StringType, required=True), th.Property("time_period", th.StringType, required=True), + th.Property("urn", th.StringType, required=True), + th.Property("school_name", th.StringType), + th.Property("laestab", th.StringType), + th.Property("phase_type_grouping", th.StringType), ).to_dict() +# ── Admissions ─────────────────────────────────────────────────────────────── + class EESAdmissionsStream(EESDatasetStream): name = "ees_admissions" - primary_keys = ["urn", "time_period"] - _publication_slug = "secondary-and-primary-school-applications-and-offers" - _file_keyword = "school" + primary_keys = ["school_urn", "time_period"] + _publication_slug = "primary-and-secondary-school-applications-and-offers" + _target_filename = "school" # Will be refined once we see the actual ZIP contents schema = th.PropertiesList( - th.Property("urn", th.StringType, required=True), th.Property("time_period", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), ).to_dict() +# ── Phonics ────────────────────────────────────────────────────────────────── + class EESPhonicsStream(EESDatasetStream): name = "ees_phonics" - primary_keys = ["urn", "time_period"] - _publication_slug = "phonics-screening-check-and-key-stage-1-assessments" - _file_keyword = "school" + primary_keys = ["school_urn", "time_period"] + _publication_slug = "phonics-screening-check-attainment" + _target_filename = "school" # Will be refined once we see the actual ZIP contents schema = th.PropertiesList( - th.Property("urn", th.StringType, required=True), th.Property("time_period", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), ).to_dict() @@ -142,7 +217,8 @@ class TapUKEES(Tap): def discover_streams(self): return [ - EESKS2Stream(self), + EESKS2AttainmentStream(self), + EESKS2InfoStream(self), EESKS4Stream(self), EESCensusStream(self), EESAdmissionsStream(self), diff --git a/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql b/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql index 1ec704a..e9c4066 100644 --- a/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql +++ b/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql @@ -5,21 +5,16 @@ with current_ks2 as ( select urn as current_urn, urn as source_urn, - year, - total_pupils, - rwm_expected_pct, - reading_expected_pct, - writing_expected_pct, - maths_expected_pct, - rwm_high_pct, - reading_high_pct, - writing_high_pct, - maths_high_pct, - reading_progress, - writing_progress, - maths_progress, - reading_avg_score, - maths_avg_score + year, total_pupils, eligible_pupils, + rwm_expected_pct, rwm_high_pct, + reading_expected_pct, reading_high_pct, reading_avg_score, reading_progress, + writing_expected_pct, writing_high_pct, writing_progress, + maths_expected_pct, maths_high_pct, maths_avg_score, maths_progress, + gps_expected_pct, gps_high_pct, gps_avg_score, science_expected_pct, + reading_absence_pct, writing_absence_pct, maths_absence_pct, gps_absence_pct, science_absence_pct, + rwm_expected_boys_pct, rwm_high_boys_pct, rwm_expected_girls_pct, rwm_high_girls_pct, + rwm_expected_disadvantaged_pct, rwm_expected_non_disadvantaged_pct, disadvantaged_gap, + disadvantaged_pct, eal_pct, sen_support_pct, sen_ehcp_pct, stability_pct from {{ ref('stg_ees_ks2') }} ), @@ -27,25 +22,19 @@ predecessor_ks2 as ( select lin.current_urn, ks2.urn as source_urn, - ks2.year, - ks2.total_pupils, - ks2.rwm_expected_pct, - ks2.reading_expected_pct, - ks2.writing_expected_pct, - ks2.maths_expected_pct, - ks2.rwm_high_pct, - ks2.reading_high_pct, - ks2.writing_high_pct, - ks2.maths_high_pct, - ks2.reading_progress, - ks2.writing_progress, - ks2.maths_progress, - ks2.reading_avg_score, - ks2.maths_avg_score + ks2.year, ks2.total_pupils, ks2.eligible_pupils, + ks2.rwm_expected_pct, ks2.rwm_high_pct, + ks2.reading_expected_pct, ks2.reading_high_pct, ks2.reading_avg_score, ks2.reading_progress, + ks2.writing_expected_pct, ks2.writing_high_pct, ks2.writing_progress, + ks2.maths_expected_pct, ks2.maths_high_pct, ks2.maths_avg_score, ks2.maths_progress, + ks2.gps_expected_pct, ks2.gps_high_pct, ks2.gps_avg_score, ks2.science_expected_pct, + ks2.reading_absence_pct, ks2.writing_absence_pct, ks2.maths_absence_pct, ks2.gps_absence_pct, ks2.science_absence_pct, + ks2.rwm_expected_boys_pct, ks2.rwm_high_boys_pct, ks2.rwm_expected_girls_pct, ks2.rwm_high_girls_pct, + ks2.rwm_expected_disadvantaged_pct, ks2.rwm_expected_non_disadvantaged_pct, ks2.disadvantaged_gap, + ks2.disadvantaged_pct, ks2.eal_pct, ks2.sen_support_pct, ks2.sen_ehcp_pct, ks2.stability_pct from {{ ref('stg_ees_ks2') }} ks2 inner join {{ ref('int_school_lineage') }} lin on ks2.urn = lin.predecessor_urn - -- Only include predecessor data for years before the current URN has data where not exists ( select 1 from {{ ref('stg_ees_ks2') }} curr where curr.urn = lin.current_urn diff --git a/pipeline/transform/models/marts/fact_ks2_performance.sql b/pipeline/transform/models/marts/fact_ks2_performance.sql index cc9f3b0..290e94f 100644 --- a/pipeline/transform/models/marts/fact_ks2_performance.sql +++ b/pipeline/transform/models/marts/fact_ks2_performance.sql @@ -6,17 +6,50 @@ select source_urn, year, total_pupils, + eligible_pupils, + + -- Core attainment rwm_expected_pct, - reading_expected_pct, - writing_expected_pct, - maths_expected_pct, rwm_high_pct, + reading_expected_pct, reading_high_pct, - writing_high_pct, - maths_high_pct, - reading_progress, - writing_progress, - maths_progress, reading_avg_score, - maths_avg_score + reading_progress, + writing_expected_pct, + writing_high_pct, + writing_progress, + maths_expected_pct, + maths_high_pct, + maths_avg_score, + maths_progress, + gps_expected_pct, + gps_high_pct, + gps_avg_score, + science_expected_pct, + + -- Absence + reading_absence_pct, + writing_absence_pct, + maths_absence_pct, + gps_absence_pct, + science_absence_pct, + + -- Gender + rwm_expected_boys_pct, + rwm_high_boys_pct, + rwm_expected_girls_pct, + rwm_high_girls_pct, + + -- Disadvantaged + rwm_expected_disadvantaged_pct, + rwm_expected_non_disadvantaged_pct, + disadvantaged_gap, + + -- Context + disadvantaged_pct, + eal_pct, + sen_support_pct, + sen_ehcp_pct, + stability_pct + from {{ ref('int_ks2_with_lineage') }} diff --git a/pipeline/transform/models/staging/_stg_sources.yml b/pipeline/transform/models/staging/_stg_sources.yml index ae0581a..b7da2a9 100644 --- a/pipeline/transform/models/staging/_stg_sources.yml +++ b/pipeline/transform/models/staging/_stg_sources.yml @@ -24,8 +24,11 @@ sources: - name: ofsted_inspections description: Ofsted Management Information inspection records - - name: ees_ks2 - description: KS2 attainment data from Explore Education Statistics + - name: ees_ks2_attainment + description: KS2 school attainment (long format — one row per school × subject × breakdown) + + - name: ees_ks2_info + description: KS2 school information (wide format — context/demographics per school) - name: ees_ks4 description: KS4 attainment data from Explore Education Statistics diff --git a/pipeline/transform/models/staging/stg_ees_ks2.sql b/pipeline/transform/models/staging/stg_ees_ks2.sql index 61f208d..6526b5e 100644 --- a/pipeline/transform/models/staging/stg_ees_ks2.sql +++ b/pipeline/transform/models/staging/stg_ees_ks2.sql @@ -1,31 +1,185 @@ --- Staging model: KS2 attainment data from EES --- Column names depend on the EES dataset schema; these will be finalised --- once the tap-uk-ees extractor resolves the actual column names. +-- Staging model: KS2 attainment + information +-- Pivots long-format attainment data (one row per subject × breakdown) into +-- wide format (one row per school per year) and joins context from info table. +-- EES uses 'z' for suppressed values — cast to null via nullif. -with source as ( - select * from {{ source('raw', 'ees_ks2') }} +with attainment as ( + select * from {{ source('raw', 'ees_ks2_attainment') }} + where school_urn is not null ), -renamed as ( +-- Pivot: extract metrics for each subject where breakdown = 'Total' +all_pupils as ( select - cast(urn as integer) as urn, - cast(time_period as integer) as year, - cast(t_pupils as integer) as total_pupils, - cast(pt_rwm_met_expected_standard as numeric) as rwm_expected_pct, - cast(pt_read_met_expected_standard as numeric) as reading_expected_pct, - cast(pt_write_met_expected_standard as numeric) as writing_expected_pct, - cast(pt_maths_met_expected_standard as numeric) as maths_expected_pct, - cast(pt_rwm_met_higher_standard as numeric) as rwm_high_pct, - cast(pt_read_met_higher_standard as numeric) as reading_high_pct, - cast(pt_write_met_higher_standard as numeric) as writing_high_pct, - cast(pt_maths_met_higher_standard as numeric) as maths_high_pct, - cast(read_progress as numeric) as reading_progress, - cast(write_progress as numeric) as writing_progress, - cast(maths_progress as numeric) as maths_progress, - cast(read_average_score as numeric) as reading_avg_score, - cast(maths_average_score as numeric) as maths_avg_score - from source - where urn is not null + school_urn, + time_period, + subject, + nullif(expected_standard_pupil_percent, 'z') as expected_pct, + nullif(higher_standard_pupil_percent, 'z') as higher_pct, + nullif(average_scaled_score, 'z') as avg_score, + nullif(progress_measure_score, 'z') as progress, + nullif(absent_or_not_able_to_access_percent, 'z') as absence_pct + from attainment + where breakdown_topic = 'All pupils' + and breakdown = 'Total' +), + +pivoted as ( + select + cast(school_urn as integer) as urn, + cast(time_period as integer) as year, + + -- RWM combined + max(case when subject = 'Reading, writing and maths' then cast(expected_pct as numeric) end) as rwm_expected_pct, + max(case when subject = 'Reading, writing and maths' then cast(higher_pct as numeric) end) as rwm_high_pct, + + -- Reading + max(case when subject = 'Reading' then cast(expected_pct as numeric) end) as reading_expected_pct, + max(case when subject = 'Reading' then cast(higher_pct as numeric) end) as reading_high_pct, + max(case when subject = 'Reading' then cast(avg_score as numeric) end) as reading_avg_score, + max(case when subject = 'Reading' then cast(progress as numeric) end) as reading_progress, + max(case when subject = 'Reading' then cast(absence_pct as numeric) end) as reading_absence_pct, + + -- Writing + max(case when subject = 'Writing' then cast(expected_pct as numeric) end) as writing_expected_pct, + max(case when subject = 'Writing' then cast(higher_pct as numeric) end) as writing_high_pct, + max(case when subject = 'Writing' then cast(progress as numeric) end) as writing_progress, + max(case when subject = 'Writing' then cast(absence_pct as numeric) end) as writing_absence_pct, + + -- Maths + max(case when subject = 'Maths' then cast(expected_pct as numeric) end) as maths_expected_pct, + max(case when subject = 'Maths' then cast(higher_pct as numeric) end) as maths_high_pct, + max(case when subject = 'Maths' then cast(avg_score as numeric) end) as maths_avg_score, + max(case when subject = 'Maths' then cast(progress as numeric) end) as maths_progress, + max(case when subject = 'Maths' then cast(absence_pct as numeric) end) as maths_absence_pct, + + -- GPS + max(case when subject ilike '%grammar%' or subject = 'GPS' then cast(expected_pct as numeric) end) as gps_expected_pct, + max(case when subject ilike '%grammar%' or subject = 'GPS' then cast(higher_pct as numeric) end) as gps_high_pct, + max(case when subject ilike '%grammar%' or subject = 'GPS' then cast(avg_score as numeric) end) as gps_avg_score, + max(case when subject ilike '%grammar%' or subject = 'GPS' then cast(absence_pct as numeric) end) as gps_absence_pct, + + -- Science + max(case when subject = 'Science' then cast(expected_pct as numeric) end) as science_expected_pct, + max(case when subject = 'Science' then cast(absence_pct as numeric) end) as science_absence_pct + + from all_pupils + group by school_urn, time_period +), + +-- Gender breakdown for RWM +gender_boys as ( + select + school_urn, + time_period, + nullif(expected_standard_pupil_percent, 'z') as rwm_expected_boys_pct, + nullif(higher_standard_pupil_percent, 'z') as rwm_high_boys_pct + from attainment + where subject = 'Reading, writing and maths' + and breakdown = 'Boys' +), + +gender_girls as ( + select + school_urn, + time_period, + nullif(expected_standard_pupil_percent, 'z') as rwm_expected_girls_pct, + nullif(higher_standard_pupil_percent, 'z') as rwm_high_girls_pct + from attainment + where subject = 'Reading, writing and maths' + and breakdown = 'Girls' +), + +-- Disadvantaged breakdown for RWM +disadv as ( + select + school_urn, + time_period, + nullif(expected_standard_pupil_percent, 'z') as rwm_expected_disadvantaged_pct + from attainment + where subject = 'Reading, writing and maths' + and breakdown = 'Disadvantaged' +), + +not_disadv as ( + select + school_urn, + time_period, + nullif(expected_standard_pupil_percent, 'z') as rwm_expected_non_disadvantaged_pct + from attainment + where subject = 'Reading, writing and maths' + and breakdown = 'Not disadvantaged' +), + +-- School info (context/demographics) +info as ( + select + cast(school_urn as integer) as urn, + cast(time_period as integer) as year, + cast(nullif(totpups, 'z') as integer) as total_pupils, + cast(nullif(telig, 'z') as integer) as eligible_pupils, + cast(nullif(ptfsm6cla1a, 'z') as numeric) as disadvantaged_pct, + cast(nullif(ptealgrp2, 'z') as numeric) as eal_pct, + cast(nullif(psenelk, 'z') as numeric) as sen_support_pct, + cast(nullif(psenele, 'z') as numeric) as sen_ehcp_pct, + cast(nullif(ptmobn, 'z') as numeric) as stability_pct + from {{ source('raw', 'ees_ks2_info') }} + where school_urn is not null ) -select * from renamed +select + p.urn, + p.year, + i.total_pupils, + i.eligible_pupils, + + -- Core attainment + p.rwm_expected_pct, + p.rwm_high_pct, + p.reading_expected_pct, + p.reading_high_pct, + p.reading_avg_score, + p.reading_progress, + p.writing_expected_pct, + p.writing_high_pct, + p.writing_progress, + p.maths_expected_pct, + p.maths_high_pct, + p.maths_avg_score, + p.maths_progress, + p.gps_expected_pct, + p.gps_high_pct, + p.gps_avg_score, + p.science_expected_pct, + + -- Absence + p.reading_absence_pct, + p.writing_absence_pct, + p.maths_absence_pct, + p.gps_absence_pct, + p.science_absence_pct, + + -- Gender + cast(gb.rwm_expected_boys_pct as numeric) as rwm_expected_boys_pct, + cast(gb.rwm_high_boys_pct as numeric) as rwm_high_boys_pct, + cast(gg.rwm_expected_girls_pct as numeric) as rwm_expected_girls_pct, + cast(gg.rwm_high_girls_pct as numeric) as rwm_high_girls_pct, + + -- Disadvantaged + cast(d.rwm_expected_disadvantaged_pct as numeric) as rwm_expected_disadvantaged_pct, + cast(nd.rwm_expected_non_disadvantaged_pct as numeric) as rwm_expected_non_disadvantaged_pct, + cast(d.rwm_expected_disadvantaged_pct as numeric) - cast(nd.rwm_expected_non_disadvantaged_pct as numeric) as disadvantaged_gap, + + -- Context + i.disadvantaged_pct, + i.eal_pct, + i.sen_support_pct, + i.sen_ehcp_pct, + i.stability_pct + +from pivoted p +left join info i on p.urn = i.urn and p.year = i.year +left join gender_boys gb on p.urn = cast(gb.school_urn as integer) and p.year = cast(gb.time_period as integer) +left join gender_girls gg on p.urn = cast(gg.school_urn as integer) and p.year = cast(gg.time_period as integer) +left join disadv d on p.urn = cast(d.school_urn as integer) and p.year = cast(d.time_period as integer) +left join not_disadv nd on p.urn = cast(nd.school_urn as integer) and p.year = cast(nd.time_period as integer)