From e7b1ab9f372e387dd99a890409657336a409d202 Mon Sep 17 00:00:00 2001 From: Tudor Date: Thu, 26 Mar 2026 20:43:24 +0000 Subject: [PATCH] fix(pipeline): expand GIAS schema, handle empty strings, scope DAG selectors - Declare all 34 columns needed by dbt in GIAS tap schema (target-postgres only persists columns present in the Singer schema message) - Use nullif() for empty-string-to-integer casts and CASE guards for empty-string-to-date casts in staging models - Scope daily DAG dbt build to GIAS models only (stg_gias_establishments+ stg_gias_links+) to avoid errors on unloaded sources - Scope annual EES DAG similarly; remove redundant dbt test steps - Make dim_school gracefully handle missing int_ofsted_latest table Co-Authored-By: Claude Opus 4.6 --- pipeline/dags/school_data_pipeline.py | 18 +++-------- .../extractors/tap-uk-gias/tap_uk_gias/tap.py | 31 +++++++++++++++++-- .../transform/models/marts/dim_school.sql | 22 +++++++++---- .../staging/stg_gias_establishments.sql | 21 ++++++------- .../models/staging/stg_gias_links.sql | 2 +- 5 files changed, 59 insertions(+), 35 deletions(-) diff --git a/pipeline/dags/school_data_pipeline.py b/pipeline/dags/school_data_pipeline.py index 0dac821..8b7a1ac 100644 --- a/pipeline/dags/school_data_pipeline.py +++ b/pipeline/dags/school_data_pipeline.py @@ -79,12 +79,7 @@ print(f'Validation passed: {{count}} GIAS rows') dbt_build = BashOperator( task_id="dbt_build", - bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production", - ) - - dbt_test = BashOperator( - task_id="dbt_test", - bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production", + bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir .
--target production --select stg_gias_establishments+ stg_gias_links+", ) geocode_new = BashOperator( @@ -97,7 +92,7 @@ print(f'Validation passed: {{count}} GIAS rows') bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py", ) - extract_group >> validate_raw >> dbt_build >> dbt_test >> geocode_new >> sync_typesense + extract_group >> validate_raw >> dbt_build >> geocode_new >> sync_typesense # ── Monthly DAG (Ofsted) ─────────────────────────────────────────────── @@ -150,12 +145,7 @@ with DAG( dbt_build_ees = BashOperator( task_id="dbt_build", - bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production", - ) - - dbt_test_ees = BashOperator( - task_id="dbt_test", - bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production", + bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+ stg_ees_phonics+", ) sync_typesense_ees = BashOperator( @@ -163,4 +153,4 @@ with DAG( bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py", ) - extract_ees_group >> dbt_build_ees >> dbt_test_ees >> sync_typesense_ees + extract_ees_group >> dbt_build_ees >> sync_typesense_ees diff --git a/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py index 7c01b78..250dc25 100644 --- a/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py +++ b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py @@ -25,9 +25,9 @@ class GIASEstablishmentsStream(Stream): primary_keys = ["URN"] replication_key = None - # Schema is wide (~250 columns); we declare key columns and pass through the rest - # All columns are read as strings from CSV; dbt staging models handle type casting. - # Only URN is cast to int in get_records() for the primary key. 
+ # All columns used by dbt staging models must be declared here — + # target-postgres only persists columns present in the Singer schema. + # All non-PK columns are StringType; dbt handles type casting. schema = th.PropertiesList( th.Property("URN", th.IntegerType, required=True), th.Property("EstablishmentName", th.StringType), @@ -38,6 +38,31 @@ class GIASEstablishmentsStream(Stream): th.Property("EstablishmentNumber", th.StringType), th.Property("EstablishmentStatus (name)", th.StringType), th.Property("Postcode", th.StringType), + th.Property("Gender (name)", th.StringType), + th.Property("ReligiousCharacter (name)", th.StringType), + th.Property("AdmissionsPolicy (name)", th.StringType), + th.Property("SchoolCapacity", th.StringType), + th.Property("NumberOfPupils", th.StringType), + th.Property("HeadTitle (name)", th.StringType), + th.Property("HeadFirstName", th.StringType), + th.Property("HeadLastName", th.StringType), + th.Property("TelephoneNum", th.StringType), + th.Property("SchoolWebsite", th.StringType), + th.Property("Street", th.StringType), + th.Property("Locality", th.StringType), + th.Property("Town", th.StringType), + th.Property("County (name)", th.StringType), + th.Property("OpenDate", th.StringType), + th.Property("CloseDate", th.StringType), + th.Property("Trusts (name)", th.StringType), + th.Property("Trusts (code)", th.StringType), + th.Property("UrbanRural (name)", th.StringType), + th.Property("ParliamentaryConstituency (name)", th.StringType), + th.Property("NurseryProvision (name)", th.StringType), + th.Property("Easting", th.StringType), + th.Property("Northing", th.StringType), + th.Property("StatutoryLowAge", th.StringType), + th.Property("StatutoryHighAge", th.StringType), ).to_dict() def get_records(self, context): diff --git a/pipeline/transform/models/marts/dim_school.sql b/pipeline/transform/models/marts/dim_school.sql index 420dec2..9dead72 100644 --- a/pipeline/transform/models/marts/dim_school.sql +++ 
b/pipeline/transform/models/marts/dim_school.sql @@ -2,12 +2,14 @@ with schools as ( select * from {{ ref('stg_gias_establishments') }} -), - -latest_ofsted as ( - select * from {{ ref('int_ofsted_latest') }} ) +{% set ofsted_relation = adapter.get_relation( + database=target.database, + schema=target.schema, + identifier='int_ofsted_latest' +) %} + select s.urn, s.local_authority_code * 1000 + s.establishment_number as laestab, @@ -30,11 +32,19 @@ select s.nursery_provision, s.admissions_policy, - -- Latest Ofsted + -- Latest Ofsted (populated after monthly Ofsted pipeline runs) + {% if ofsted_relation is not none %} o.overall_effectiveness as ofsted_grade, o.inspection_date as ofsted_date, o.framework as ofsted_framework + {% else %} + null::text as ofsted_grade, + null::date as ofsted_date, + null::text as ofsted_framework + {% endif %} from schools s -left join latest_ofsted o on s.urn = o.urn +{% if ofsted_relation is not none %} +left join {{ ref('int_ofsted_latest') }} o on s.urn = o.urn +{% endif %} where s.status = 'Open' diff --git a/pipeline/transform/models/staging/stg_gias_establishments.sql b/pipeline/transform/models/staging/stg_gias_establishments.sql index 43ff77c..1b4af2e 100644 --- a/pipeline/transform/models/staging/stg_gias_establishments.sql +++ b/pipeline/transform/models/staging/stg_gias_establishments.sql @@ -8,9 +8,9 @@ with source as ( renamed as ( select cast("URN" as integer) as urn, - cast("LA (code)" as integer) as local_authority_code, + cast(nullif("LA (code)", '') as integer) as local_authority_code, "LA (name)" as local_authority_name, - cast("EstablishmentNumber" as integer) as establishment_number, + cast(nullif("EstablishmentNumber", '') as integer) as establishment_number, "EstablishmentName" as school_name, "TypeOfEstablishment (name)" as school_type, "PhaseOfEducation (name)" as phase, @@ -18,7 +18,7 @@ renamed as ( "ReligiousCharacter (name)" as religious_character, "AdmissionsPolicy (name)" as admissions_policy, 
"SchoolCapacity" as capacity, - cast("NumberOfPupils" as integer) as total_pupils, + cast(nullif("NumberOfPupils", '') as integer) as total_pupils, "HeadTitle (name)" as head_title, "HeadFirstName" as head_first_name, "HeadLastName" as head_last_name, @@ -30,18 +30,17 @@ renamed as ( "County (name)" as county, "Postcode" as postcode, "EstablishmentStatus (name)" as status, - cast("OpenDate" as date) as open_date, - cast("CloseDate" as date) as close_date, + case when "OpenDate" = '' then null else cast("OpenDate" as date) end as open_date, + case when "CloseDate" = '' then null else cast("CloseDate" as date) end as close_date, "Trusts (name)" as academy_trust_name, - cast("Trusts (code)" as integer) as academy_trust_uid, + cast(nullif("Trusts (code)", '') as integer) as academy_trust_uid, "UrbanRural (name)" as urban_rural, "ParliamentaryConstituency (name)" as parliamentary_constituency, "NurseryProvision (name)" as nursery_provision, - cast("Easting" as integer) as easting, - cast("Northing" as integer) as northing, - -- Age range - cast("StatutoryLowAge" as integer) as statutory_low_age, - cast("StatutoryHighAge" as integer) as statutory_high_age + cast(nullif("Easting", '') as integer) as easting, + cast(nullif("Northing", '') as integer) as northing, + cast(nullif("StatutoryLowAge", '') as integer) as statutory_low_age, + cast(nullif("StatutoryHighAge", '') as integer) as statutory_high_age from source where "URN" is not null ) diff --git a/pipeline/transform/models/staging/stg_gias_links.sql b/pipeline/transform/models/staging/stg_gias_links.sql index f234d27..1e3ccf6 100644 --- a/pipeline/transform/models/staging/stg_gias_links.sql +++ b/pipeline/transform/models/staging/stg_gias_links.sql @@ -9,7 +9,7 @@ renamed as ( cast("URN" as integer) as urn, cast("LinkURN" as integer) as linked_urn, "LinkType" as link_type, - cast("LinkEstablishedDate" as date) as link_date + case when "LinkEstablishedDate" = '' then null else cast("LinkEstablishedDate" as date) end 
as link_date from source where "URN" is not null and "LinkURN" is not null