diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py index 5d00c05..e95f19d 100644 --- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py +++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py @@ -44,6 +44,8 @@ class EESDatasetStream(Stream): Subclasses set _target_filename to a keyword that appears in the target CSV path inside the ZIP (substring match, not exact). + Subclasses may set _column_renames to map messy CSV column names to + clean Singer field names before yielding records. """ replication_key = None @@ -51,6 +53,7 @@ class EESDatasetStream(Stream): _target_filename: str = "" # keyword that appears in the CSV path _urn_column: str = "school_urn" # column name for URN in the CSV _encoding: str = "utf-8" # CSV file encoding (some DfE files use latin-1) + _column_renames: dict = {} # CSV column name → Singer field name def get_records(self, context): import pandas as pd @@ -104,6 +107,10 @@ class EESDatasetStream(Stream): for _, row in df.iterrows(): record = row.to_dict() + # Apply subclass column renames (messy CSV names → clean Singer fields) + for old, new in self._column_renames.items(): + if old in record: + record[new] = record.pop(old) # Normalise URN column to 'school_urn' for consistency if self._urn_column in record and self._urn_column != "school_urn": record["school_urn"] = record.pop(self._urn_column) @@ -281,15 +288,75 @@ class EESCensusStream(EESDatasetStream): _target_filename = "spc_school_level_underlying_data" _urn_column = "urn" _encoding = "latin-1" + + # Map verbose CSV column names to clean Singer field names + _column_renames = { + "headcount of pupils": "total_pupils", + "headcount total female": "female_pupils", + "headcount total male": "male_pupils", + "full-time pupils": "full_time_pupils", + "part-time pupils": "part_time_pupils", + "total boarders": "total_boarders", + "number of pupils known to be eligible for free school meals": 
"fsm_eligible_n", + "% of pupils known to be eligible for free school meals": "fsm_pct", + "% of pupils whose first language is known or believed to be other than English": "eal_pct", + "% of pupils who are a young carer": "young_carer_pct", + "% of pupils classified as white British ethnic origin": "ethnicity_white_british_pct", + "% of pupils classified as any other white background ethnic origin": "ethnicity_white_other_pct", + "% of pupils classified as Gypsy/Roma ethnic origin": "ethnicity_gypsy_roma_pct", + "% of pupils classified as white and black Caribbean ethnic origin": "ethnicity_mixed_wbc_pct", + "% of pupils classified as white and black African ethnic origin": "ethnicity_mixed_wba_pct", + "% of pupils classified as white and Asian ethnic origin": "ethnicity_mixed_wa_pct", + "% of pupils classified as any other mixed background ethnic origin": "ethnicity_mixed_other_pct", + "% of pupils classified as Indian ethnic origin": "ethnicity_asian_indian_pct", + "% of pupils classified as Pakistani ethnic origin": "ethnicity_asian_pakistani_pct", + "% of pupils classified as Bangladeshi ethnic origin": "ethnicity_asian_bangladeshi_pct", + "% of pupils classified as any other Asian background ethnic origin": "ethnicity_asian_other_pct", + "% of pupils classified as Caribbean ethnic origin": "ethnicity_black_caribbean_pct", + "% of pupils classified as African ethnic origin": "ethnicity_black_african_pct", + "% of pupils classified as any other black background ethnic origin": "ethnicity_black_other_pct", + "% of pupils classified as Chinese ethnic origin": "ethnicity_chinese_pct", + "% of pupils classified as any other ethnic group ethnic origin": "ethnicity_other_pct", + "% of pupils unclassified": "ethnicity_unclassified_pct", + } + schema = th.PropertiesList( th.Property("time_period", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True), th.Property("school_name", th.StringType), th.Property("laestab", th.StringType), 
th.Property("phase_type_grouping", th.StringType), - # TODO: Add data columns (ethnicity %, FSM %, SEN %, etc.) once - # actual column names are verified on the container. The CSV has - # 269 columns — only the first 30 (metadata) have been inspected. + # Pupil counts + th.Property("total_pupils", th.StringType), + th.Property("female_pupils", th.StringType), + th.Property("male_pupils", th.StringType), + th.Property("full_time_pupils", th.StringType), + th.Property("part_time_pupils", th.StringType), + th.Property("total_boarders", th.StringType), + # FSM + th.Property("fsm_eligible_n", th.StringType), + th.Property("fsm_pct", th.StringType), + # EAL & young carers + th.Property("eal_pct", th.StringType), + th.Property("young_carer_pct", th.StringType), + # Ethnicity percentages + th.Property("ethnicity_white_british_pct", th.StringType), + th.Property("ethnicity_white_other_pct", th.StringType), + th.Property("ethnicity_gypsy_roma_pct", th.StringType), + th.Property("ethnicity_mixed_wbc_pct", th.StringType), + th.Property("ethnicity_mixed_wba_pct", th.StringType), + th.Property("ethnicity_mixed_wa_pct", th.StringType), + th.Property("ethnicity_mixed_other_pct", th.StringType), + th.Property("ethnicity_asian_indian_pct", th.StringType), + th.Property("ethnicity_asian_pakistani_pct", th.StringType), + th.Property("ethnicity_asian_bangladeshi_pct", th.StringType), + th.Property("ethnicity_asian_other_pct", th.StringType), + th.Property("ethnicity_black_caribbean_pct", th.StringType), + th.Property("ethnicity_black_african_pct", th.StringType), + th.Property("ethnicity_black_other_pct", th.StringType), + th.Property("ethnicity_chinese_pct", th.StringType), + th.Property("ethnicity_other_pct", th.StringType), + th.Property("ethnicity_unclassified_pct", th.StringType), ).to_dict() diff --git a/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql b/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql index 9c171b7..1576974 100644 --- 
a/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql +++ b/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql @@ -1,8 +1,34 @@ -- Intermediate model: Merged pupil characteristics from census data --- TODO: Expand once census data columns are verified and added to stg_ees_census select urn, year, - phase_type_grouping + phase_type_grouping, + total_pupils, + female_pupils, + male_pupils, + full_time_pupils, + part_time_pupils, + total_boarders, + fsm_eligible_n, + fsm_pct, + eal_pct, + young_carer_pct, + ethnicity_white_british_pct, + ethnicity_white_other_pct, + ethnicity_gypsy_roma_pct, + ethnicity_mixed_wbc_pct, + ethnicity_mixed_wba_pct, + ethnicity_mixed_wa_pct, + ethnicity_mixed_other_pct, + ethnicity_asian_indian_pct, + ethnicity_asian_pakistani_pct, + ethnicity_asian_bangladeshi_pct, + ethnicity_asian_other_pct, + ethnicity_black_caribbean_pct, + ethnicity_black_african_pct, + ethnicity_black_other_pct, + ethnicity_chinese_pct, + ethnicity_other_pct, + ethnicity_unclassified_pct from {{ ref('stg_ees_census') }} diff --git a/pipeline/transform/models/marts/fact_pupil_characteristics.sql b/pipeline/transform/models/marts/fact_pupil_characteristics.sql index a3081ee..e8dec67 100644 --- a/pipeline/transform/models/marts/fact_pupil_characteristics.sql +++ b/pipeline/transform/models/marts/fact_pupil_characteristics.sql @@ -1,8 +1,34 @@ -- Mart: Pupil characteristics — one row per URN per year --- TODO: Expand once census data columns are verified and added to staging select urn, year, - phase_type_grouping + phase_type_grouping, + total_pupils, + female_pupils, + male_pupils, + full_time_pupils, + part_time_pupils, + total_boarders, + fsm_eligible_n, + fsm_pct, + eal_pct, + young_carer_pct, + ethnicity_white_british_pct, + ethnicity_white_other_pct, + ethnicity_gypsy_roma_pct, + ethnicity_mixed_wbc_pct, + ethnicity_mixed_wba_pct, + ethnicity_mixed_wa_pct, + ethnicity_mixed_other_pct, + ethnicity_asian_indian_pct, + 
ethnicity_asian_pakistani_pct, + ethnicity_asian_bangladeshi_pct, + ethnicity_asian_other_pct, + ethnicity_black_caribbean_pct, + ethnicity_black_african_pct, + ethnicity_black_other_pct, + ethnicity_chinese_pct, + ethnicity_other_pct, + ethnicity_unclassified_pct from {{ ref('int_pupil_chars_merged') }} diff --git a/pipeline/transform/models/staging/stg_ees_census.sql b/pipeline/transform/models/staging/stg_ees_census.sql index ec6438f..1f7ef8f 100644 --- a/pipeline/transform/models/staging/stg_ees_census.sql +++ b/pipeline/transform/models/staging/stg_ees_census.sql @@ -1,30 +1,55 @@ --- Staging model: School census pupil characteristics from EES --- File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/) --- Uses 'urn' column (not school_urn). Tap normalises to school_urn. --- --- TODO: The CSV has 269 columns but only metadata columns have been verified. --- Data columns (ethnicity %, FSM %, SEN %, class sizes) need to be discovered --- by inspecting the CSV on the Airflow container. The column references below --- are placeholders and will fail until the tap schema and this model are updated --- with the actual column names. +{{ config(materialized='table') }} + +-- Staging model: School Pupils and Characteristics (census) +-- One row per school per year. CSV has 269 columns; we extract the +-- aggregate demographic summaries only (not per-age/year-group breakdowns). +-- Column renames happen in the tap before Singer persists to raw.ees_census. 
with source as ( select * from {{ source('raw', 'ees_census') }} - where school_urn is not null -), - -renamed as ( - select - cast(school_urn as integer) as urn, - cast(time_period as integer) as year, - school_name, - phase_type_grouping - -- TODO: Add census data columns once verified: - -- fsm_pct, sen_support_pct, sen_ehcp_pct, eal_pct, - -- disadvantaged_pct, ethnicity_white_pct, ethnicity_asian_pct, - -- ethnicity_black_pct, ethnicity_mixed_pct, ethnicity_other_pct, - -- class_size_avg, stability_pct - from source + where trim(school_urn) ~ '^[0-9]+$' -- trim before matching so the filter agrees with the trimmed casts in the select + and trim(time_period) ~ '^[0-9]+$' ) -select * from renamed +select + cast(trim(school_urn) as integer) as urn, + cast(trim(time_period) as integer) as year, + school_name, + phase_type_grouping, + + -- Pupil counts + {{ safe_numeric('total_pupils') }}::integer as total_pupils, + {{ safe_numeric('female_pupils') }}::integer as female_pupils, + {{ safe_numeric('male_pupils') }}::integer as male_pupils, + {{ safe_numeric('full_time_pupils') }}::integer as full_time_pupils, + {{ safe_numeric('part_time_pupils') }}::integer as part_time_pupils, + {{ safe_numeric('total_boarders') }}::integer as total_boarders, + + -- FSM + {{ safe_numeric('fsm_eligible_n') }}::integer as fsm_eligible_n, + {{ safe_numeric('fsm_pct') }} as fsm_pct, + + -- EAL & young carers + {{ safe_numeric('eal_pct') }} as eal_pct, + {{ safe_numeric('young_carer_pct') }} as young_carer_pct, + + -- Ethnicity + {{ safe_numeric('ethnicity_white_british_pct') }} as ethnicity_white_british_pct, + {{ safe_numeric('ethnicity_white_other_pct') }} as ethnicity_white_other_pct, + {{ safe_numeric('ethnicity_gypsy_roma_pct') }} as ethnicity_gypsy_roma_pct, + {{ safe_numeric('ethnicity_mixed_wbc_pct') }} as ethnicity_mixed_wbc_pct, + {{ safe_numeric('ethnicity_mixed_wba_pct') }} as ethnicity_mixed_wba_pct, + {{ safe_numeric('ethnicity_mixed_wa_pct') }} as ethnicity_mixed_wa_pct, + {{ safe_numeric('ethnicity_mixed_other_pct') }} as ethnicity_mixed_other_pct, + {{ 
safe_numeric('ethnicity_asian_indian_pct') }} as ethnicity_asian_indian_pct, + {{ safe_numeric('ethnicity_asian_pakistani_pct') }} as ethnicity_asian_pakistani_pct, + {{ safe_numeric('ethnicity_asian_bangladeshi_pct') }} as ethnicity_asian_bangladeshi_pct, + {{ safe_numeric('ethnicity_asian_other_pct') }} as ethnicity_asian_other_pct, + {{ safe_numeric('ethnicity_black_caribbean_pct') }} as ethnicity_black_caribbean_pct, + {{ safe_numeric('ethnicity_black_african_pct') }} as ethnicity_black_african_pct, + {{ safe_numeric('ethnicity_black_other_pct') }} as ethnicity_black_other_pct, + {{ safe_numeric('ethnicity_chinese_pct') }} as ethnicity_chinese_pct, + {{ safe_numeric('ethnicity_other_pct') }} as ethnicity_other_pct, + {{ safe_numeric('ethnicity_unclassified_pct') }} as ethnicity_unclassified_pct + +from source