Pipeline: - EES tap: split KS4 into performance + info streams, fix admissions filename (SchoolLevel keyword match), fix census filename (yearly suffix), remove phonics (no school-level data on EES), change endswith → in for matching - stg_ees_ks4: rewrite to filter long-format data and extract Attainment 8, Progress 8, EBacc, English/Maths metrics; join KS4 info for context - stg_ees_admissions: map real CSV columns (total_number_places_offered, etc.) - stg_ees_census: update source reference, stub with TODO for data columns - Remove stg_ees_phonics, fact_phonics (no school-level EES data) - Add ees_ks4_performance + ees_ks4_info sources, remove ees_ks4 + ees_phonics - Update int_ks4_with_lineage + fact_ks4_performance with new KS4 columns - Annual EES DAG: remove stg_ees_phonics+ from selector Backend: - models.py: replace all models to point at marts.* tables with schema='marts' (DimSchool, DimLocation, KS2Performance, FactOfstedInspection, etc.) - data_loader.py: rewrite load_school_data_as_dataframe() using raw SQL joining dim_school + dim_location + fact_ks2_performance; update get_supplementary_data() - database.py: remove migration machinery, keep only connection setup - app.py: remove check_and_migrate_if_needed, remove /api/admin/reimport-ks2 endpoints (pipeline handles all imports) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
152 lines
5.0 KiB
Python
152 lines
5.0 KiB
Python
"""
|
|
School Data Pipeline — Airflow DAG
|
|
|
|
Orchestrates the full ELT pipeline:
|
|
Extract (Meltano) → Validate → Transform (dbt) → Geocode → Sync Typesense → Invalidate Cache
|
|
|
|
Schedule:
|
|
- GIAS: Daily at 03:00
|
|
- Ofsted: 1st of month at 02:00
|
|
- EES datasets: Annual (triggered manually or on detected release)
|
|
"""

from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils.task_group import TaskGroup

try:
    from airflow.providers.standard.operators.bash import BashOperator
except ImportError:
    from airflow.operators.bash import BashOperator

# Shared locations and binaries used by every DAG in this module.
PIPELINE_DIR = "/opt/pipeline"
MELTANO_BIN = "meltano"
DBT_BIN = "dbt"

# Baseline task arguments applied to all DAGs below.
default_args = dict(
    owner="school-compare",
    depends_on_past=False,
    email_on_failure=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# ── Daily DAG (GIAS + downstream) ──────────────────────────────────────

with DAG(
    dag_id="school_data_daily",
    default_args=default_args,
    description="Daily school data pipeline (GIAS extract → full transform)",
    schedule="0 3 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "daily"],
) as daily_dag:

    with TaskGroup("extract") as extract_group:
        # Meltano extracts GIAS establishment data into the raw schema.
        extract_gias = BashOperator(
            task_id="extract_gias",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-gias target-postgres",
        )

    # Sanity-check the raw load before transforming. A row count below
    # 20,000 indicates a broken or partial extract, so the task exits
    # non-zero to fail hard and stop the pipeline. (Message previously said
    # "WARN ... expected 60k+", which both mislabelled a hard failure and
    # contradicted the actual 20,000 threshold.) The connection is closed
    # in a finally block so it is not leaked if the query raises.
    validate_raw = BashOperator(
        task_id="validate_raw",
        bash_command=f"""
cd {PIPELINE_DIR} && python -c "
import psycopg2, os, sys
conn = psycopg2.connect(
    host=os.environ.get('PG_HOST', 'localhost'),
    port=os.environ.get('PG_PORT', '5432'),
    user=os.environ.get('PG_USER', 'postgres'),
    password=os.environ.get('PG_PASSWORD', 'postgres'),
    dbname=os.environ.get('PG_DATABASE', 'school_compare'),
)
try:
    cur = conn.cursor()
    cur.execute('SELECT count(*) FROM raw.gias_establishments')
    count = cur.fetchone()[0]
finally:
    conn.close()
if count < 20000:
    print(f'ERROR: GIAS only has {{count}} rows, expected at least 20000 (normally 60k+)', file=sys.stderr)
    sys.exit(1)
print(f'Validation passed: {{count}} GIAS rows')
"
""",
    )

    # Build GIAS staging models and everything downstream, excluding the
    # KS2/KS4 performance lineages (those are refreshed by the annual EES DAG).
    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_gias_establishments+ stg_gias_links+ --exclude int_ks2_with_lineage+ int_ks4_with_lineage+",
    )

    # Push the refreshed school records into the Typesense search index.
    sync_typesense = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_group >> validate_raw >> dbt_build >> sync_typesense
# ── Monthly DAG (Ofsted) ───────────────────────────────────────────────

with DAG(
    dag_id="school_data_monthly_ofsted",
    default_args=default_args,
    description="Monthly Ofsted MI extraction and transform",
    schedule="0 2 1 * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "monthly"],
) as monthly_ofsted_dag:

    # Pull the latest Ofsted management-information extract into raw.*.
    _ofsted_extract_cmd = f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ofsted target-postgres"
    extract_ofsted = BashOperator(
        task_id="extract_ofsted",
        bash_command=_ofsted_extract_cmd,
    )

    # Rebuild the Ofsted staging/intermediate/fact models and dim_school.
    _ofsted_selector = "stg_ofsted_inspections+ int_ofsted_latest+ fact_ofsted_inspection+ dim_school+"
    dbt_build_ofsted = BashOperator(
        task_id="dbt_build",
        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select {_ofsted_selector}",
    )

    # Refresh the Typesense search index with the updated inspection data.
    sync_typesense_ofsted = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_ofsted >> dbt_build_ofsted
    dbt_build_ofsted >> sync_typesense_ofsted
# ── Annual DAG (EES: KS2, KS4, Census, Admissions) ───────────────────

with DAG(
    dag_id="school_data_annual_ees",
    default_args=default_args,
    description="Annual EES data extraction (KS2, KS4, Census, Admissions)",
    schedule=None,  # Triggered manually when new releases are published
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["school-compare", "annual"],
) as annual_ees_dag:

    # TaskGroup keeps the UI grouping consistent with the daily DAG.
    with TaskGroup("extract_ees") as extract_ees_group:
        extract_ees = BashOperator(
            task_id="extract_ees",
            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres",
        )

    # Build every EES staging model plus everything downstream of each.
    _ees_selector = "stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+"
    dbt_build_ees = BashOperator(
        task_id="dbt_build",
        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select {_ees_selector}",
    )

    # Refresh the Typesense search index with the new annual figures.
    sync_typesense_ees = BashOperator(
        task_id="sync_typesense",
        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
    )

    extract_ees_group >> dbt_build_ees
    dbt_build_ees >> sync_typesense_ees