feat(pipeline): add Meltano + dbt + Airflow ELT pipeline scaffold
Replaces the hand-rolled integrator with a production-grade ELT pipeline
using Meltano (Singer taps), dbt Core (medallion architecture), and
Apache Airflow (orchestration). Adds Typesense for search and PostGIS
for geospatial queries.
- 6 custom Singer taps (GIAS, EES, Ofsted, Parent View, FBIT, IDACI)
- dbt project: 12 staging, 5 intermediate, 12 mart models
- 3 Airflow DAGs (daily/monthly/annual schedules)
- Typesense sync + batch geocoding scripts
- docker-compose: add Airflow, Typesense; upgrade to PostGIS
- Portainer stack definition matching live deployment topology
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 08:37:53 +00:00
|
|
|
"""
|
|
|
|
|
School Data Pipeline — Airflow DAG
|
|
|
|
|
|
|
|
|
|
Orchestrates the full ELT pipeline:
|
|
|
|
|
Extract (Meltano) → Validate → Transform (dbt) → Geocode → Sync Typesense → Invalidate Cache
|
|
|
|
|
|
|
|
|
|
Schedule:
|
|
|
|
|
- GIAS: Daily at 03:00
|
|
|
|
|
- Ofsted: 1st of month at 02:00
|
|
|
|
|
- EES datasets: Annual (triggered manually or on detected release)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
|
|
|
|
from airflow import DAG
|
|
|
|
|
from airflow.utils.task_group import TaskGroup
|
|
|
|
|
|
2026-03-26 10:47:18 +00:00
|
|
|
try:
|
|
|
|
|
from airflow.providers.standard.operators.bash import BashOperator
|
|
|
|
|
except ImportError:
|
|
|
|
|
from airflow.operators.bash import BashOperator
|
|
|
|
|
|
feat(pipeline): add Meltano + dbt + Airflow ELT pipeline scaffold
Replaces the hand-rolled integrator with a production-grade ELT pipeline
using Meltano (Singer taps), dbt Core (medallion architecture), and
Apache Airflow (orchestration). Adds Typesense for search and PostGIS
for geospatial queries.
- 6 custom Singer taps (GIAS, EES, Ofsted, Parent View, FBIT, IDACI)
- dbt project: 12 staging, 5 intermediate, 12 mart models
- 3 Airflow DAGs (daily/monthly/annual schedules)
- Typesense sync + batch geocoding scripts
- docker-compose: add Airflow, Typesense; upgrade to PostGIS
- Portainer stack definition matching live deployment topology
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 08:37:53 +00:00
|
|
|
# Directory that every task `cd`s into (or below) before running its command.
PIPELINE_DIR = "/opt/pipeline"

# Executables are given by bare name, so the task's shell resolves them via PATH.
MELTANO_BIN = "meltano"
DBT_BIN = "dbt"
|
feat(pipeline): add Meltano + dbt + Airflow ELT pipeline scaffold
Replaces the hand-rolled integrator with a production-grade ELT pipeline
using Meltano (Singer taps), dbt Core (medallion architecture), and
Apache Airflow (orchestration). Adds Typesense for search and PostGIS
for geospatial queries.
- 6 custom Singer taps (GIAS, EES, Ofsted, Parent View, FBIT, IDACI)
- dbt project: 12 staging, 5 intermediate, 12 mart models
- 3 Airflow DAGs (daily/monthly/annual schedules)
- Typesense sync + batch geocoding scripts
- docker-compose: add Airflow, Typesense; upgrade to PostGIS
- Portainer stack definition matching live deployment topology
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 08:37:53 +00:00
|
|
|
|
|
|
|
|
# Task-level defaults shared by every DAG that passes `default_args=default_args`.
default_args = {
    "owner": "school-compare",
    # A run does not wait on the outcome of the previous run of the same task.
    "depends_on_past": False,
    "email_on_failure": False,
    # One automatic retry, attempted five minutes after the failure.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Daily DAG (GIAS + downstream) ──────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
# Daily run (cron "0 3 * * *"): extract GIAS, sanity-check the raw row count,
# build and test the dbt project, geocode, then sync Typesense.
with DAG(
    dag_id="school_data_daily",
    default_args=default_args,
    description="Daily school data pipeline (GIAS extract → full transform)",
    schedule="0 3 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,  # do not backfill the intervals between start_date and now
    tags=["school-compare", "daily"],
) as daily_dag:

    with TaskGroup("extract") as extract_group:
        extract_gias = BashOperator(
            task_id="extract_gias",
            bash_command=(
                f"cd {PIPELINE_DIR} && "
                f"{MELTANO_BIN} elt tap-uk-gias target-postgres"
            ),
        )

    # Guard against a truncated or empty extract: the inline script exits
    # non-zero (failing the task) when raw.gias_establishments has fewer than
    # 20000 rows.
    #
    # The script lives inside a double-quoted bash argument, so it must not
    # contain double quotes, `$` or backticks. `{{count}}` is doubled so the
    # outer Python f-string emits a literal `{count}` for the inner f-string.
    #
    # try/finally is used because psycopg2's `with conn:` only ends the
    # transaction; it does not close the connection.
    validate_raw = BashOperator(
        task_id="validate_raw",
        bash_command=f"""
cd {PIPELINE_DIR} && python -c "
import psycopg2, os, sys
conn = psycopg2.connect(
    host=os.environ.get('PG_HOST', 'localhost'),
    port=os.environ.get('PG_PORT', '5432'),
    user=os.environ.get('PG_USER', 'postgres'),
    password=os.environ.get('PG_PASSWORD', 'postgres'),
    dbname=os.environ.get('PG_DATABASE', 'school_compare'),
)
try:
    cur = conn.cursor()
    cur.execute('SELECT count(*) FROM raw.gias_establishments')
    count = cur.fetchone()[0]
finally:
    conn.close()
if count < 20000:
    print(f'WARN: GIAS only has {{count}} rows, expected 60k+', file=sys.stderr)
    sys.exit(1)
print(f'Validation passed: {{count}} GIAS rows')
"
""",
    )

    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && "
            f"{DBT_BIN} build --profiles-dir . --target production"
        ),
    )

    # NOTE(review): `dbt build` already runs tests alongside the models, so this
    # task re-runs them — confirm the second pass is intentional.
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && "
            f"{DBT_BIN} test --profiles-dir . --target production"
        ),
    )

    geocode_new = BashOperator(
        task_id="geocode_new",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/geocode_postcodes.py",
    )

    sync_typesense = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    # Strictly linear: each stage starts only after the previous one succeeds.
    extract_group >> validate_raw >> dbt_build >> dbt_test >> geocode_new >> sync_typesense
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Monthly DAG (Ofsted) ───────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
# Monthly run (cron "0 2 1 * *", i.e. 02:00 on the 1st): extract Ofsted data,
# rebuild the Ofsted-related dbt models, then sync Typesense.
with DAG(
    dag_id="school_data_monthly_ofsted",
    default_args=default_args,
    description="Monthly Ofsted MI extraction and transform",
    schedule="0 2 1 * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,  # do not backfill the intervals between start_date and now
    tags=["school-compare", "monthly"],
) as monthly_ofsted_dag:

    extract_ofsted = BashOperator(
        task_id="extract_ofsted",
        bash_command=(
            f"cd {PIPELINE_DIR} && "
            f"{MELTANO_BIN} elt tap-uk-ofsted target-postgres"
        ),
    )

    # Partial build: the trailing `+` on each selector is dbt's graph operator
    # for "this model and everything downstream of it".
    dbt_build_ofsted = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && "
            f"{DBT_BIN} build --profiles-dir . --target production "
            "--select stg_ofsted_inspections+ int_ofsted_latest+ "
            "fact_ofsted_inspection+ dim_school+"
        ),
    )

    sync_typesense_ofsted = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ───────────
|
|
|
|
|
|
|
|
|
|
# Unscheduled DAG for the EES datasets: extract, full dbt build and test, then
# sync Typesense.
with DAG(
    dag_id="school_data_annual_ees",
    default_args=default_args,
    description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)",
    schedule=None,  # Triggered manually when new releases are published
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "annual"],
) as annual_ees_dag:

    with TaskGroup("extract_ees") as extract_ees_group:
        extract_ees = BashOperator(
            task_id="extract_ees",
            bash_command=(
                f"cd {PIPELINE_DIR} && "
                f"{MELTANO_BIN} elt tap-uk-ees target-postgres"
            ),
        )

    dbt_build_ees = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && "
            f"{DBT_BIN} build --profiles-dir . --target production"
        ),
    )

    # NOTE(review): `dbt build` already runs tests alongside the models, so this
    # task re-runs them — confirm the second pass is intentional.
    dbt_test_ees = BashOperator(
        task_id="dbt_test",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && "
            f"{DBT_BIN} test --profiles-dir . --target production"
        ),
    )

    sync_typesense_ees = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    # Strictly linear: each stage starts only after the previous one succeeds.
    extract_ees_group >> dbt_build_ees >> dbt_test_ees >> sync_typesense_ees
|