Files
school_compare/pipeline/dags/school_data_pipeline.py
Tudor ca351e9d73 feat: migrate backend to marts schema, update EES tap for verified datasets
Pipeline:
- EES tap: split KS4 into performance + info streams, fix admissions filename
  (SchoolLevel keyword match), fix census filename (yearly suffix), remove
  phonics (no school-level data on EES), change endswith → in for matching
- stg_ees_ks4: rewrite to filter long-format data and extract Attainment 8,
  Progress 8, EBacc, English/Maths metrics; join KS4 info for context
- stg_ees_admissions: map real CSV columns (total_number_places_offered, etc.)
- stg_ees_census: update source reference, stub with TODO for data columns
- Remove stg_ees_phonics, fact_phonics (no school-level EES data)
- Add ees_ks4_performance + ees_ks4_info sources, remove ees_ks4 + ees_phonics
- Update int_ks4_with_lineage + fact_ks4_performance with new KS4 columns
- Annual EES DAG: remove stg_ees_phonics+ from selector

Backend:
- models.py: replace all models to point at marts.* tables with schema='marts'
  (DimSchool, DimLocation, KS2Performance, FactOfstedInspection, etc.)
- data_loader.py: rewrite load_school_data_as_dataframe() using raw SQL joining
  dim_school + dim_location + fact_ks2_performance; update get_supplementary_data()
- database.py: remove migration machinery, keep only connection setup
- app.py: remove check_and_migrate_if_needed, remove /api/admin/reimport-ks2
  endpoints (pipeline handles all imports)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 09:29:27 +00:00

152 lines
5.0 KiB
Python

"""
School Data Pipeline — Airflow DAG
Orchestrates the full ELT pipeline:
Extract (Meltano) → Validate → Transform (dbt) → Geocode → Sync Typesense → Invalidate Cache
Schedule:
- GIAS: Daily at 03:00
- Ofsted: 1st of month at 02:00
- EES datasets: Annual (triggered manually or on detected release)
"""
from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.task_group import TaskGroup
try:
from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
from airflow.operators.bash import BashOperator
# Locations of the pipeline checkout and the CLI tools every bash task shells
# out to. Centralised here so a deployment can be re-pointed in one place.
PIPELINE_DIR = "/opt/pipeline"
MELTANO_BIN = "meltano"
DBT_BIN = "dbt"

# Task-level defaults shared by every DAG in this module: one retry after a
# five-minute pause, no cross-run dependencies, no failure emails.
default_args = dict(
    owner="school-compare",
    depends_on_past=False,
    email_on_failure=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# ── Daily DAG (GIAS + downstream) ──────────────────────────────────────
with DAG(
    dag_id="school_data_daily",
    default_args=default_args,
    description="Daily school data pipeline (GIAS extract → full transform)",
    schedule="0 3 * * *",  # every day at 03:00
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "daily"],
) as daily_dag:
    # Extraction is grouped so additional daily taps can be added later
    # without touching the downstream dependency chain.
    with TaskGroup("extract") as extract_group:
        extract_gias = BashOperator(
            task_id="extract_gias",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-gias target-postgres",
        )

    # Fail fast if the raw GIAS load looks truncated, before spending time on
    # dbt. Defined at DAG level — not inside the TaskGroup — so that the
    # `extract_group >> validate_raw` chain below is acyclic.
    # Doubled braces ({{count}}) escape the outer f-string so the inner
    # `python -c` script receives an ordinary f-string placeholder.
    # Fixed: the failure message previously said "WARN ... expected 60k+"
    # even though the task hard-fails at the 20000-row threshold.
    validate_raw = BashOperator(
        task_id="validate_raw",
        bash_command=f"""
cd {PIPELINE_DIR} && python -c "
import psycopg2, os, sys
conn = psycopg2.connect(
    host=os.environ.get('PG_HOST', 'localhost'),
    port=os.environ.get('PG_PORT', '5432'),
    user=os.environ.get('PG_USER', 'postgres'),
    password=os.environ.get('PG_PASSWORD', 'postgres'),
    dbname=os.environ.get('PG_DATABASE', 'school_compare'),
)
cur = conn.cursor()
cur.execute('SELECT count(*) FROM raw.gias_establishments')
count = cur.fetchone()[0]
conn.close()
if count < 20000:
    print(f'ERROR: GIAS only has {{count}} rows, below the 20000 minimum', file=sys.stderr)
    sys.exit(1)
print(f'Validation passed: {{count}} GIAS rows')
"
""",
    )

    # Build GIAS staging models and everything downstream of them, excluding
    # the KS2/KS4 intermediates — presumably those only change with EES
    # releases and are rebuilt by the annual DAG (TODO confirm).
    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_gias_establishments+ stg_gias_links+ --exclude int_ks2_with_lineage+ int_ks4_with_lineage+",
    )

    # Push the refreshed school records into the Typesense search index.
    sync_typesense = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_group >> validate_raw >> dbt_build >> sync_typesense
# ── Monthly DAG (Ofsted) ───────────────────────────────────────────────
with DAG(
    dag_id="school_data_monthly_ofsted",
    default_args=default_args,
    description="Monthly Ofsted MI extraction and transform",
    schedule="0 2 1 * *",  # 02:00 on the 1st of each month
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "monthly"],
) as monthly_ofsted_dag:
    # Shell commands named up front; the operators below only wire them up.
    extract_cmd = f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ofsted target-postgres"
    dbt_cmd = (
        f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . "
        "--target production --select stg_ofsted_inspections+ int_ofsted_latest+ "
        "fact_ofsted_inspection+ dim_school+"
    )
    sync_cmd = f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py"

    # Extract Ofsted management information, rebuild the inspection models
    # (and dim_school, which carries the latest rating), then re-index search.
    extract_ofsted = BashOperator(task_id="extract_ofsted", bash_command=extract_cmd)
    dbt_build_ofsted = BashOperator(task_id="dbt_build", bash_command=dbt_cmd)
    sync_typesense_ofsted = BashOperator(task_id="sync_typesense", bash_command=sync_cmd)

    extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted
# ── Annual DAG (EES: KS2, KS4, Census, Admissions) ───────────────────
with DAG(
    dag_id="school_data_annual_ees",
    default_args=default_args,
    description="Annual EES data extraction (KS2, KS4, Census, Admissions)",
    schedule=None,  # Triggered manually when new releases are published
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "annual"],
) as annual_ees_dag:
    # Only one tap today, but grouped so further EES sources can slot in
    # without rewiring the downstream chain.
    with TaskGroup("extract_ees") as extract_ees_group:
        extract_ees = BashOperator(
            task_id="extract_ees",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres",
        )

    # Rebuild every EES staging model and all models downstream of them.
    ees_selector = "stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+"
    dbt_build_ees = BashOperator(
        task_id="dbt_build",
        bash_command=(
            f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . "
            f"--target production --select {ees_selector}"
        ),
    )

    # Refresh the search index once the new annual figures have landed.
    sync_typesense_ees = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_ees_group >> dbt_build_ees >> sync_typesense_ees