Files
school_compare/pipeline/dags/school_data_pipeline.py
Tudor e7b1ab9f37
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m8s
Build and Push Docker Images / Build Integrator (push) Successful in 57s
Build and Push Docker Images / Build Kestra Init (push) Successful in 34s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m39s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s
fix(pipeline): expand GIAS schema, handle empty strings, scope DAG selectors
- Declare all 34 columns needed by dbt in GIAS tap schema (target-postgres
  only persists columns present in the Singer schema message)
- Use nullif() for empty-string-to-integer/date casts in staging models
- Scope daily DAG dbt build to GIAS models only (stg_gias_establishments+
  stg_gias_links+) to avoid errors on unloaded sources
- Scope annual EES DAG similarly; remove redundant dbt test steps
- Make dim_school gracefully handle missing int_ofsted_latest table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 20:43:24 +00:00

157 lines
5.1 KiB
Python

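"""
School Data Pipeline — Airflow DAGs

Orchestrates the ELT pipeline as three DAGs, each following
Extract (Meltano) → Transform (dbt build) → Sync Typesense.
The daily GIAS DAG additionally validates the raw row count before the dbt
build and geocodes postcodes before the Typesense sync. No cache-invalidation
task is defined in these DAGs.

Schedules:
- GIAS: daily at 03:00
- Ofsted: 1st of each month at 02:00
- EES datasets: annual; not scheduled — triggered manually when new releases are published
"""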
from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.task_group import TaskGroup
try:
from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
from airflow.operators.bash import BashOperator
# Working directory and executable names interpolated into the
# BashOperator commands of every DAG in this module.
PIPELINE_DIR: str = "/opt/pipeline"
MELTANO_BIN: str = "meltano"
DBT_BIN: str = "dbt"

# Task defaults shared by all three DAGs below: runs do not depend on the
# previous run's outcome, no failure e-mails, and each task gets a single
# retry after a five-minute pause.
default_args: dict[str, object] = dict(
    owner="school-compare",
    depends_on_past=False,
    email_on_failure=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# ── Daily DAG (GIAS + downstream) ──────────────────────────────────────
# Row-count sanity check run by the ``validate_raw`` task via ``python -c``.
# It is a module-level constant so every top-level statement sits at column
# 0: ``python -c`` raises IndentationError on indented top-level code, and a
# script embedded in an indented f-string silently breaks when the block is
# re-indented. The script uses only single quotes and contains no ``$`` or
# backticks, because bash receives it inside a double-quoted argument.
_VALIDATE_GIAS_SCRIPT = """
import os
import sys

import psycopg2

conn = psycopg2.connect(
    host=os.environ.get('PG_HOST', 'localhost'),
    port=os.environ.get('PG_PORT', '5432'),
    user=os.environ.get('PG_USER', 'postgres'),
    password=os.environ.get('PG_PASSWORD', 'postgres'),
    dbname=os.environ.get('PG_DATABASE', 'school_compare'),
)
try:
    cur = conn.cursor()
    cur.execute('SELECT count(*) FROM raw.gias_establishments')
    count = cur.fetchone()[0]
finally:
    conn.close()
if count < 20000:
    print(f'WARN: GIAS only has {count} rows, expected 60k+', file=sys.stderr)
    sys.exit(1)
print(f'Validation passed: {count} GIAS rows')
"""

with DAG(
    dag_id="school_data_daily",
    default_args=default_args,
    # The dbt step below is scoped to GIAS models, so "full transform" was stale.
    description="Daily school data pipeline (GIAS extract → GIAS-scoped transform)",
    schedule="0 3 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "daily"],
) as daily_dag:
    with TaskGroup("extract") as extract_group:
        extract_gias = BashOperator(
            task_id="extract_gias",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-gias target-postgres",
        )

    # Fails the run (exit 1) when raw.gias_establishments has fewer than
    # 20,000 rows, so dbt does not rebuild models from a truncated load.
    validate_raw = BashOperator(
        task_id="validate_raw",
        bash_command=f'cd {PIPELINE_DIR} && python -c "{_VALIDATE_GIAS_SCRIPT}"',
    )

    # Only GIAS staging models and their descendants are built; other
    # sources may not have been loaded yet.
    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build"
            " --profiles-dir . --target production"
            " --select stg_gias_establishments+ stg_gias_links+"
        ),
    )

    geocode_new = BashOperator(
        task_id="geocode_new",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/geocode_postcodes.py",
    )

    sync_typesense = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_group >> validate_raw >> dbt_build >> geocode_new >> sync_typesense
# ── Monthly DAG (Ofsted) ───────────────────────────────────────────────
# 02:00 on the 1st of each month: Meltano Ofsted extract, then a dbt build
# of the Ofsted models (plus dim_school) and their descendants, then a
# Typesense re-sync.
with DAG(
    dag_id="school_data_monthly_ofsted",
    description="Monthly Ofsted MI extraction and transform",
    schedule="0 2 1 * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "monthly"],
    default_args=default_args,
) as monthly_ofsted_dag:
    extract_ofsted = BashOperator(
        task_id="extract_ofsted",
        bash_command=(
            f"cd {PIPELINE_DIR} && "
            f"{MELTANO_BIN} run tap-uk-ofsted target-postgres"
        ),
    )

    dbt_build_ofsted = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build"
            " --profiles-dir . --target production"
            " --select stg_ofsted_inspections+ int_ofsted_latest+"
            " fact_ofsted_inspection+ dim_school+"
        ),
    )

    sync_typesense_ofsted = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    # Linear chain: extract → dbt build → Typesense sync.
    extract_ofsted >> dbt_build_ofsted
    dbt_build_ofsted >> sync_typesense_ofsted
# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ───────────
# Unscheduled: triggered by hand when new EES releases appear. Extracts the
# EES datasets, rebuilds the five EES staging models and their descendants,
# then re-syncs Typesense.
with DAG(
    dag_id="school_data_annual_ees",
    description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)",
    schedule=None,  # no cron; run manually when a new release is published
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "annual"],
    default_args=default_args,
) as annual_ees_dag:
    with TaskGroup("extract_ees") as extract_ees_group:
        extract_ees = BashOperator(
            task_id="extract_ees",
            bash_command=(
                f"cd {PIPELINE_DIR} && "
                f"{MELTANO_BIN} run tap-uk-ees target-postgres"
            ),
        )

    dbt_build_ees = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build"
            " --profiles-dir . --target production"
            " --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+"
            " stg_ees_admissions+ stg_ees_phonics+"
        ),
    )

    sync_typesense_ees = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    # Linear chain: extract group → dbt build → Typesense sync.
    extract_ees_group >> dbt_build_ees
    dbt_build_ees >> sync_typesense_ees