""" School Data Pipeline — Airflow DAG Orchestrates the full ELT pipeline: Extract (Meltano) → Validate → Transform (dbt) → Geocode → Sync Typesense → Invalidate Cache Schedule: - GIAS: Daily at 03:00 - Ofsted: 1st of month at 02:00 - EES datasets: Annual (triggered manually or on detected release) """ from __future__ import annotations from datetime import datetime, timedelta from airflow import DAG from airflow.utils.task_group import TaskGroup try: from airflow.providers.standard.operators.bash import BashOperator except ImportError: from airflow.operators.bash import BashOperator PIPELINE_DIR = "/opt/pipeline" MELTANO_BIN = "meltano" DBT_BIN = "dbt" default_args = { "owner": "school-compare", "depends_on_past": False, "email_on_failure": False, "retries": 1, "retry_delay": timedelta(minutes=5), } # ── Daily DAG (GIAS + downstream) ────────────────────────────────────── with DAG( dag_id="school_data_daily", default_args=default_args, description="Daily school data pipeline (GIAS extract → full transform)", schedule="0 3 * * *", start_date=datetime(2025, 1, 1), catchup=False, tags=["school-compare", "daily"], ) as daily_dag: with TaskGroup("extract") as extract_group: extract_gias = BashOperator( task_id="extract_gias", bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-gias target-postgres", ) validate_raw = BashOperator( task_id="validate_raw", bash_command=f""" cd {PIPELINE_DIR} && python -c " import psycopg2, os, sys conn = psycopg2.connect( host=os.environ.get('PG_HOST', 'localhost'), port=os.environ.get('PG_PORT', '5432'), user=os.environ.get('PG_USER', 'postgres'), password=os.environ.get('PG_PASSWORD', 'postgres'), dbname=os.environ.get('PG_DATABASE', 'school_compare'), ) cur = conn.cursor() cur.execute('SELECT count(*) FROM raw.gias_establishments') count = cur.fetchone()[0] conn.close() if count < 20000: print(f'WARN: GIAS only has {{count}} rows, expected 60k+', file=sys.stderr) sys.exit(1) print(f'Validation passed: {{count}} GIAS rows') " """, ) dbt_build = BashOperator( task_id="dbt_build", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production", ) dbt_test = BashOperator( task_id="dbt_test", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production", ) geocode_new = BashOperator( task_id="geocode_new", bash_command=f"cd {PIPELINE_DIR} && python scripts/geocode_postcodes.py", ) sync_typesense = BashOperator( task_id="sync_typesense", bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py", ) extract_group >> validate_raw >> dbt_build >> dbt_test >> geocode_new >> sync_typesense # ── Monthly DAG (Ofsted) ─────────────────────────────────────────────── with DAG( dag_id="school_data_monthly_ofsted", default_args=default_args, description="Monthly Ofsted MI extraction and transform", schedule="0 2 1 * *", start_date=datetime(2025, 1, 1), catchup=False, tags=["school-compare", "monthly"], ) as monthly_ofsted_dag: extract_ofsted = BashOperator( task_id="extract_ofsted", bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ofsted target-postgres", ) dbt_build_ofsted = BashOperator( task_id="dbt_build", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ofsted_inspections+ int_ofsted_latest+ fact_ofsted_inspection+ dim_school+", ) sync_typesense_ofsted = BashOperator( task_id="sync_typesense", bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py", ) extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted # ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ─────────── with DAG( dag_id="school_data_annual_ees", default_args=default_args, description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)", schedule=None, # Triggered manually when new releases are published start_date=datetime(2025, 1, 1), catchup=False, tags=["school-compare", "annual"], ) as annual_ees_dag: with TaskGroup("extract_ees") as extract_ees_group: extract_ees = BashOperator( task_id="extract_ees", bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres", ) dbt_build_ees = BashOperator( task_id="dbt_build", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production", ) dbt_test_ees = BashOperator( task_id="dbt_test", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production", ) sync_typesense_ees = BashOperator( task_id="sync_typesense", bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py", ) extract_ees_group >> dbt_build_ees >> dbt_test_ees >> sync_typesense_ees