Files
school_compare/pipeline/dags/school_data_pipeline.py
Tudor c576bba06a
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 34s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m9s
Build and Push Docker Images / Build Integrator (push) Successful in 57s
Build and Push Docker Images / Build Kestra Init (push) Successful in 31s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m26s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
fix(meltano): remove catalog capability and switch elt to run
The `catalog` capability forced Meltano to run --discover and generate
a catalog file (tap.properties.json) before each extraction. This fails
because our Singer SDK taps emit schemas inline and don't need external
catalog files. Removing the capability makes Meltano invoke taps
directly without catalog generation.

Also switch from deprecated `meltano elt` to `meltano run` for
Meltano 4.x compatibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 13:45:23 +00:00

167 lines
5.3 KiB
Python

"""
School Data Pipeline — Airflow DAG
Orchestrates the full ELT pipeline:
Extract (Meltano) → Validate → Transform (dbt) → Geocode → Sync Typesense → Invalidate Cache
Schedule:
- GIAS: Daily at 03:00
- Ofsted: 1st of month at 02:00
- EES datasets: Annual (triggered manually or on detected release)
"""
from __future__ import annotations

import shlex
import textwrap
from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils.task_group import TaskGroup

try:
    from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
    from airflow.operators.bash import BashOperator
PIPELINE_DIR = "/opt/pipeline"
MELTANO_BIN = "meltano"
DBT_BIN = "dbt"
default_args = {
"owner": "school-compare",
"depends_on_past": False,
"email_on_failure": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
# ── Daily DAG (GIAS + downstream) ──────────────────────────────────────
# Standalone sanity check for the raw GIAS load, executed with `python -c` by
# the `validate_raw` task. It is dedented (instead of being pasted verbatim
# into the bash command) so the indentation of this module can never leak
# into the script and cause an IndentationError, and it is passed through
# `shlex.quote` so quotes, `$` or backticks in it cannot break the shell line.
_VALIDATE_GIAS_SCRIPT = textwrap.dedent(
    """
    import os
    import sys

    import psycopg2

    # Deliberately far below the expected 60k+ rows: only a badly truncated
    # load fails the check.
    MIN_ROWS = 20000

    conn = psycopg2.connect(
        host=os.environ.get('PG_HOST', 'localhost'),
        port=os.environ.get('PG_PORT', '5432'),
        user=os.environ.get('PG_USER', 'postgres'),
        password=os.environ.get('PG_PASSWORD', 'postgres'),
        dbname=os.environ.get('PG_DATABASE', 'school_compare'),
    )
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT count(*) FROM raw.gias_establishments')
            count = cur.fetchone()[0]
    finally:
        # Close the connection even if the query raises.
        conn.close()

    if count < MIN_ROWS:
        # This fails the task, so it is reported as an error, not a warning.
        print(f'ERROR: GIAS only has {count} rows, expected 60k+', file=sys.stderr)
        sys.exit(1)
    print(f'Validation passed: {count} GIAS rows')
    """
)

with DAG(
    dag_id="school_data_daily",
    default_args=default_args,
    description="Daily school data pipeline (GIAS extract → full transform)",
    schedule="0 3 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "daily"],
) as daily_dag:
    with TaskGroup("extract") as extract_group:
        # Extract GIAS with Meltano (tap-uk-gias → target-postgres).
        extract_gias = BashOperator(
            task_id="extract_gias",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-gias target-postgres",
        )

    # Stop the run before transforming if raw.gias_establishments looks truncated.
    validate_raw = BashOperator(
        task_id="validate_raw",
        bash_command=f"cd {PIPELINE_DIR} && python -c {shlex.quote(_VALIDATE_GIAS_SCRIPT)}",
    )

    # Full (unselected) dbt build against the production target.
    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production",
    )

    # NOTE(review): `dbt build` already runs data tests alongside the models,
    # so this task repeats the whole test suite. It is kept so the DAG's task
    # list and history are unchanged; consider dropping it or narrowing it.
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production",
    )

    geocode_new = BashOperator(
        task_id="geocode_new",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/geocode_postcodes.py",
    )

    sync_typesense = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_group >> validate_raw >> dbt_build >> dbt_test >> geocode_new >> sync_typesense
# ── Monthly DAG (Ofsted) ───────────────────────────────────────────────
with DAG(
    dag_id="school_data_monthly_ofsted",
    description="Monthly Ofsted MI extraction and transform",
    default_args=default_args,
    tags=["school-compare", "monthly"],
    start_date=datetime(2025, 1, 1),
    schedule="0 2 1 * *",  # 02:00 on the 1st of every month
    catchup=False,
) as monthly_ofsted_dag:
    # Models rebuilt after an Ofsted load. In dbt selection syntax a trailing
    # "+" also selects every model downstream of the named one.
    _ofsted_selectors = " ".join(
        [
            "stg_ofsted_inspections+",
            "int_ofsted_latest+",
            "fact_ofsted_inspection+",
            "dim_school+",
        ]
    )

    extract_ofsted = BashOperator(
        task_id="extract_ofsted",
        bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ofsted target-postgres",
    )

    dbt_build_ofsted = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build"
            f" --profiles-dir . --target production --select {_ofsted_selectors}"
        ),
    )

    sync_typesense_ofsted = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    # Linear chain: extract → dbt build → Typesense sync.
    extract_ofsted.set_downstream(dbt_build_ofsted)
    dbt_build_ofsted.set_downstream(sync_typesense_ofsted)
# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ───────────
with DAG(
    dag_id="school_data_annual_ees",
    description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)",
    default_args=default_args,
    tags=["school-compare", "annual"],
    start_date=datetime(2025, 1, 1),
    # No cron expression: the DAG only runs when triggered, e.g. after a new
    # EES release is published.
    schedule=None,
    catchup=False,
) as annual_ees_dag:
    # Shared pieces of the dbt command lines used by the transform tasks.
    _ees_dbt_prefix = f"cd {PIPELINE_DIR}/transform && {DBT_BIN}"
    _ees_dbt_flags = "--profiles-dir . --target production"

    with TaskGroup("extract_ees") as extract_ees_group:
        extract_ees = BashOperator(
            task_id="extract_ees",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres",
        )

    dbt_build_ees = BashOperator(
        task_id="dbt_build",
        bash_command=f"{_ees_dbt_prefix} build {_ees_dbt_flags}",
    )

    # NOTE(review): `dbt build` already runs the data tests, so this repeats them.
    dbt_test_ees = BashOperator(
        task_id="dbt_test",
        bash_command=f"{_ees_dbt_prefix} test {_ees_dbt_flags}",
    )

    sync_typesense_ees = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    # Linear chain: extract group → dbt build → dbt test → Typesense sync.
    extract_ees_group.set_downstream(dbt_build_ees)
    dbt_build_ees.set_downstream(dbt_test_ees)
    dbt_test_ees.set_downstream(sync_typesense_ees)