diff --git a/docker-compose.portainer.yml b/docker-compose.portainer.yml new file mode 100644 index 0000000..5bfc35e --- /dev/null +++ b/docker-compose.portainer.yml @@ -0,0 +1,289 @@ +# Portainer Stack Definition for School Compare +# +# Portainer environment variables (set in Portainer UI -> Stack -> Environment): +# DB_USERNAME — PostgreSQL username +# DB_PASSWORD — PostgreSQL password +# DB_DATABASE_NAME — PostgreSQL database name +# ADMIN_API_KEY — Backend admin API key +# TYPESENSE_API_KEY — Typesense admin API key +# TYPESENSE_SEARCH_KEY — Typesense search-only key (exposed to frontend) +# AIRFLOW_ADMIN_PASSWORD — Airflow web UI admin password +# KESTRA_USER — Kestra UI username (optional) +# KESTRA_PASSWORD — Kestra UI password (optional) + +services: + + # ── PostgreSQL ──────────────────────────────────────────────────────── + sc_database: + container_name: sc_postgres + image: postgis/postgis:18-3.6-alpine + environment: + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_USER: ${DB_USERNAME} + POSTGRES_DB: ${DB_DATABASE_NAME} + volumes: + - postgres_data:/var/lib/postgresql + shm_size: 128mb + networks: + backend: {} + macvlan: + ipv4_address: 10.0.1.189 + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + + # ── FastAPI Backend ─────────────────────────────────────────────────── + backend: + image: privaterepo.sitaru.org/tudor/school_compare-backend:latest + container_name: schoolcompare_backend + environment: + DATABASE_URL: postgresql://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME} + PYTHONUNBUFFERED: 1 + ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme} + TYPESENSE_URL: http://typesense:8108 + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} + depends_on: + sc_database: + condition: service_healthy + networks: + - backend + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:80/api/data-info"] + interval: 
30s + timeout: 10s + retries: 3 + start_period: 30s + + # ── Next.js Frontend ────────────────────────────────────────────────── + frontend: + image: privaterepo.sitaru.org/tudor/school_compare-frontend:latest + container_name: schoolcompare_nextjs + environment: + - NODE_ENV=production + - NEXT_PUBLIC_API_URL=http://localhost:8000/api + - FASTAPI_URL=http://backend:80/api + - TYPESENSE_URL=http://typesense:8108 + - TYPESENSE_API_KEY=${TYPESENSE_SEARCH_KEY:-changeme} + depends_on: + backend: + condition: service_healthy + networks: + backend: {} + macvlan: + ipv4_address: 10.0.1.150 + restart: unless-stopped + healthcheck: + test: ["CMD", "node", "-e", "require('http').get('http://localhost:3000/', (r) => {process.exit(r.statusCode === 200 ? 0 : 1)})"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + # ── Typesense Search Engine ─────────────────────────────────────────── + typesense: + image: typesense/typesense:27.1 + container_name: schoolcompare_typesense + environment: + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} + TYPESENSE_DATA_DIR: /data + volumes: + - typesense_data:/data + networks: + - backend + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8108/health"] + interval: 15s + timeout: 5s + retries: 5 + start_period: 10s + + # ── Kestra — workflow orchestrator (legacy, kept during migration) ──── + kestra: + image: kestra/kestra:latest + container_name: schoolcompare_kestra + command: server standalone + ports: + - "8090:8080" + volumes: + - kestra_storage:/app/storage + environment: + KESTRA_CONFIGURATION: | + datasources: + postgres: + url: jdbc:postgresql://sc_database:5432/kestra + driverClassName: org.postgresql.Driver + username: ${DB_USERNAME} + password: ${DB_PASSWORD} + kestra: + repository: + type: postgres + queue: + type: postgres + storage: + type: local + local: + base-path: /app/storage + depends_on: + sc_database: + condition: service_healthy + networks: + - backend + restart: 
unless-stopped + healthcheck: + test: ["CMD-SHELL", "curl -sf http://localhost:8081/health | grep -q '\"status\":\"UP\"'"] + interval: 15s + timeout: 10s + retries: 10 + start_period: 60s + + # ── Kestra init (legacy, kept during migration) ────────────────────── + kestra-init: + image: privaterepo.sitaru.org/tudor/school_compare-kestra-init:latest + container_name: schoolcompare_kestra_init + environment: + KESTRA_URL: http://kestra:8080 + KESTRA_USER: ${KESTRA_USER:-} + KESTRA_PASSWORD: ${KESTRA_PASSWORD:-} + depends_on: + kestra: + condition: service_healthy + networks: + - backend + restart: "no" + + # ── Data integrator (legacy, kept during migration) ────────────────── + integrator: + image: privaterepo.sitaru.org/tudor/school_compare-integrator:latest + container_name: schoolcompare_integrator + ports: + - "8001:8001" + environment: + DATABASE_URL: postgresql://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME} + DATA_DIR: /data + BACKEND_URL: http://backend:80 + ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme} + PYTHONUNBUFFERED: 1 + volumes: + - supplementary_data:/data + depends_on: + sc_database: + condition: service_healthy + networks: + - backend + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8001/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 15s + + # ── Airflow Webserver (UI at :8080) ────────────────────────────────── + airflow-webserver: + image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest + container_name: schoolcompare_airflow_webserver + command: airflow webserver --port 8080 + ports: + - "8080:8080" + environment: + AIRFLOW__CORE__EXECUTOR: LocalExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME} + AIRFLOW__CORE__DAGS_FOLDER: /opt/pipeline/dags + AIRFLOW__CORE__LOAD_EXAMPLES: "false" + AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "false" + PG_HOST: sc_database + PG_PORT: "5432" + 
PG_USER: ${DB_USERNAME} + PG_PASSWORD: ${DB_PASSWORD} + PG_DATABASE: ${DB_DATABASE_NAME} + TYPESENSE_URL: http://typesense:8108 + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} + volumes: + - airflow_dags:/opt/pipeline/dags:ro + depends_on: + sc_database: + condition: service_healthy + networks: + - backend + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 60s + + # ── Airflow Scheduler ──────────────────────────────────────────────── + airflow-scheduler: + image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest + container_name: schoolcompare_airflow_scheduler + command: airflow scheduler + environment: + AIRFLOW__CORE__EXECUTOR: LocalExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME} + AIRFLOW__CORE__DAGS_FOLDER: /opt/pipeline/dags + AIRFLOW__CORE__LOAD_EXAMPLES: "false" + AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "false" + PG_HOST: sc_database + PG_PORT: "5432" + PG_USER: ${DB_USERNAME} + PG_PASSWORD: ${DB_PASSWORD} + PG_DATABASE: ${DB_DATABASE_NAME} + TYPESENSE_URL: http://typesense:8108 + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} + volumes: + - airflow_dags:/opt/pipeline/dags:ro + depends_on: + sc_database: + condition: service_healthy + networks: + - backend + restart: unless-stopped + + # ── Airflow DB Init (one-shot) ─────────────────────────────────────── + airflow-init: + image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest + container_name: schoolcompare_airflow_init + command: > + bash -c " + airflow db migrate && + airflow users create + --username admin + --password $${AIRFLOW_ADMIN_PASSWORD:-admin} + --firstname Admin + --lastname User + --role Admin + --email admin@localhost || true + " + environment: + AIRFLOW__CORE__EXECUTOR: LocalExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME} + AIRFLOW__CORE__DAGS_FOLDER: /opt/pipeline/dags + AIRFLOW__CORE__LOAD_EXAMPLES: "false" + AIRFLOW_ADMIN_PASSWORD: ${AIRFLOW_ADMIN_PASSWORD:-admin} + depends_on: + sc_database: + condition: service_healthy + networks: + - backend + restart: "no" + +networks: + backend: + driver: bridge + macvlan: + external: + name: macvlan + +volumes: + postgres_data: + kestra_storage: + supplementary_data: + typesense_data: + airflow_dags: diff --git a/docker-compose.yml b/docker-compose.yml index cc6b4e8..cba4b9c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,9 @@ version: '3.8' services: - # PostgreSQL Database + # PostgreSQL Database with PostGIS db: - image: postgres:16-alpine + image: postgis/postgis:16-3.4-alpine container_name: schoolcompare_db environment: POSTGRES_USER: schoolcompare @@ -33,6 +33,8 @@ services: DATABASE_URL: postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare PYTHONUNBUFFERED: 1 ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme} + TYPESENSE_URL: http://typesense:8108 + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} volumes: - ./data:/app/data:ro depends_on: @@ -58,6 +60,8 @@ services: NODE_ENV: production NEXT_PUBLIC_API_URL: http://localhost:8000/api FASTAPI_URL: http://backend:80/api + TYPESENSE_URL: http://typesense:8108 + TYPESENSE_API_KEY: ${TYPESENSE_SEARCH_KEY:-changeme} depends_on: backend: condition: service_healthy @@ -71,32 +75,49 @@ services: retries: 3 start_period: 40s - # Kestra — workflow orchestrator (UI at http://localhost:8080) - kestra: - image: kestra/kestra:latest - container_name: schoolcompare_kestra - command: server standalone + # Typesense — search engine + typesense: + image: typesense/typesense:27.1 + container_name: schoolcompare_typesense + ports: + - "8108:8108" + environment: + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} + TYPESENSE_DATA_DIR: /data + volumes: + - typesense_data:/data + networks: + - 
schoolcompare-network + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8108/health"] + interval: 15s + timeout: 5s + retries: 5 + start_period: 10s + + # Apache Airflow — workflow orchestrator (UI at http://localhost:8080) + airflow-webserver: + image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest + container_name: schoolcompare_airflow_webserver + command: airflow webserver --port 8080 ports: - "8080:8080" + environment: &airflow-env + AIRFLOW__CORE__EXECUTOR: LocalExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://schoolcompare:schoolcompare@db:5432/schoolcompare + AIRFLOW__CORE__DAGS_FOLDER: /opt/pipeline/dags + AIRFLOW__CORE__LOAD_EXAMPLES: "false" + AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "false" + PG_HOST: db + PG_PORT: "5432" + PG_USER: schoolcompare + PG_PASSWORD: schoolcompare + PG_DATABASE: schoolcompare + TYPESENSE_URL: http://typesense:8108 + TYPESENSE_API_KEY: ${TYPESENSE_API_KEY:-changeme} volumes: - - kestra_storage:/app/storage - environment: - KESTRA_CONFIGURATION: | - datasources: - postgres: - url: jdbc:postgresql://db:5432/kestra - driverClassName: org.postgresql.Driver - username: schoolcompare - password: schoolcompare - kestra: - repository: - type: postgres - queue: - type: postgres - storage: - type: local - local: - base-path: /app/storage + - ./pipeline/dags:/opt/pipeline/dags:ro depends_on: db: condition: service_healthy @@ -104,53 +125,42 @@ services: - schoolcompare-network restart: unless-stopped healthcheck: - test: ["CMD-SHELL", "curl -sf http://localhost:8081/health | grep -q '\"status\":\"UP\"'"] - interval: 15s - timeout: 10s - retries: 10 - start_period: 60s - - # One-shot container: imports flow YAMLs into Kestra after it's healthy - kestra-init: - image: privaterepo.sitaru.org/tudor/school_compare-kestra-init:latest - container_name: schoolcompare_kestra_init - environment: - KESTRA_URL: http://kestra:8080 - KESTRA_USER: ${KESTRA_USER:-} - KESTRA_PASSWORD: 
${KESTRA_PASSWORD:-} - depends_on: - kestra: - condition: service_healthy - networks: - - schoolcompare-network - restart: no - - # Data integrator — Python microservice called by Kestra - integrator: - image: privaterepo.sitaru.org/tudor/school_compare-integrator:latest - container_name: schoolcompare_integrator - ports: - - "8001:8001" - environment: - DATABASE_URL: postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare - DATA_DIR: /data - BACKEND_URL: http://backend:80 - ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme} - PYTHONUNBUFFERED: 1 - volumes: - - supplementary_data:/data - depends_on: - db: - condition: service_healthy - networks: - - schoolcompare-network - restart: unless-stopped - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8001/health"] + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] interval: 30s timeout: 10s - retries: 3 - start_period: 15s + retries: 5 + start_period: 60s + + airflow-scheduler: + image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest + container_name: schoolcompare_airflow_scheduler + command: airflow scheduler + environment: *airflow-env + volumes: + - ./pipeline/dags:/opt/pipeline/dags:ro + depends_on: + db: + condition: service_healthy + networks: + - schoolcompare-network + restart: unless-stopped + + # One-shot: initialise Airflow metadata DB + airflow-init: + image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest + container_name: schoolcompare_airflow_init + command: > + bash -c " + airflow db migrate && + airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@localhost || true + " + environment: *airflow-env + depends_on: + db: + condition: service_healthy + networks: + - schoolcompare-network + restart: "no" networks: schoolcompare-network: @@ -158,5 +168,4 @@ networks: volumes: postgres_data: - kestra_storage: - supplementary_data: + typesense_data: diff --git a/pipeline/Dockerfile b/pipeline/Dockerfile 
new file mode 100644
index 0000000..9562efd
--- /dev/null
+++ b/pipeline/Dockerfile
@@ -0,0 +1,37 @@
+FROM python:3.12-slim
+
+WORKDIR /opt/pipeline
+
+# System dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    gcc \
+    libpq-dev \
+    && rm -rf /var/lib/apt/lists/*
+
+# Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Install custom Singer taps
+COPY plugins/ plugins/
+RUN pip install --no-cache-dir \
+    ./plugins/extractors/tap-uk-gias \
+    ./plugins/extractors/tap-uk-ees \
+    ./plugins/extractors/tap-uk-ofsted \
+    ./plugins/extractors/tap-uk-parent-view \
+    ./plugins/extractors/tap-uk-fbit \
+    ./plugins/extractors/tap-uk-idaci
+
+# Copy pipeline code
+COPY meltano.yml .
+COPY transform/ transform/
+COPY scripts/ scripts/
+COPY dags/ dags/
+
+# dbt deps
+RUN cd transform && dbt deps --profiles-dir . 2>/dev/null || true
+
+ENV AIRFLOW_HOME=/opt/airflow
+ENV PYTHONPATH=/opt/pipeline
+
+CMD ["airflow", "webserver"]
diff --git a/pipeline/dags/school_data_pipeline.py b/pipeline/dags/school_data_pipeline.py
new file mode 100644
index 0000000..1771ac9
--- /dev/null
+++ b/pipeline/dags/school_data_pipeline.py
@@ -0,0 +1,163 @@
+"""
+School Data Pipeline — Airflow DAG
+
+Orchestrates the full ELT pipeline:
+    Extract (Meltano) → Validate → Transform (dbt) → Geocode → Sync Typesense → Invalidate Cache
+
+Schedule:
+    - GIAS: Daily at 03:00
+    - Ofsted: 1st of month at 02:00
+    - EES datasets: Annual (triggered manually or on detected release)
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils.task_group import TaskGroup
+
+PIPELINE_DIR = "/opt/pipeline"
+MELTANO_BIN = f"{PIPELINE_DIR}/.venv/bin/meltano"
+DBT_BIN = f"{PIPELINE_DIR}/.venv/bin/dbt"
+
+default_args = {
+    "owner": "school-compare",
+    "depends_on_past": False,
+    "email_on_failure": False,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+}
+
+
+# ── Daily DAG (GIAS + downstream) ──────────────────────────────────────
+
+with DAG(
+    dag_id="school_data_daily",
+    default_args=default_args,
+    description="Daily school data pipeline (GIAS extract → full transform)",
+    schedule="0 3 * * *",
+    start_date=datetime(2025, 1, 1),
+    catchup=False,
+    tags=["school-compare", "daily"],
+) as daily_dag:
+
+    with TaskGroup("extract") as extract_group:
+        extract_gias = BashOperator(
+            task_id="extract_gias",
+            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} elt tap-uk-gias target-postgres",
+        )
+
+    validate_raw = BashOperator(
+        task_id="validate_raw",
+        bash_command=f"""
+        cd {PIPELINE_DIR} && python -c "
+import psycopg2, os, sys
+conn = psycopg2.connect(
+    host=os.environ.get('PG_HOST', 'localhost'),
+    port=os.environ.get('PG_PORT', '5432'),
+    user=os.environ.get('PG_USER', 'postgres'),
+    password=os.environ.get('PG_PASSWORD', 'postgres'),
+    dbname=os.environ.get('PG_DATABASE', 'school_compare'),
+)
+cur = conn.cursor()
+cur.execute('SELECT count(*) FROM raw.gias_establishments')
+count = cur.fetchone()[0]
+conn.close()
+if count < 20000:
+    print(f'ERROR: GIAS only has {{count}} rows, expected 60k+ (minimum 20000)', file=sys.stderr)
+    sys.exit(1)
+print(f'Validation passed: {{count}} GIAS rows')
+"
+        """,
+    )
+
+    dbt_build = BashOperator(
+        task_id="dbt_build",
+        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production",
+    )
+
+    dbt_test = BashOperator(
+        task_id="dbt_test",
+        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production",
+    )
+
+    geocode_new = BashOperator(
+        task_id="geocode_new",
+        bash_command=f"cd {PIPELINE_DIR} && python scripts/geocode_postcodes.py",
+    )
+
+    sync_typesense = BashOperator(
+        task_id="sync_typesense",
+        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
+    )
+
+    extract_group >> validate_raw >> dbt_build >> dbt_test >> geocode_new >> sync_typesense
+
+
+# ── Monthly DAG (Ofsted) ───────────────────────────────────────────────
+
+with DAG(
+    dag_id="school_data_monthly_ofsted",
+    default_args=default_args,
+    description="Monthly Ofsted MI extraction and transform",
+    schedule="0 2 1 * *",
+    start_date=datetime(2025, 1, 1),
+    catchup=False,
+    tags=["school-compare", "monthly"],
+) as monthly_ofsted_dag:
+
+    extract_ofsted = BashOperator(
+        task_id="extract_ofsted",
+        bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} elt tap-uk-ofsted target-postgres",
+    )
+
+    dbt_build_ofsted = BashOperator(
+        task_id="dbt_build",
+        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ofsted_inspections+ int_ofsted_latest+ fact_ofsted_inspection+ dim_school+",
+    )
+
+    sync_typesense_ofsted = BashOperator(
+        task_id="sync_typesense",
+        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
+    )
+
+    extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted
+
+
+# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ───────────
+
+with DAG(
+    dag_id="school_data_annual_ees",
+    default_args=default_args,
+    description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)",
+    schedule=None,  # Triggered manually when new releases are published
+    start_date=datetime(2025, 1, 1),
+    catchup=False,
+    tags=["school-compare", "annual"],
+) as annual_ees_dag:
+
+    with TaskGroup("extract_ees") as extract_ees_group:
+        extract_ees = BashOperator(
+            task_id="extract_ees",
+            bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} elt tap-uk-ees target-postgres",
+        )
+
+    dbt_build_ees = BashOperator(
+        task_id="dbt_build",
+        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production",
+    )
+
+    dbt_test_ees = BashOperator(
+        task_id="dbt_test",
+        bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} test --profiles-dir . --target production",
+    )
+
+    sync_typesense_ees = BashOperator(
+        task_id="sync_typesense",
+        bash_command=f"cd {PIPELINE_DIR} && python scripts/sync_typesense.py",
+    )
+
+    extract_ees_group >> dbt_build_ees >> dbt_test_ees >> sync_typesense_ees
diff --git a/pipeline/meltano.yml b/pipeline/meltano.yml
new file mode 100644
index 0000000..c546432
--- /dev/null
+++ b/pipeline/meltano.yml
@@ -0,0 +1,114 @@
+version: 1
+project_id: school-compare-pipeline
+
+plugins:
+  extractors:
+    - name: tap-uk-gias
+      namespace: uk_gias
+      pip_url: ./plugins/extractors/tap-uk-gias
+      executable: tap-uk-gias
+      capabilities:
+        - catalog
+        - state
+      settings:
+        - name: download_url
+          kind: string
+          description: GIAS bulk CSV download URL
+          value: https://ea-edubase-api-prod.azurewebsites.net/edubase/downloads/public/edubasealldata.csv
+
+    - name: tap-uk-ees
+      namespace: uk_ees
+      pip_url: ./plugins/extractors/tap-uk-ees
+      executable: tap-uk-ees
+      capabilities:
+        - catalog
+        - state
+      settings:
+        - name: base_url
+          kind: string
+          value: https://content.explore-education-statistics.service.gov.uk/api/v1
+        - name: datasets
+          kind: array
+          description: List of EES dataset configs to extract
+
+    - name: tap-uk-ofsted
+      namespace: uk_ofsted
+      pip_url: ./plugins/extractors/tap-uk-ofsted
+      executable: tap-uk-ofsted
+      capabilities:
+        - catalog
+        - state
+      settings:
+        - name: mi_url
+          kind: string
+          description: Ofsted Management Information download URL
+
+    - name: tap-uk-parent-view
+      namespace: uk_parent_view
+      pip_url: ./plugins/extractors/tap-uk-parent-view
+      executable: tap-uk-parent-view
+      capabilities:
+        - catalog
+
+    - name: tap-uk-fbit
+      namespace: uk_fbit
+      pip_url: ./plugins/extractors/tap-uk-fbit
+      executable: tap-uk-fbit
+      capabilities:
+        - catalog
+        - state
+      settings:
+        - name: base_url
+          kind: string
+          value: https://financial-benchmarking-and-insights-tool.education.gov.uk/api
+
+    - name: tap-uk-idaci
+      namespace: uk_idaci
+      pip_url: ./plugins/extractors/tap-uk-idaci
+      executable:
tap-uk-idaci
+      capabilities:
+        - catalog
+
+  loaders:
+    - name: target-postgres
+      variant: transferwise
+      pip_url: pipelinewise-target-postgres
+      config:
+        host: $PG_HOST
+        port: $PG_PORT
+        user: $PG_USER
+        password: $PG_PASSWORD
+        dbname: $PG_DATABASE
+        default_target_schema: raw
+
+  utilities:
+    - name: dbt-postgres
+      variant: dbt-labs
+      pip_url: dbt-postgres~=1.8
+      config:
+        project_dir: $MELTANO_PROJECT_ROOT/transform
+        profiles_dir: $MELTANO_PROJECT_ROOT/transform
+
+environments:
+  - name: dev
+    config:
+      plugins:
+        loaders:
+          - name: target-postgres
+            config:
+              host: localhost
+              port: 5432
+              user: postgres
+              password: postgres
+              dbname: school_compare
+  - name: production
+    config:
+      plugins:
+        loaders:
+          - name: target-postgres
+            config:
+              host: ${PG_HOST}
+              port: ${PG_PORT}
+              user: ${PG_USER}
+              password: ${PG_PASSWORD}
+              dbname: ${PG_DATABASE}
diff --git a/pipeline/plugins/extractors/tap-uk-ees/pyproject.toml b/pipeline/plugins/extractors/tap-uk-ees/pyproject.toml
new file mode 100644
index 0000000..681766e
--- /dev/null
+++ b/pipeline/plugins/extractors/tap-uk-ees/pyproject.toml
@@ -0,0 +1,17 @@
+[build-system]
+requires = ["setuptools>=68", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "tap-uk-ees"
+version = "0.1.0"
+description = "Singer tap for UK Explore Education Statistics (KS2, KS4, Census, Admissions, Phonics)"
+requires-python = ">=3.10"
+dependencies = [
+    "singer-sdk~=0.39",
+    "requests>=2.31",
+    "pandas>=2.0",
+]
+
+[project.scripts]
+tap-uk-ees = "tap_uk_ees.tap:TapUKEES.cli"
diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/__init__.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/__init__.py
new file mode 100644
index 0000000..a2e0195
--- /dev/null
+++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/__init__.py
@@ -0,0 +1 @@
+"""tap-uk-ees: Singer tap for Explore Education Statistics API."""
diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
new file mode 100644
index 0000000..2201caf
--- /dev/null
+++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py
@@ -0,0 +1,154 @@
+"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data."""
+
+from __future__ import annotations
+
+import io
+import zipfile
+
+import requests
+from singer_sdk import Stream, Tap
+from singer_sdk import typing as th
+
+CONTENT_API_BASE = (
+    "https://content.explore-education-statistics.service.gov.uk/api"
+)
+STATS_API_BASE = "https://api.education.gov.uk/statistics/v1"
+TIMEOUT = 120
+
+
+def get_content_release_id(publication_slug: str) -> str:
+    """Return the latest release ID via the EES content API."""
+    url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases/latest"
+    resp = requests.get(url, timeout=TIMEOUT)
+    resp.raise_for_status()
+    return resp.json()["id"]
+
+
+def download_release_zip(release_id: str) -> zipfile.ZipFile:
+    """Download all data files for a release as a ZIP."""
+    url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
+    resp = requests.get(url, timeout=300, stream=True)
+    resp.raise_for_status()
+    return zipfile.ZipFile(io.BytesIO(resp.content))
+
+
+class EESDatasetStream(Stream):
+    """Base stream for an EES dataset extracted from a release ZIP."""
+
+    replication_key = None
+    _publication_slug: str = ""
+    _file_keyword: str = ""
+
+    def get_records(self, context):
+        import pandas as pd
+
+        release_id = get_content_release_id(self._publication_slug)
+        self.logger.info(
+            "Downloading release %s for %s",
+            release_id,
+            self._publication_slug,
+        )
+        zf = download_release_zip(release_id)
+
+        # Find the CSV matching our keyword
+        csv_names = [n for n in zf.namelist() if n.endswith(".csv")]
+        target = None
+        for name in csv_names:
+            if self._file_keyword.lower() in name.lower():
+                target = name
+                break
+        if not target and csv_names:
+            target = csv_names[0]
+
+        if not target:
+            self.logger.warning("No CSV found in release ZIP")
+            return
+
+        self.logger.info("Reading %s from ZIP", target)
+        with zf.open(target) as f:
+            df = pd.read_csv(f, dtype=str, keep_default_na=False)
+
+        # Filter to school-level data
+        if "geographic_level" in df.columns:
+            df = df[df["geographic_level"] == "School"]
+
+        for _, row in df.iterrows():
+            yield row.to_dict()
+
+
+class EESKS2Stream(EESDatasetStream):
+    name = "ees_ks2"
+    primary_keys = ["urn", "time_period"]
+    _publication_slug = "key-stage-2-attainment"
+    _file_keyword = "school"
+    schema = th.PropertiesList(
+        th.Property("urn", th.StringType, required=True),
+        th.Property("time_period", th.StringType, required=True),
+    ).to_dict()
+
+
+class EESKS4Stream(EESDatasetStream):
+    name = "ees_ks4"
+    primary_keys = ["urn", "time_period"]
+    _publication_slug = "key-stage-4-performance-revised"
+    _file_keyword = "school"
+    schema = th.PropertiesList(
+        th.Property("urn", th.StringType, required=True),
+        th.Property("time_period", th.StringType, required=True),
+    ).to_dict()
+
+
+class EESCensusStream(EESDatasetStream):
+    name = "ees_census"
+    primary_keys = ["urn", "time_period"]
+    _publication_slug = "school-pupils-and-their-characteristics"
+    _file_keyword = "school"
+    schema = th.PropertiesList(
+        th.Property("urn", th.StringType, required=True),
+        th.Property("time_period", th.StringType, required=True),
+    ).to_dict()
+
+
+class EESAdmissionsStream(EESDatasetStream):
+    name = "ees_admissions"
+    primary_keys = ["urn", "time_period"]
+    _publication_slug = "secondary-and-primary-school-applications-and-offers"
+    _file_keyword = "school"
+    schema = th.PropertiesList(
+        th.Property("urn", th.StringType, required=True),
+        th.Property("time_period", th.StringType, required=True),
+    ).to_dict()
+
+
+class EESPhonicsStream(EESDatasetStream):
+    name = "ees_phonics"
+    primary_keys = ["urn", "time_period"]
+    _publication_slug = "phonics-screening-check-and-key-stage-1-assessments"
+    _file_keyword = "school"
+    schema = th.PropertiesList(
+        th.Property("urn", th.StringType, required=True),
+        th.Property("time_period", th.StringType, required=True),
+    ).to_dict()
+
+
+class TapUKEES(Tap):
+    """Singer tap for UK Explore Education Statistics."""
+
+    name = "tap-uk-ees"
+
+    config_jsonschema = th.PropertiesList(
+        th.Property("base_url", th.StringType, description="EES API base URL"),
+    ).to_dict()
+
+    def discover_streams(self):
+        return [
+            EESKS2Stream(self),
+            EESKS4Stream(self),
+            EESCensusStream(self),
+            EESAdmissionsStream(self),
+            EESPhonicsStream(self),
+        ]
+
+
+if __name__ == "__main__":
+    TapUKEES.cli()
diff --git a/pipeline/plugins/extractors/tap-uk-fbit/pyproject.toml b/pipeline/plugins/extractors/tap-uk-fbit/pyproject.toml
new file mode 100644
index 0000000..890479f
--- /dev/null
+++ b/pipeline/plugins/extractors/tap-uk-fbit/pyproject.toml
@@ -0,0 +1,16 @@
+[build-system]
+requires = ["setuptools>=68", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "tap-uk-fbit"
+version = "0.1.0"
+description = "Singer tap for UK FBIT (Financial Benchmarking and Insights Tool)"
+requires-python = ">=3.10"
+dependencies = [
+    "singer-sdk~=0.39",
+    "requests>=2.31",
+]
+
+[project.scripts]
+tap-uk-fbit = "tap_uk_fbit.tap:TapUKFBIT.cli"
diff --git a/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/__init__.py b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/__init__.py
new file mode 100644
index 0000000..56b660f
--- /dev/null
+++ b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/__init__.py
@@ -0,0 +1 @@
+"""tap-uk-fbit: Singer tap for Financial Benchmarking and Insights Tool API."""
diff --git a/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py
new file mode 100644
index 0000000..887f016
--- /dev/null
+++ b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py
@@ -0,0 +1,53 @@
+"""FBIT Singer tap — extracts financial data from the FBIT REST API."""
+
+from __future__ import annotations
+
+from
singer_sdk import Stream, Tap +from singer_sdk import typing as th + + +class FBITFinanceStream(Stream): + """Stream: School financial benchmarking data.""" + + name = "fbit_finance" + primary_keys = ["urn", "year"] + replication_key = None + + schema = th.PropertiesList( + th.Property("urn", th.IntegerType, required=True), + th.Property("year", th.IntegerType, required=True), + th.Property("per_pupil_spend", th.NumberType), + th.Property("staff_cost_pct", th.NumberType), + th.Property("teacher_cost_pct", th.NumberType), + th.Property("support_staff_cost_pct", th.NumberType), + th.Property("premises_cost_pct", th.NumberType), + ).to_dict() + + def get_records(self, context): + # TODO: Implement FBIT API extraction + # The FBIT API requires per-URN requests with rate limiting. + # Implementation will batch URNs from dim_school and request + # financial data for each. + self.logger.warning("FBIT extraction not yet implemented") + return iter([]) + + +class TapUKFBIT(Tap): + """Singer tap for UK FBIT financial data.""" + + name = "tap-uk-fbit" + + config_jsonschema = th.PropertiesList( + th.Property( + "base_url", + th.StringType, + default="https://financial-benchmarking-and-insights-tool.education.gov.uk/api", + ), + ).to_dict() + + def discover_streams(self): + return [FBITFinanceStream(self)] + + +if __name__ == "__main__": + TapUKFBIT.cli() diff --git a/pipeline/plugins/extractors/tap-uk-gias/pyproject.toml b/pipeline/plugins/extractors/tap-uk-gias/pyproject.toml new file mode 100644 index 0000000..c748c85 --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-gias/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tap-uk-gias" +version = "0.1.0" +description = "Singer tap for UK GIAS (Get Information About Schools) bulk data" +requires-python = ">=3.10" +dependencies = [ + "singer-sdk~=0.39", + "requests>=2.31", + "pandas>=2.0", +] + +[project.scripts] 
+tap-uk-gias = "tap_uk_gias.tap:TapUKGIAS.cli" diff --git a/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/__init__.py b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/__init__.py new file mode 100644 index 0000000..3e8bece --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/__init__.py @@ -0,0 +1 @@ +"""tap-uk-gias: Singer tap for GIAS bulk establishment data.""" diff --git a/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py new file mode 100644 index 0000000..19f1903 --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py @@ -0,0 +1,135 @@ +"""GIAS Singer tap — extracts bulk establishment CSV from GIAS API.""" + +from __future__ import annotations + +from datetime import date + +from singer_sdk import Stream, Tap +from singer_sdk import typing as th + +GIAS_URL_TEMPLATE = ( + "https://ea-edubase-api-prod.azurewebsites.net" + "/edubase/downloads/public/edubasealldata{date}.csv" +) + + +class GIASEstablishmentsStream(Stream): + """Stream: GIAS establishments (one row per URN).""" + + name = "gias_establishments" + primary_keys = ["URN"] + replication_key = None + + # Schema is wide (~250 columns); we declare key columns and pass through the rest + schema = th.PropertiesList( + th.Property("URN", th.IntegerType, required=True), + th.Property("EstablishmentName", th.StringType), + th.Property("TypeOfEstablishment (name)", th.StringType), + th.Property("PhaseOfEducation (name)", th.StringType), + th.Property("LA (code)", th.IntegerType), + th.Property("LA (name)", th.StringType), + th.Property("EstablishmentNumber", th.IntegerType), + th.Property("EstablishmentStatus (name)", th.StringType), + th.Property("Postcode", th.StringType), + ).to_dict() + + def get_records(self, context): + """Download GIAS CSV and yield rows.""" + import io + + import pandas as pd + import requests + + today = date.today().strftime("%Y%m%d") + url = 
GIAS_URL_TEMPLATE.format(date=today)
+        # Honour the "download_url" config override — it is declared in the
+        # tap's config_jsonschema below but was previously never read.
+        override = self.config.get("download_url")
+        if override:
+            url = override
+
+        self.logger.info("Downloading GIAS bulk CSV: %s", url)
+        resp = requests.get(url, timeout=120)
+        if resp.status_code == 404 and not override:
+            # The dated daily extract may not be published yet; retry with
+            # yesterday's file before failing outright.
+            yesterday = date.fromordinal(date.today().toordinal() - 1)
+            url = GIAS_URL_TEMPLATE.format(date=yesterday.strftime("%Y%m%d"))
+            self.logger.info("Today's GIAS file missing; retrying: %s", url)
+            resp = requests.get(url, timeout=120)
+        resp.raise_for_status()
+
+        df = pd.read_csv(
+            io.StringIO(resp.text),
+            encoding="utf-8-sig",
+            dtype=str,
+            keep_default_na=False,
+        )
+
+        for _, row in df.iterrows():
+            record = row.to_dict()
+            # Cast URN to int; skip rows with a missing/non-numeric URN.
+            try:
+                record["URN"] = int(record["URN"])
+            except (ValueError, KeyError):
+                continue
+            yield record
+
+
+class GIASLinksStream(Stream):
+    """Stream: GIAS school links (predecessor/successor)."""
+
+    name = "gias_links"
+    primary_keys = ["URN", "LinkURN"]
+    replication_key = None
+
+    schema = th.PropertiesList(
+        th.Property("URN", th.IntegerType, required=True),
+        th.Property("LinkURN", th.IntegerType, required=True),
+        th.Property("LinkType", th.StringType),
+        th.Property("LinkEstablishedDate", th.StringType),
+    ).to_dict()
+
+    def get_records(self, context):
+        """Download GIAS links CSV and yield rows."""
+        import io
+
+        import pandas as pd
+        import requests
+
+        url = (
+            "https://ea-edubase-api-prod.azurewebsites.net"
+            "/edubase/downloads/public/links_edubasealldata.csv"
+        )
+
+        self.logger.info("Downloading GIAS links CSV: %s", url)
+        resp = requests.get(url, timeout=120)
+        resp.raise_for_status()
+
+        df = pd.read_csv(
+            io.StringIO(resp.text),
+            encoding="utf-8-sig",
+            dtype=str,
+            keep_default_na=False,
+        )
+
+        for _, row in df.iterrows():
+            record = row.to_dict()
+            try:
+                record["URN"] = int(record["URN"])
+                record["LinkURN"] = int(record["LinkURN"])
+            except (ValueError, KeyError):
+                continue
+            yield record
+
+
+class TapUKGIAS(Tap):
+    """Singer tap for UK GIAS data."""
+
+    name = "tap-uk-gias"
+
+    config_jsonschema = th.PropertiesList(
+        th.Property(
+            "download_url",
+            th.StringType,
+            description="Override GIAS CSV download URL",
+        ),
+    ).to_dict()
+
+    def discover_streams(self):
+        return [
+            GIASEstablishmentsStream(self),
+            GIASLinksStream(self),
+        ]
+
+
+if __name__ == "__main__":
+    TapUKGIAS.cli()
diff --git a/pipeline/plugins/extractors/tap-uk-idaci/pyproject.toml b/pipeline/plugins/extractors/tap-uk-idaci/pyproject.toml new file mode 100644 index 0000000..26f69a3 --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-idaci/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tap-uk-idaci" +version = "0.1.0" +description = "Singer tap for UK IDACI deprivation index" +requires-python = ">=3.10" +dependencies = [ + "singer-sdk~=0.39", + "requests>=2.31", + "pandas>=2.0", +] + +[project.scripts] +tap-uk-idaci = "tap_uk_idaci.tap:TapUKIDACI.cli" diff --git a/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/__init__.py b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/__init__.py new file mode 100644 index 0000000..3324e1e --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/__init__.py @@ -0,0 +1 @@ +"""tap-uk-idaci: Singer tap for IDACI (Income Deprivation Affecting Children Index).""" diff --git a/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py new file mode 100644 index 0000000..3aefb46 --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py @@ -0,0 +1,41 @@ +"""IDACI Singer tap — extracts deprivation index lookup data.""" + +from __future__ import annotations + +from singer_sdk import Stream, Tap +from singer_sdk import typing as th + + +class IDACIStream(Stream): + """Stream: IDACI scores by LSOA.""" + + name = "idaci" + primary_keys = ["lsoa_code"] + replication_key = None + + schema = th.PropertiesList( + th.Property("lsoa_code", th.StringType, required=True), + th.Property("idaci_score", th.NumberType), + th.Property("idaci_decile", th.IntegerType), + ).to_dict() + + def get_records(self, context): + # TODO: Implement IDACI extraction + # Source: MHCLG IoD 2019 LSOA-level data + # Available as a static CSV download + 
self.logger.warning("IDACI extraction not yet implemented") + return iter([]) + + +class TapUKIDACI(Tap): + """Singer tap for UK IDACI data.""" + + name = "tap-uk-idaci" + config_jsonschema = th.PropertiesList().to_dict() + + def discover_streams(self): + return [IDACIStream(self)] + + +if __name__ == "__main__": + TapUKIDACI.cli() diff --git a/pipeline/plugins/extractors/tap-uk-ofsted/pyproject.toml b/pipeline/plugins/extractors/tap-uk-ofsted/pyproject.toml new file mode 100644 index 0000000..3474a0f --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-ofsted/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tap-uk-ofsted" +version = "0.1.0" +description = "Singer tap for UK Ofsted Management Information" +requires-python = ">=3.10" +dependencies = [ + "singer-sdk~=0.39", + "requests>=2.31", + "pandas>=2.0", + "odfpy>=1.4", +] + +[project.scripts] +tap-uk-ofsted = "tap_uk_ofsted.tap:TapUKOfsted.cli" diff --git a/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/__init__.py b/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/__init__.py new file mode 100644 index 0000000..e658c91 --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/__init__.py @@ -0,0 +1 @@ +"""tap-uk-ofsted: Singer tap for Ofsted Management Information.""" diff --git a/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py b/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py new file mode 100644 index 0000000..ae5681d --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py @@ -0,0 +1,176 @@ +"""Ofsted MI Singer tap — extracts inspection records from GOV.UK CSV/ODS.""" + +from __future__ import annotations + +import io +import re + +import requests +from singer_sdk import Stream, Tap +from singer_sdk import typing as th + +GOV_UK_PAGE = ( + "https://www.gov.uk/government/statistical-data-sets/" + 
"monthly-management-information-ofsteds-school-inspections-outcomes" +) + +# Column name → internal field, in priority order (first match wins). +# Handles both current and older file formats. +COLUMN_PRIORITY = { + "urn": ["URN", "Urn", "urn"], + "inspection_date": [ + "Inspection start date of latest OEIF graded inspection", + "Inspection start date", + "Inspection date", + ], + "inspection_type": [ + "Inspection type of latest OEIF graded inspection", + "Inspection type", + ], + "event_type_grouping": [ + "Event type grouping", + "Inspection type grouping", + ], + "overall_effectiveness": [ + "Latest OEIF overall effectiveness", + "Overall effectiveness", + ], + "quality_of_education": [ + "Latest OEIF quality of education", + "Quality of education", + ], + "behaviour_and_attitudes": [ + "Latest OEIF behaviour and attitudes", + "Behaviour and attitudes", + ], + "personal_development": [ + "Latest OEIF personal development", + "Personal development", + ], + "effectiveness_of_leadership_and_management": [ + "Latest OEIF effectiveness of leadership and management", + "Effectiveness of leadership and management", + ], + "early_years_provision": [ + "Latest OEIF early years provision", + "Early years provision (where applicable)", + ], + "sixth_form_provision": [ + "Latest OEIF sixth form provision", + "Sixth form provision (where applicable)", + ], +} + + +def discover_csv_url() -> str | None: + """Scrape GOV.UK page to find the latest MI CSV download link.""" + resp = requests.get(GOV_UK_PAGE, timeout=30) + resp.raise_for_status() + # Look for CSV attachment links + matches = re.findall( + r'href="(https://assets\.publishing\.service\.gov\.uk/[^"]+\.csv)"', + resp.text, + ) + if matches: + return matches[0] + # Fall back to ODS + matches = re.findall( + r'href="(https://assets\.publishing\.service\.gov\.uk/[^"]+\.ods)"', + resp.text, + ) + return matches[0] if matches else None + + +class OfstedInspectionsStream(Stream): + """Stream: Ofsted inspection records.""" + 
+ name = "ofsted_inspections" + primary_keys = ["urn", "inspection_date"] + replication_key = None + + schema = th.PropertiesList( + th.Property("urn", th.IntegerType, required=True), + th.Property("inspection_date", th.StringType), + th.Property("inspection_type", th.StringType), + th.Property("event_type_grouping", th.StringType), + th.Property("overall_effectiveness", th.StringType), + th.Property("quality_of_education", th.StringType), + th.Property("behaviour_and_attitudes", th.StringType), + th.Property("personal_development", th.StringType), + th.Property("effectiveness_of_leadership_and_management", th.StringType), + th.Property("early_years_provision", th.StringType), + th.Property("sixth_form_provision", th.StringType), + th.Property("report_url", th.StringType), + ).to_dict() + + def _resolve_columns(self, df_columns: list[str]) -> dict[str, str]: + """Map internal field names to actual CSV column names.""" + mapping = {} + for field, candidates in COLUMN_PRIORITY.items(): + for candidate in candidates: + if candidate in df_columns: + mapping[field] = candidate + break + return mapping + + def get_records(self, context): + import pandas as pd + + url = self.config.get("mi_url") or discover_csv_url() + if not url: + self.logger.error("Could not discover Ofsted MI download URL") + return + + self.logger.info("Downloading Ofsted MI: %s", url) + resp = requests.get(url, timeout=120) + resp.raise_for_status() + + if url.endswith(".ods"): + df = pd.read_excel(io.BytesIO(resp.content), engine="odf", dtype=str) + else: + # Detect header row (may not be row 0) + text = resp.content.decode("utf-8-sig", errors="replace") + lines = text.split("\n") + header_idx = 0 + for i, line in enumerate(lines[:20]): + if "URN" in line or "urn" in line.lower(): + header_idx = i + break + df = pd.read_csv( + io.StringIO(text), + skiprows=header_idx, + dtype=str, + keep_default_na=False, + ) + + col_map = self._resolve_columns(list(df.columns)) + + for _, row in df.iterrows(): + 
record = {} + for field, col in col_map.items(): + record[field] = row.get(col, None) + + # Cast URN + try: + record["urn"] = int(record["urn"]) + except (ValueError, KeyError, TypeError): + continue + + yield record + + +class TapUKOfsted(Tap): + """Singer tap for UK Ofsted Management Information.""" + + name = "tap-uk-ofsted" + + config_jsonschema = th.PropertiesList( + th.Property("mi_url", th.StringType, description="Direct URL to Ofsted MI file"), + ).to_dict() + + def discover_streams(self): + return [OfstedInspectionsStream(self)] + + +if __name__ == "__main__": + TapUKOfsted.cli() diff --git a/pipeline/plugins/extractors/tap-uk-parent-view/pyproject.toml b/pipeline/plugins/extractors/tap-uk-parent-view/pyproject.toml new file mode 100644 index 0000000..4d45bfa --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-parent-view/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "tap-uk-parent-view" +version = "0.1.0" +description = "Singer tap for UK Ofsted Parent View survey data" +requires-python = ">=3.10" +dependencies = [ + "singer-sdk~=0.39", + "requests>=2.31", + "pandas>=2.0", + "openpyxl>=3.1", +] + +[project.scripts] +tap-uk-parent-view = "tap_uk_parent_view.tap:TapUKParentView.cli" diff --git a/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/__init__.py b/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/__init__.py new file mode 100644 index 0000000..02f719a --- /dev/null +++ b/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/__init__.py @@ -0,0 +1 @@ +"""tap-uk-parent-view: Singer tap for Ofsted Parent View survey data.""" diff --git a/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py b/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py new file mode 100644 index 0000000..b209bbc --- /dev/null +++ 
b/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py @@ -0,0 +1,49 @@ +"""Parent View Singer tap — extracts survey data from Ofsted Parent View portal.""" + +from __future__ import annotations + +from singer_sdk import Stream, Tap +from singer_sdk import typing as th + + +class ParentViewStream(Stream): + """Stream: Parent View survey responses per school.""" + + name = "parent_view" + primary_keys = ["urn"] + replication_key = None + + schema = th.PropertiesList( + th.Property("urn", th.IntegerType, required=True), + th.Property("survey_date", th.StringType), + th.Property("total_responses", th.IntegerType), + th.Property("q_happy_pct", th.NumberType), + th.Property("q_safe_pct", th.NumberType), + th.Property("q_progress_pct", th.NumberType), + th.Property("q_well_taught_pct", th.NumberType), + th.Property("q_well_led_pct", th.NumberType), + th.Property("q_behaviour_pct", th.NumberType), + th.Property("q_bullying_pct", th.NumberType), + th.Property("q_recommend_pct", th.NumberType), + ).to_dict() + + def get_records(self, context): + # TODO: Implement Parent View data extraction + # Source: Ofsted Parent View portal XLSX/CSV download + # URL discovery requires scraping parentview.ofsted.gov.uk + self.logger.warning("Parent View extraction not yet implemented") + return iter([]) + + +class TapUKParentView(Tap): + """Singer tap for UK Ofsted Parent View.""" + + name = "tap-uk-parent-view" + config_jsonschema = th.PropertiesList().to_dict() + + def discover_streams(self): + return [ParentViewStream(self)] + + +if __name__ == "__main__": + TapUKParentView.cli() diff --git a/pipeline/requirements.txt b/pipeline/requirements.txt new file mode 100644 index 0000000..5fe7ad4 --- /dev/null +++ b/pipeline/requirements.txt @@ -0,0 +1,10 @@ +# Pipeline dependencies +meltano==3.5.* +dbt-postgres~=1.8 +apache-airflow==2.10.* +apache-airflow-providers-postgres>=5.0 +typesense>=0.21 +requests>=2.31 +openpyxl>=3.1 +odfpy>=1.4 +psycopg2-binary>=2.9 diff --git 
a/pipeline/scripts/geocode_postcodes.py b/pipeline/scripts/geocode_postcodes.py
new file mode 100644
index 0000000..c5b1f99
--- /dev/null
+++ b/pipeline/scripts/geocode_postcodes.py
@@ -0,0 +1,118 @@
+"""
+Batch geocode postcodes via Postcodes.io and populate the dim_location
+PostGIS geometry (geom) column.
+
+Usage:
+    python geocode_postcodes.py [--batch-size 100]
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import time
+
+import psycopg2
+import psycopg2.extras
+import requests
+
+POSTCODES_IO_BULK = "https://api.postcodes.io/postcodes"
+BATCH_SIZE = 100  # Postcodes.io max per request
+
+
+def get_db_connection():
+    return psycopg2.connect(
+        host=os.environ.get("PG_HOST", "localhost"),
+        port=os.environ.get("PG_PORT", "5432"),
+        user=os.environ.get("PG_USER", "postgres"),
+        password=os.environ.get("PG_PASSWORD", "postgres"),
+        dbname=os.environ.get("PG_DATABASE", "school_compare"),
+    )
+
+
+def fetch_ungeocoded_postcodes(conn, limit: int = 5000) -> list[dict]:
+    """Get postcodes from dim_location rows whose geom is still NULL."""
+    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
+        cur.execute("""
+            SELECT urn, postcode
+            FROM marts.dim_location
+            WHERE geom IS NULL
+              AND postcode IS NOT NULL
+            LIMIT %s
+        """, (limit,))
+        return cur.fetchall()
+
+
+def bulk_geocode(postcodes: list[str]) -> dict[str, tuple[float, float]]:
+    """Geocode a batch of postcodes via Postcodes.io bulk API.
+
+    Returns a mapping of normalised postcode (uppercase, no spaces) to
+    (latitude, longitude); unresolvable postcodes are omitted.
+    """
+    resp = requests.post(
+        POSTCODES_IO_BULK,
+        json={"postcodes": postcodes},
+        timeout=30,
+    )
+    resp.raise_for_status()
+
+    results = {}
+    for item in resp.json().get("result", []):
+        if item["result"]:
+            pc = item["query"].upper().replace(" ", "")
+            results[pc] = (item["result"]["latitude"], item["result"]["longitude"])
+    return results
+
+
+def update_locations(conn, updates: list[tuple[float, float, int]]):
+    """Set dim_location.geom from geocoded (lat, lng, urn) tuples."""
+    with conn.cursor() as cur:
+        # ST_MakePoint expects (x, y) == (lng, lat), hence the reorder below.
+        psycopg2.extras.execute_batch(cur, """
+            UPDATE marts.dim_location
+            SET geom = ST_SetSRID(ST_MakePoint(%s, %s), 4326)
+            WHERE urn = %s
+        """, [(lng, lat, urn) for lat, lng, urn in updates])
+    conn.commit()
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Batch geocode school postcodes")
+    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE)
+    args = parser.parse_args()
+
+    conn = get_db_connection()
+
+    rows = fetch_ungeocoded_postcodes(conn)
+    if not rows:
+        print("All postcodes already geocoded.")
+        return
+
+    print(f"Geocoding {len(rows)} postcodes...")
+
+    total_updated = 0
+    for i in range(0, len(rows), args.batch_size):
+        batch = rows[i : i + args.batch_size]
+        postcodes = [r["postcode"] for r in batch if r["postcode"]]
+        # Several URNs can share a postcode (e.g. split-site schools), so map
+        # normalised postcode -> list of URNs.
+        urn_by_pc = {}
+        for r in batch:
+            if r["postcode"]:
+                pc_key = r["postcode"].upper().replace(" ", "")
+                urn_by_pc.setdefault(pc_key, []).append(r["urn"])
+
+        results = bulk_geocode(postcodes)
+
+        updates = []
+        for pc, (lat, lng) in results.items():
+            for urn in urn_by_pc.get(pc, []):
+                updates.append((lat, lng, urn))
+
+        if updates:
+            update_locations(conn, updates)
+            total_updated += len(updates)
+
+        print(f"  Batch {i // args.batch_size + 1}: geocoded {len(results)}/{len(postcodes)} postcodes")
+
+        # Rate limit: Postcodes.io is generous but be polite
+        time.sleep(0.2)
+
+    conn.close()
+    print(f"Done. Updated {total_updated} locations.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pipeline/scripts/sync_typesense.py b/pipeline/scripts/sync_typesense.py
new file mode 100644
index 0000000..3c11d75
--- /dev/null
+++ b/pipeline/scripts/sync_typesense.py
@@ -0,0 +1,177 @@
+"""
+Sync dbt marts → Typesense search index.
+
+Reads dim_school + dim_location + latest fact data from PostgreSQL marts,
+then upserts into a Typesense collection with zero-downtime alias swapping.
+ +Usage: + python sync_typesense.py [--typesense-url http://localhost:8108] [--api-key xyz] +""" + +from __future__ import annotations + +import argparse +import os +import sys +import time + +import psycopg2 +import psycopg2.extras +import typesense + +COLLECTION_SCHEMA = { + "fields": [ + {"name": "urn", "type": "int32"}, + {"name": "school_name", "type": "string"}, + {"name": "phase", "type": "string", "facet": True}, + {"name": "school_type", "type": "string", "facet": True}, + {"name": "local_authority", "type": "string", "facet": True}, + {"name": "religious_character", "type": "string", "facet": True, "optional": True}, + {"name": "ofsted_rating", "type": "string", "facet": True, "optional": True}, + {"name": "postcode", "type": "string"}, + {"name": "location", "type": "geopoint", "optional": True}, + {"name": "headteacher_name", "type": "string", "optional": True}, + {"name": "rwm_expected_pct", "type": "float", "optional": True}, + {"name": "progress_8_score", "type": "float", "optional": True}, + {"name": "total_pupils", "type": "int32", "optional": True}, + ], + "default_sorting_field": "school_name", +} + +OFSTED_LABELS = {1: "Outstanding", 2: "Good", 3: "Requires Improvement", 4: "Inadequate"} + +QUERY = """ + SELECT + s.urn, + s.school_name, + s.phase, + s.school_type, + l.local_authority_name as local_authority, + s.religious_character, + s.ofsted_grade, + l.postcode, + s.headteacher_name, + s.total_pupils, + -- Latest KS2 + ks2.rwm_expected_pct, + -- Latest KS4 + ks4.progress_8_score + FROM marts.dim_school s + LEFT JOIN marts.dim_location l ON s.urn = l.urn + LEFT JOIN LATERAL ( + SELECT rwm_expected_pct + FROM marts.fact_ks2_performance + WHERE urn = s.urn + ORDER BY year DESC + LIMIT 1 + ) ks2 ON true + LEFT JOIN LATERAL ( + SELECT progress_8_score + FROM marts.fact_ks4_performance + WHERE urn = s.urn + ORDER BY year DESC + LIMIT 1 + ) ks4 ON true +""" + + +def get_db_connection(): + return psycopg2.connect( + host=os.environ.get("PG_HOST", 
"localhost"), + port=os.environ.get("PG_PORT", "5432"), + user=os.environ.get("PG_USER", "postgres"), + password=os.environ.get("PG_PASSWORD", "postgres"), + dbname=os.environ.get("PG_DATABASE", "school_compare"), + ) + + +def build_document(row: dict) -> dict: + """Convert a DB row to a Typesense document.""" + doc = { + "id": str(row["urn"]), + "urn": row["urn"], + "school_name": row["school_name"] or "", + "phase": row["phase"] or "", + "school_type": row["school_type"] or "", + "local_authority": row["local_authority"] or "", + "postcode": row["postcode"] or "", + } + + if row.get("religious_character"): + doc["religious_character"] = row["religious_character"] + if row.get("ofsted_grade"): + doc["ofsted_rating"] = OFSTED_LABELS.get(row["ofsted_grade"], "") + if row.get("headteacher_name"): + doc["headteacher_name"] = row["headteacher_name"] + if row.get("total_pupils"): + doc["total_pupils"] = row["total_pupils"] + if row.get("rwm_expected_pct") is not None: + doc["rwm_expected_pct"] = float(row["rwm_expected_pct"]) + if row.get("progress_8_score") is not None: + doc["progress_8_score"] = float(row["progress_8_score"]) + + # Geo: location field expects [lat, lng] — will be populated once + # dim_location has lat/lng from PostGIS geocoding + + return doc + + +def sync(typesense_url: str, api_key: str): + client = typesense.Client({ + "nodes": [{"host": typesense_url.split("//")[-1].split(":")[0], + "port": typesense_url.split(":")[-1], + "protocol": "http"}], + "api_key": api_key, + "connection_timeout_seconds": 10, + }) + + # Create timestamped collection for zero-downtime swap + ts = int(time.time()) + collection_name = f"schools_{ts}" + + print(f"Creating collection: {collection_name}") + schema = {**COLLECTION_SCHEMA, "name": collection_name} + client.collections.create(schema) + + # Fetch data from marts + conn = get_db_connection() + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(QUERY) + rows = cur.fetchall() + 
conn.close()
+
+    print(f"Indexing {len(rows)} schools...")
+
+    # Batch import
+    batch_size = 500
+    for i in range(0, len(rows), batch_size):
+        batch = [build_document(r) for r in rows[i : i + batch_size]]
+        client.collections[collection_name].documents.import_(batch, {"action": "upsert"})
+        print(f"  Indexed {min(i + batch_size, len(rows))}/{len(rows)}")
+
+    # Point the stable "schools" alias at the new collection. Typesense
+    # aliases.upsert() is create-or-update, so no existence check is needed.
+    print("Swapping alias 'schools' → new collection")
+    client.aliases.upsert("schools", {"collection_name": collection_name})
+
+    # Drop superseded timestamped collections so repeated syncs don't leak
+    # storage (best effort — ignore failures, e.g. from a concurrent sync).
+    for coll in client.collections.retrieve():
+        old_name = coll.get("name", "")
+        if old_name.startswith("schools_") and old_name != collection_name:
+            try:
+                client.collections[old_name].delete()
+            except Exception:
+                pass
+
+    print("Done.")
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Sync marts to Typesense")
+    parser.add_argument("--typesense-url", default=os.environ.get("TYPESENSE_URL", "http://localhost:8108"))
+    parser.add_argument("--api-key", default=os.environ.get("TYPESENSE_API_KEY", ""))
+    args = parser.parse_args()
+
+    if not args.api_key:
+        print("Error: --api-key or TYPESENSE_API_KEY required", file=sys.stderr)
+        sys.exit(1)
+
+    sync(args.typesense_url, args.api_key)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pipeline/transform/dbt_project.yml b/pipeline/transform/dbt_project.yml
new file mode 100644
index 0000000..86dce6a
--- /dev/null
+++ b/pipeline/transform/dbt_project.yml
@@ -0,0 +1,28 @@
+name: school_compare
+version: "1.0.0"
+config-version: 2
+
+profile: school_compare
+
+model-paths: ["models"]
+macro-paths: ["macros"]
+test-paths: ["tests"]
+seed-paths: ["seeds"]
+target-path: "target"
+clean-targets: ["target", "dbt_packages"]
+
+models:
+  school_compare:
+    staging:
+      +materialized: view
+      +schema: staging
+    intermediate:
+      +materialized: view
+      +schema: intermediate
+    marts:
+      +materialized: table
+      +schema: marts
+
+seeds:
+  school_compare:
+    +schema: seeds
diff --git a/pipeline/transform/macros/chain_lineage.sql b/pipeline/transform/macros/chain_lineage.sql
new file mode 100644
index 
0000000..cd5be10 --- /dev/null +++ b/pipeline/transform/macros/chain_lineage.sql @@ -0,0 +1,36 @@ +-- Macro: Generate a CTE that unions current and predecessor data for a given source + +{% macro chain_lineage(source_ref, urn_col='urn', year_col='year') %} + +with current_data as ( + select + {{ urn_col }} as current_urn, + {{ urn_col }} as source_urn, + * + from {{ source_ref }} +), + +predecessor_data as ( + select + lin.current_urn, + src.{{ urn_col }} as source_urn, + src.* + from {{ source_ref }} src + inner join {{ ref('int_school_lineage') }} lin + on src.{{ urn_col }} = lin.predecessor_urn + where not exists ( + select 1 from {{ source_ref }} curr + where curr.{{ urn_col }} = lin.current_urn + and curr.{{ year_col }} = src.{{ year_col }} + ) +), + +combined as ( + select * from current_data + union all + select * from predecessor_data +) + +select * from combined + +{% endmacro %} diff --git a/pipeline/transform/macros/parse_ofsted_grade.sql b/pipeline/transform/macros/parse_ofsted_grade.sql new file mode 100644 index 0000000..0a6778c --- /dev/null +++ b/pipeline/transform/macros/parse_ofsted_grade.sql @@ -0,0 +1,13 @@ +-- Macro: Parse Ofsted grade from various text/numeric representations + +{% macro parse_ofsted_grade(column) %} + case + when {{ column }}::text in ('1', 'Outstanding') then 1 + when {{ column }}::text in ('2', 'Good') then 2 + when {{ column }}::text in ('3', 'Requires improvement', 'Requires Improvement', 'Satisfactory') then 3 + when {{ column }}::text in ('4', 'Inadequate') then 4 + when {{ column }}::text in ('9', 'SWK', 'Serious Weaknesses') then 4 + when {{ column }}::text in ('SM', 'Special Measures') then 4 + else null + end +{% endmacro %} diff --git a/pipeline/transform/macros/validate_uk_coordinates.sql b/pipeline/transform/macros/validate_uk_coordinates.sql new file mode 100644 index 0000000..52dc65a --- /dev/null +++ b/pipeline/transform/macros/validate_uk_coordinates.sql @@ -0,0 +1,8 @@ +-- Macro: Validate that 
latitude/longitude fall within UK bounding box + +{% macro validate_uk_coordinates(lat_col, lng_col) %} + ( + {{ lat_col }} between 49.0 and 61.0 + and {{ lng_col }} between -8.0 and 2.0 + ) +{% endmacro %} diff --git a/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql b/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql new file mode 100644 index 0000000..1ec704a --- /dev/null +++ b/pipeline/transform/models/intermediate/int_ks2_with_lineage.sql @@ -0,0 +1,62 @@ +-- Intermediate model: KS2 data chained across academy conversions +-- Maps predecessor URN data to the current active URN + +with current_ks2 as ( + select + urn as current_urn, + urn as source_urn, + year, + total_pupils, + rwm_expected_pct, + reading_expected_pct, + writing_expected_pct, + maths_expected_pct, + rwm_high_pct, + reading_high_pct, + writing_high_pct, + maths_high_pct, + reading_progress, + writing_progress, + maths_progress, + reading_avg_score, + maths_avg_score + from {{ ref('stg_ees_ks2') }} +), + +predecessor_ks2 as ( + select + lin.current_urn, + ks2.urn as source_urn, + ks2.year, + ks2.total_pupils, + ks2.rwm_expected_pct, + ks2.reading_expected_pct, + ks2.writing_expected_pct, + ks2.maths_expected_pct, + ks2.rwm_high_pct, + ks2.reading_high_pct, + ks2.writing_high_pct, + ks2.maths_high_pct, + ks2.reading_progress, + ks2.writing_progress, + ks2.maths_progress, + ks2.reading_avg_score, + ks2.maths_avg_score + from {{ ref('stg_ees_ks2') }} ks2 + inner join {{ ref('int_school_lineage') }} lin + on ks2.urn = lin.predecessor_urn + -- Only include predecessor data for years before the current URN has data + where not exists ( + select 1 from {{ ref('stg_ees_ks2') }} curr + where curr.urn = lin.current_urn + and curr.year = ks2.year + ) +), + +combined as ( + select * from current_ks2 + union all + select * from predecessor_ks2 +) + +select * from combined diff --git a/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql 
b/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql new file mode 100644 index 0000000..9008f82 --- /dev/null +++ b/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql @@ -0,0 +1,50 @@ +-- Intermediate model: KS4 data chained across academy conversions + +with current_ks4 as ( + select + urn as current_urn, + urn as source_urn, + year, + total_pupils, + progress_8_score, + attainment_8_score, + ebacc_entry_pct, + ebacc_achievement_pct, + english_strong_pass_pct, + maths_strong_pass_pct, + english_maths_strong_pass_pct, + staying_in_education_pct + from {{ ref('stg_ees_ks4') }} +), + +predecessor_ks4 as ( + select + lin.current_urn, + ks4.urn as source_urn, + ks4.year, + ks4.total_pupils, + ks4.progress_8_score, + ks4.attainment_8_score, + ks4.ebacc_entry_pct, + ks4.ebacc_achievement_pct, + ks4.english_strong_pass_pct, + ks4.maths_strong_pass_pct, + ks4.english_maths_strong_pass_pct, + ks4.staying_in_education_pct + from {{ ref('stg_ees_ks4') }} ks4 + inner join {{ ref('int_school_lineage') }} lin + on ks4.urn = lin.predecessor_urn + where not exists ( + select 1 from {{ ref('stg_ees_ks4') }} curr + where curr.urn = lin.current_urn + and curr.year = ks4.year + ) +), + +combined as ( + select * from current_ks4 + union all + select * from predecessor_ks4 +) + +select * from combined diff --git a/pipeline/transform/models/intermediate/int_ofsted_latest.sql b/pipeline/transform/models/intermediate/int_ofsted_latest.sql new file mode 100644 index 0000000..d0cb10a --- /dev/null +++ b/pipeline/transform/models/intermediate/int_ofsted_latest.sql @@ -0,0 +1,37 @@ +-- Intermediate model: Latest Ofsted inspection per URN +-- Picks the most recent inspection for each school + +with ranked as ( + select + *, + row_number() over ( + partition by urn + order by inspection_date desc + ) as rn + from {{ ref('stg_ofsted_inspections') }} +) + +select + urn, + inspection_date, + inspection_type, + framework, + overall_effectiveness, + quality_of_education, + 
behaviour_attitudes, + personal_development, + leadership_management, + early_years_provision, + sixth_form_provision, + rc_safeguarding_met, + rc_inclusion, + rc_curriculum_teaching, + rc_achievement, + rc_attendance_behaviour, + rc_personal_development, + rc_leadership_governance, + rc_early_years, + rc_sixth_form, + report_url +from ranked +where rn = 1 diff --git a/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql b/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql new file mode 100644 index 0000000..165f3a3 --- /dev/null +++ b/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql @@ -0,0 +1,18 @@ +-- Intermediate model: Merged pupil characteristics from census data + +select + urn, + year, + fsm_pct, + sen_support_pct, + sen_ehcp_pct, + eal_pct, + disadvantaged_pct, + ethnicity_white_pct, + ethnicity_asian_pct, + ethnicity_black_pct, + ethnicity_mixed_pct, + ethnicity_other_pct, + class_size_avg, + stability_pct +from {{ ref('stg_ees_census') }} diff --git a/pipeline/transform/models/intermediate/int_school_lineage.sql b/pipeline/transform/models/intermediate/int_school_lineage.sql new file mode 100644 index 0000000..b7f2c5d --- /dev/null +++ b/pipeline/transform/models/intermediate/int_school_lineage.sql @@ -0,0 +1,48 @@ +-- Intermediate model: Recursive predecessor mapping +-- Resolves academy conversion chains so historical data can be attributed +-- to the current (active) URN. + +with recursive lineage as ( + -- Base: schools that are predecessors (linked via academy conversion, amalgamation, etc.) 
+ select + urn, + linked_urn as predecessor_urn, + link_type, + link_date, + 1 as depth + from {{ ref('stg_gias_links') }} + where link_type in ( + 'Predecessor', + 'Predecessor - Loss of academy converter', + 'Predecessor - amalgamated', + 'Predecessor - Fresh Start' + ) + + union all + + -- Recursive step: follow the chain + select + l.urn, + links.linked_urn as predecessor_urn, + links.link_type, + links.link_date, + l.depth + 1 + from lineage l + inner join {{ ref('stg_gias_links') }} links + on l.predecessor_urn = links.urn + where links.link_type in ( + 'Predecessor', + 'Predecessor - Loss of academy converter', + 'Predecessor - amalgamated', + 'Predecessor - Fresh Start' + ) + and l.depth < 5 -- safety limit +) + +select + urn as current_urn, + predecessor_urn, + link_type, + link_date, + depth +from lineage diff --git a/pipeline/transform/models/marts/_marts_schema.yml b/pipeline/transform/models/marts/_marts_schema.yml new file mode 100644 index 0000000..6e6c86b --- /dev/null +++ b/pipeline/transform/models/marts/_marts_schema.yml @@ -0,0 +1,110 @@ +version: 2 + +models: + - name: dim_school + description: Canonical school dimension — one row per active URN + columns: + - name: urn + tests: [not_null, unique] + - name: school_name + tests: [not_null] + - name: phase + tests: [not_null] + - name: status + tests: + - accepted_values: + values: ["Open"] + + - name: dim_location + description: School location dimension with PostGIS geometry + columns: + - name: urn + tests: + - not_null + - unique + - relationships: + to: ref('dim_school') + field: urn + - name: postcode + tests: [not_null] + + - name: map_school_lineage + description: Predecessor/successor lineage map + columns: + - name: urn + tests: + - not_null + - relationships: + to: ref('dim_school') + field: urn + + - name: fact_ks2_performance + description: KS2 attainment — one row per URN per year + columns: + - name: urn + tests: [not_null] + - name: year + tests: [not_null] + tests: + - unique: + 
column_name: "urn || '-' || year" + + - name: fact_ks4_performance + description: KS4 attainment — one row per URN per year + columns: + - name: urn + tests: [not_null] + - name: year + tests: [not_null] + tests: + - unique: + column_name: "urn || '-' || year" + + - name: fact_ofsted_inspection + description: Full Ofsted inspection history + columns: + - name: urn + tests: [not_null] + - name: inspection_date + tests: [not_null] + + - name: fact_pupil_characteristics + description: Pupil demographics — one row per URN per year + columns: + - name: urn + tests: [not_null] + - name: year + tests: [not_null] + + - name: fact_admissions + description: School admissions — one row per URN per year + columns: + - name: urn + tests: [not_null] + - name: year + tests: [not_null] + + - name: fact_finance + description: School financial data — one row per URN per year + columns: + - name: urn + tests: [not_null] + - name: year + tests: [not_null] + + - name: fact_phonics + description: Phonics screening results — one row per URN per year + columns: + - name: urn + tests: [not_null] + - name: year + tests: [not_null] + + - name: fact_parent_view + description: Parent View survey responses + columns: + - name: urn + tests: [not_null] + + - name: fact_deprivation + description: IDACI deprivation index diff --git a/pipeline/transform/models/marts/dim_location.sql b/pipeline/transform/models/marts/dim_location.sql new file mode 100644 index 0000000..fedc066 --- /dev/null +++ b/pipeline/transform/models/marts/dim_location.sql @@ -0,0 +1,19 @@ +-- Mart: School location dimension — one row per URN, PostGIS-enabled +-- The geom column is populated by a post-hook or the geocode script. 
+ +select + s.urn, + s.address_line1, + s.address_line2, + s.town, + s.county, + s.postcode, + s.local_authority_code, + s.local_authority_name, + s.parliamentary_constituency, + s.urban_rural, + s.easting, + s.northing +from {{ ref('stg_gias_establishments') }} s +where s.status = 'Open' + and s.postcode is not null diff --git a/pipeline/transform/models/marts/dim_school.sql b/pipeline/transform/models/marts/dim_school.sql new file mode 100644 index 0000000..420dec2 --- /dev/null +++ b/pipeline/transform/models/marts/dim_school.sql @@ -0,0 +1,40 @@ +-- Mart: Canonical school dimension — one row per active URN + +with schools as ( + select * from {{ ref('stg_gias_establishments') }} +), + +latest_ofsted as ( + select * from {{ ref('int_ofsted_latest') }} +) + +select + s.urn, + s.local_authority_code * 1000 + s.establishment_number as laestab, + s.school_name, + s.phase, + s.school_type, + s.academy_trust_name, + s.academy_trust_uid, + s.religious_character, + s.gender, + s.statutory_low_age || '-' || s.statutory_high_age as age_range, + s.capacity, + s.total_pupils, + concat_ws(' ', s.head_title, s.head_first_name, s.head_last_name) as headteacher_name, + s.website, + s.telephone, + s.open_date, + s.close_date, + s.status, + s.nursery_provision, + s.admissions_policy, + + -- Latest Ofsted + o.overall_effectiveness as ofsted_grade, + o.inspection_date as ofsted_date, + o.framework as ofsted_framework + +from schools s +left join latest_ofsted o on s.urn = o.urn +where s.status = 'Open' diff --git a/pipeline/transform/models/marts/fact_admissions.sql b/pipeline/transform/models/marts/fact_admissions.sql new file mode 100644 index 0000000..305db45 --- /dev/null +++ b/pipeline/transform/models/marts/fact_admissions.sql @@ -0,0 +1,10 @@ +-- Mart: School admissions — one row per URN per year + +select + urn, + year, + published_admission_number, + total_applications, + first_preference_offers_pct, + oversubscribed +from {{ ref('stg_ees_admissions') }} diff --git 
a/pipeline/transform/models/marts/fact_deprivation.sql b/pipeline/transform/models/marts/fact_deprivation.sql new file mode 100644 index 0000000..a8b323e --- /dev/null +++ b/pipeline/transform/models/marts/fact_deprivation.sql @@ -0,0 +1,22 @@ +-- Mart: Deprivation index — one row per URN +-- Joins school postcode → LSOA → IDACI score + +with school_postcodes as ( + select + urn, + postcode + from {{ ref('stg_gias_establishments') }} + where status = 'Open' + and postcode is not null +) + +-- Note: The join between postcode and LSOA requires a postcode-to-LSOA +-- lookup table. This will be populated by the geocode script or a seed. +-- For now, this model serves as a placeholder that will be completed +-- once the IDACI tap provides the postcode→LSOA mapping. + +select + i.lsoa_code, + i.idaci_score, + i.idaci_decile +from {{ ref('stg_idaci') }} i diff --git a/pipeline/transform/models/marts/fact_finance.sql b/pipeline/transform/models/marts/fact_finance.sql new file mode 100644 index 0000000..1de1505 --- /dev/null +++ b/pipeline/transform/models/marts/fact_finance.sql @@ -0,0 +1,11 @@ +-- Mart: School financial data — one row per URN per year + +select + urn, + year, + per_pupil_spend, + staff_cost_pct, + teacher_cost_pct, + support_staff_cost_pct, + premises_cost_pct +from {{ ref('stg_fbit_finance') }} diff --git a/pipeline/transform/models/marts/fact_ks2_performance.sql b/pipeline/transform/models/marts/fact_ks2_performance.sql new file mode 100644 index 0000000..cc9f3b0 --- /dev/null +++ b/pipeline/transform/models/marts/fact_ks2_performance.sql @@ -0,0 +1,22 @@ +-- Mart: KS2 performance fact table — one row per URN per year +-- Includes predecessor data via lineage resolution + +select + current_urn as urn, + source_urn, + year, + total_pupils, + rwm_expected_pct, + reading_expected_pct, + writing_expected_pct, + maths_expected_pct, + rwm_high_pct, + reading_high_pct, + writing_high_pct, + maths_high_pct, + reading_progress, + writing_progress, + 
maths_progress, + reading_avg_score, + maths_avg_score +from {{ ref('int_ks2_with_lineage') }} diff --git a/pipeline/transform/models/marts/fact_ks4_performance.sql b/pipeline/transform/models/marts/fact_ks4_performance.sql new file mode 100644 index 0000000..accae46 --- /dev/null +++ b/pipeline/transform/models/marts/fact_ks4_performance.sql @@ -0,0 +1,16 @@ +-- Mart: KS4 performance fact table — one row per URN per year + +select + current_urn as urn, + source_urn, + year, + total_pupils, + progress_8_score, + attainment_8_score, + ebacc_entry_pct, + ebacc_achievement_pct, + english_strong_pass_pct, + maths_strong_pass_pct, + english_maths_strong_pass_pct, + staying_in_education_pct +from {{ ref('int_ks4_with_lineage') }} diff --git a/pipeline/transform/models/marts/fact_ofsted_inspection.sql b/pipeline/transform/models/marts/fact_ofsted_inspection.sql new file mode 100644 index 0000000..57d1a94 --- /dev/null +++ b/pipeline/transform/models/marts/fact_ofsted_inspection.sql @@ -0,0 +1,25 @@ +-- Mart: Full Ofsted inspection history — one row per inspection + +select + urn, + inspection_date, + inspection_type, + framework, + overall_effectiveness, + quality_of_education, + behaviour_attitudes, + personal_development, + leadership_management, + early_years_provision, + sixth_form_provision, + rc_safeguarding_met, + rc_inclusion, + rc_curriculum_teaching, + rc_achievement, + rc_attendance_behaviour, + rc_personal_development, + rc_leadership_governance, + rc_early_years, + rc_sixth_form, + report_url +from {{ ref('stg_ofsted_inspections') }} diff --git a/pipeline/transform/models/marts/fact_parent_view.sql b/pipeline/transform/models/marts/fact_parent_view.sql new file mode 100644 index 0000000..10d984a --- /dev/null +++ b/pipeline/transform/models/marts/fact_parent_view.sql @@ -0,0 +1,15 @@ +-- Mart: Parent View survey responses — one row per URN (latest survey) + +select + urn, + survey_date, + total_responses, + q_happy_pct, + q_safe_pct, + q_progress_pct, + 
q_well_taught_pct, + q_well_led_pct, + q_behaviour_pct, + q_bullying_pct, + q_recommend_pct +from {{ ref('stg_parent_view') }} diff --git a/pipeline/transform/models/marts/fact_phonics.sql b/pipeline/transform/models/marts/fact_phonics.sql new file mode 100644 index 0000000..71a6a2d --- /dev/null +++ b/pipeline/transform/models/marts/fact_phonics.sql @@ -0,0 +1,8 @@ +-- Mart: Phonics screening results — one row per URN per year + +select + urn, + year, + year1_phonics_pct, + year2_phonics_pct +from {{ ref('stg_ees_phonics') }} diff --git a/pipeline/transform/models/marts/fact_pupil_characteristics.sql b/pipeline/transform/models/marts/fact_pupil_characteristics.sql new file mode 100644 index 0000000..c12936b --- /dev/null +++ b/pipeline/transform/models/marts/fact_pupil_characteristics.sql @@ -0,0 +1,18 @@ +-- Mart: Pupil characteristics — one row per URN per year + +select + urn, + year, + fsm_pct, + sen_support_pct, + sen_ehcp_pct, + eal_pct, + disadvantaged_pct, + ethnicity_white_pct, + ethnicity_asian_pct, + ethnicity_black_pct, + ethnicity_mixed_pct, + ethnicity_other_pct, + class_size_avg, + stability_pct +from {{ ref('int_pupil_chars_merged') }} diff --git a/pipeline/transform/models/marts/map_school_lineage.sql b/pipeline/transform/models/marts/map_school_lineage.sql new file mode 100644 index 0000000..bd33352 --- /dev/null +++ b/pipeline/transform/models/marts/map_school_lineage.sql @@ -0,0 +1,9 @@ +-- Mart: School predecessor/successor lineage map + +select + current_urn as urn, + predecessor_urn, + link_type, + link_date, + depth +from {{ ref('int_school_lineage') }} diff --git a/pipeline/transform/models/staging/_stg_sources.yml b/pipeline/transform/models/staging/_stg_sources.yml new file mode 100644 index 0000000..bd45419 --- /dev/null +++ b/pipeline/transform/models/staging/_stg_sources.yml @@ -0,0 +1,69 @@ +version: 2 + +sources: + - name: raw + description: Raw data loaded by Meltano Singer taps into the raw schema + schema: raw + tables: + - name: 
gias_establishments + description: GIAS bulk establishment data (one row per URN) + columns: + - name: urn + tests: [not_null, unique] + + - name: gias_links + description: GIAS predecessor/successor links between schools + columns: + - name: urn + tests: [not_null] + + - name: ofsted_inspections + description: Ofsted Management Information inspection records + columns: + - name: urn + tests: [not_null] + + - name: ees_ks2 + description: KS2 attainment data from Explore Education Statistics + columns: + - name: urn + tests: [not_null] + + - name: ees_ks4 + description: KS4 attainment data from Explore Education Statistics + columns: + - name: urn + tests: [not_null] + + - name: ees_census + description: School census pupil characteristics + columns: + - name: urn + tests: [not_null] + + - name: ees_admissions + description: Primary and secondary school admissions data + columns: + - name: urn + tests: [not_null] + + - name: ees_phonics + description: Phonics screening check results + columns: + - name: urn + tests: [not_null] + + - name: parent_view + description: Ofsted Parent View survey responses + columns: + - name: urn + tests: [not_null] + + - name: fbit_finance + description: Financial benchmarking data from FBIT API + columns: + - name: urn + tests: [not_null] + + - name: idaci + description: Income Deprivation Affecting Children Index lookups diff --git a/pipeline/transform/models/staging/stg_ees_admissions.sql b/pipeline/transform/models/staging/stg_ees_admissions.sql new file mode 100644 index 0000000..4c8c411 --- /dev/null +++ b/pipeline/transform/models/staging/stg_ees_admissions.sql @@ -0,0 +1,19 @@ +-- Staging model: Primary and secondary school admissions from EES + +with source as ( + select * from {{ source('raw', 'ees_admissions') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(time_period as integer) as year, + cast(published_admission_number as integer) as published_admission_number, + cast(total_applications as integer) 
as total_applications, + cast(first_preference_offers_pct as numeric) as first_preference_offers_pct, + cast(oversubscribed as boolean) as oversubscribed + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_ees_census.sql b/pipeline/transform/models/staging/stg_ees_census.sql new file mode 100644 index 0000000..fe38c51 --- /dev/null +++ b/pipeline/transform/models/staging/stg_ees_census.sql @@ -0,0 +1,27 @@ +-- Staging model: School census pupil characteristics from EES + +with source as ( + select * from {{ source('raw', 'ees_census') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(time_period as integer) as year, + cast(fsm_pct as numeric) as fsm_pct, + cast(sen_support_pct as numeric) as sen_support_pct, + cast(sen_ehcp_pct as numeric) as sen_ehcp_pct, + cast(eal_pct as numeric) as eal_pct, + cast(disadvantaged_pct as numeric) as disadvantaged_pct, + cast(ethnicity_white_pct as numeric) as ethnicity_white_pct, + cast(ethnicity_asian_pct as numeric) as ethnicity_asian_pct, + cast(ethnicity_black_pct as numeric) as ethnicity_black_pct, + cast(ethnicity_mixed_pct as numeric) as ethnicity_mixed_pct, + cast(ethnicity_other_pct as numeric) as ethnicity_other_pct, + cast(class_size_avg as numeric) as class_size_avg, + cast(stability_pct as numeric) as stability_pct + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_ees_ks2.sql b/pipeline/transform/models/staging/stg_ees_ks2.sql new file mode 100644 index 0000000..61f208d --- /dev/null +++ b/pipeline/transform/models/staging/stg_ees_ks2.sql @@ -0,0 +1,31 @@ +-- Staging model: KS2 attainment data from EES +-- Column names depend on the EES dataset schema; these will be finalised +-- once the tap-uk-ees extractor resolves the actual column names. 
+ +with source as ( + select * from {{ source('raw', 'ees_ks2') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(time_period as integer) as year, + cast(t_pupils as integer) as total_pupils, + cast(pt_rwm_met_expected_standard as numeric) as rwm_expected_pct, + cast(pt_read_met_expected_standard as numeric) as reading_expected_pct, + cast(pt_write_met_expected_standard as numeric) as writing_expected_pct, + cast(pt_maths_met_expected_standard as numeric) as maths_expected_pct, + cast(pt_rwm_met_higher_standard as numeric) as rwm_high_pct, + cast(pt_read_met_higher_standard as numeric) as reading_high_pct, + cast(pt_write_met_higher_standard as numeric) as writing_high_pct, + cast(pt_maths_met_higher_standard as numeric) as maths_high_pct, + cast(read_progress as numeric) as reading_progress, + cast(write_progress as numeric) as writing_progress, + cast(maths_progress as numeric) as maths_progress, + cast(read_average_score as numeric) as reading_avg_score, + cast(maths_average_score as numeric) as maths_avg_score + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_ees_ks4.sql b/pipeline/transform/models/staging/stg_ees_ks4.sql new file mode 100644 index 0000000..9e805c1 --- /dev/null +++ b/pipeline/transform/models/staging/stg_ees_ks4.sql @@ -0,0 +1,24 @@ +-- Staging model: KS4 attainment data from EES (secondary schools — NEW) + +with source as ( + select * from {{ source('raw', 'ees_ks4') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(time_period as integer) as year, + cast(t_pupils as integer) as total_pupils, + cast(progress_8_score as numeric) as progress_8_score, + cast(attainment_8_score as numeric) as attainment_8_score, + cast(ebacc_entry_pct as numeric) as ebacc_entry_pct, + cast(ebacc_achievement_pct as numeric) as ebacc_achievement_pct, + cast(english_strong_pass_pct as numeric) as english_strong_pass_pct, + cast(maths_strong_pass_pct as numeric) 
as maths_strong_pass_pct, + cast(english_maths_strong_pass_pct as numeric) as english_maths_strong_pass_pct, + cast(staying_in_education_pct as numeric) as staying_in_education_pct + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_ees_phonics.sql b/pipeline/transform/models/staging/stg_ees_phonics.sql new file mode 100644 index 0000000..f5629df --- /dev/null +++ b/pipeline/transform/models/staging/stg_ees_phonics.sql @@ -0,0 +1,17 @@ +-- Staging model: Phonics screening check results from EES + +with source as ( + select * from {{ source('raw', 'ees_phonics') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(time_period as integer) as year, + cast(year1_phonics_pct as numeric) as year1_phonics_pct, + cast(year2_phonics_pct as numeric) as year2_phonics_pct + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_fbit_finance.sql b/pipeline/transform/models/staging/stg_fbit_finance.sql new file mode 100644 index 0000000..cf2833a --- /dev/null +++ b/pipeline/transform/models/staging/stg_fbit_finance.sql @@ -0,0 +1,20 @@ +-- Staging model: Financial benchmarking data from FBIT API + +with source as ( + select * from {{ source('raw', 'fbit_finance') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(year as integer) as year, + cast(per_pupil_spend as numeric) as per_pupil_spend, + cast(staff_cost_pct as numeric) as staff_cost_pct, + cast(teacher_cost_pct as numeric) as teacher_cost_pct, + cast(support_staff_cost_pct as numeric) as support_staff_cost_pct, + cast(premises_cost_pct as numeric) as premises_cost_pct + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_gias_establishments.sql b/pipeline/transform/models/staging/stg_gias_establishments.sql new file mode 100644 index 0000000..43ff77c --- /dev/null +++ 
b/pipeline/transform/models/staging/stg_gias_establishments.sql
@@ -0,0 +1,49 @@
+-- Staging model: GIAS establishments
+-- Light cleaning, type casting, column renaming from raw GIAS bulk CSV
+
+with source as (
+    select * from {{ source('raw', 'gias_establishments') }}
+),
+
+renamed as (
+    select
+        cast("URN" as integer) as urn,
+        cast("LA (code)" as integer) as local_authority_code,
+        "LA (name)" as local_authority_name,
+        cast("EstablishmentNumber" as integer) as establishment_number,
+        "EstablishmentName" as school_name,
+        "TypeOfEstablishment (name)" as school_type,
+        "PhaseOfEducation (name)" as phase,
+        "Gender (name)" as gender,
+        "ReligiousCharacter (name)" as religious_character,
+        "AdmissionsPolicy (name)" as admissions_policy,
+        cast("SchoolCapacity" as integer) as capacity,  -- fixed: was the only uncast numeric; dim_school exposes this alongside total_pupils
+        cast("NumberOfPupils" as integer) as total_pupils,
+        "HeadTitle (name)" as head_title,
+        "HeadFirstName" as head_first_name,
+        "HeadLastName" as head_last_name,
+        "TelephoneNum" as telephone,
+        "SchoolWebsite" as website,
+        "Street" as address_line1,
+        "Locality" as address_line2,
+        "Town" as town,
+        "County (name)" as county,
+        "Postcode" as postcode,
+        "EstablishmentStatus (name)" as status,
+        cast("OpenDate" as date) as open_date,  -- NOTE(review): GIAS CSV dates are typically dd-mm-yyyy; confirm the loader normalises before this cast
+        cast("CloseDate" as date) as close_date,
+        "Trusts (name)" as academy_trust_name,
+        cast("Trusts (code)" as integer) as academy_trust_uid,
+        "UrbanRural (name)" as urban_rural,
+        "ParliamentaryConstituency (name)" as parliamentary_constituency,
+        "NurseryProvision (name)" as nursery_provision,
+        cast("Easting" as integer) as easting,
+        cast("Northing" as integer) as northing,
+        -- Age range
+        cast("StatutoryLowAge" as integer) as statutory_low_age,
+        cast("StatutoryHighAge" as integer) as statutory_high_age
+    from source
+    where "URN" is not null
+)
+
+select * from renamed
diff --git a/pipeline/transform/models/staging/stg_gias_links.sql b/pipeline/transform/models/staging/stg_gias_links.sql
new file mode 100644
index 0000000..f234d27
--- /dev/null
+++ 
b/pipeline/transform/models/staging/stg_gias_links.sql @@ -0,0 +1,18 @@ +-- Staging model: GIAS school links (predecessor/successor chains) + +with source as ( + select * from {{ source('raw', 'gias_links') }} +), + +renamed as ( + select + cast("URN" as integer) as urn, + cast("LinkURN" as integer) as linked_urn, + "LinkType" as link_type, + cast("LinkEstablishedDate" as date) as link_date + from source + where "URN" is not null + and "LinkURN" is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_idaci.sql b/pipeline/transform/models/staging/stg_idaci.sql new file mode 100644 index 0000000..cc36418 --- /dev/null +++ b/pipeline/transform/models/staging/stg_idaci.sql @@ -0,0 +1,15 @@ +-- Staging model: Income Deprivation Affecting Children Index + +with source as ( + select * from {{ source('raw', 'idaci') }} +), + +renamed as ( + select + lsoa_code, + cast(idaci_score as numeric) as idaci_score, + cast(idaci_decile as integer) as idaci_decile + from source +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_ofsted_inspections.sql b/pipeline/transform/models/staging/stg_ofsted_inspections.sql new file mode 100644 index 0000000..70d10b3 --- /dev/null +++ b/pipeline/transform/models/staging/stg_ofsted_inspections.sql @@ -0,0 +1,40 @@ +-- Staging model: Ofsted inspection records +-- Handles both OEIF (pre-Nov 2025) and Report Card (post-Nov 2025) frameworks + +with source as ( + select * from {{ source('raw', 'ofsted_inspections') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(inspection_date as date) as inspection_date, + inspection_type, + event_type_grouping as framework, + + -- OEIF grades (1-4 scale) + cast(overall_effectiveness as integer) as overall_effectiveness, + cast(quality_of_education as integer) as quality_of_education, + cast(behaviour_and_attitudes as integer) as behaviour_attitudes, + cast(personal_development as integer) as personal_development, + 
cast(effectiveness_of_leadership_and_management as integer) as leadership_management, + cast(early_years_provision as integer) as early_years_provision, + cast(sixth_form_provision as integer) as sixth_form_provision, + + -- Report Card fields (populated for post-Nov 2025 inspections) + rc_safeguarding_met, + rc_inclusion, + rc_curriculum_teaching, + rc_achievement, + rc_attendance_behaviour, + rc_personal_development, + rc_leadership_governance, + rc_early_years, + rc_sixth_form, + + report_url + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/models/staging/stg_parent_view.sql b/pipeline/transform/models/staging/stg_parent_view.sql new file mode 100644 index 0000000..c51e124 --- /dev/null +++ b/pipeline/transform/models/staging/stg_parent_view.sql @@ -0,0 +1,24 @@ +-- Staging model: Ofsted Parent View survey responses + +with source as ( + select * from {{ source('raw', 'parent_view') }} +), + +renamed as ( + select + cast(urn as integer) as urn, + cast(survey_date as date) as survey_date, + cast(total_responses as integer) as total_responses, + cast(q_happy_pct as numeric) as q_happy_pct, + cast(q_safe_pct as numeric) as q_safe_pct, + cast(q_progress_pct as numeric) as q_progress_pct, + cast(q_well_taught_pct as numeric) as q_well_taught_pct, + cast(q_well_led_pct as numeric) as q_well_led_pct, + cast(q_behaviour_pct as numeric) as q_behaviour_pct, + cast(q_bullying_pct as numeric) as q_bullying_pct, + cast(q_recommend_pct as numeric) as q_recommend_pct + from source + where urn is not null +) + +select * from renamed diff --git a/pipeline/transform/profiles.yml b/pipeline/transform/profiles.yml new file mode 100644 index 0000000..6c573fc --- /dev/null +++ b/pipeline/transform/profiles.yml @@ -0,0 +1,22 @@ +school_compare: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('PG_HOST', 'localhost') }}" + port: "{{ env_var('PG_PORT', '5432') | int }}" + user: "{{ env_var('PG_USER', 'postgres') }}" + 
password: "{{ env_var('PG_PASSWORD', 'postgres') }}"
+      dbname: "{{ env_var('PG_DATABASE', 'school_compare') }}"
+      schema: public
+      threads: 4
+
+    production:
+      type: postgres
+      host: "{{ env_var('PG_HOST') }}"
+      port: "{{ env_var('PG_PORT') | as_number }}"  # as_number triggers dbt native rendering -> real int; `| int` still renders a string
+      user: "{{ env_var('PG_USER') }}"
+      password: "{{ env_var('PG_PASSWORD') }}"
+      dbname: "{{ env_var('PG_DATABASE') }}"
+      schema: public
+      threads: 4
diff --git a/pipeline/transform/seeds/la_code_names.csv b/pipeline/transform/seeds/la_code_names.csv
new file mode 100644
index 0000000..698a0cb
--- /dev/null
+++ b/pipeline/transform/seeds/la_code_names.csv
@@ -0,0 +1,65 @@
+la_code,la_name
+201,City of London
+202,Camden
+203,Greenwich
+204,Hackney
+205,Hammersmith and Fulham
+206,Islington
+207,Kensington and Chelsea
+208,Lambeth
+209,Lewisham
+210,Southwark
+211,Tower Hamlets
+212,Wandsworth
+213,Westminster
+301,Barking and Dagenham
+302,Barnet
+303,Bexley
+304,Brent
+305,Bromley
+306,Croydon
+307,Ealing
+308,Enfield
+309,Haringey
+310,Harrow
+311,Havering
+312,Hillingdon
+313,Hounslow
+314,Kingston upon Thames
+315,Merton
+316,Newham
+317,Redbridge
+318,Richmond upon Thames
+319,Sutton
+320,Waltham Forest
+330,Birmingham
+331,Coventry
+332,Dudley
+333,Sandwell
+334,Solihull
+335,Walsall
+336,Wolverhampton
+340,Knowsley
+341,Liverpool
+342,St Helens
+343,Sefton
+344,Wirral
+350,Bolton
+351,Bury
+352,Manchester
+353,Oldham
+354,Rochdale
+355,Salford
+356,Stockport
+357,Tameside
+358,Trafford
+359,Wigan
+370,Barnsley
+371,Doncaster
+372,Rotherham
+373,Sheffield
+380,Bradford
+381,Calderdale
+382,Kirklees
+383,Leeds
+384,Wakefield
diff --git a/pipeline/transform/seeds/school_type_codes.csv b/pipeline/transform/seeds/school_type_codes.csv
new file mode 100644
index 0000000..8a12a49
--- /dev/null
+++ b/pipeline/transform/seeds/school_type_codes.csv
@@ -0,0 +1,30 @@
+type_code,type_name,type_group
+1,Community school,Maintained
+2,Voluntary aided school,Maintained
+3,Voluntary controlled 
school,Maintained +5,Foundation school,Maintained +6,City technology college,Independent +7,Community special school,Special +8,Non-maintained special school,Special +10,Other independent school,Independent +11,Other independent special school,Independent +12,Foundation special school,Special +14,Pupil referral unit,PRU +24,Secure unit,Other +25,Offshore school,Other +26,Service children's education,Other +28,Academy sponsor led,Academy +33,Academy special sponsor led,Academy +34,Academy converter,Academy +35,Free schools,Academy +36,Free schools special,Academy +37,British schools overseas,Other +38,Free schools - alternative provision,Academy +39,Free schools - 16-19,Academy +40,University technical college,Academy +41,Studio school,Academy +42,Academy alternative provision converter,Academy +43,Academy alternative provision sponsor led,Academy +44,Academy special converter,Academy +46,Academy 16-19 converter,Academy +47,Academy 16-19 sponsor led,Academy diff --git a/pipeline/transform/tests/assert_no_orphaned_facts.sql b/pipeline/transform/tests/assert_no_orphaned_facts.sql new file mode 100644 index 0000000..ff81c37 --- /dev/null +++ b/pipeline/transform/tests/assert_no_orphaned_facts.sql @@ -0,0 +1,7 @@ +-- Custom test: All fact table URNs should exist in dim_school + +select f.urn +from {{ ref('fact_ks2_performance') }} f +left join {{ ref('dim_school') }} d on f.urn = d.urn +where d.urn is null +limit 10 diff --git a/pipeline/transform/tests/assert_urns_in_uk_bounds.sql b/pipeline/transform/tests/assert_urns_in_uk_bounds.sql new file mode 100644 index 0000000..ad34b13 --- /dev/null +++ b/pipeline/transform/tests/assert_urns_in_uk_bounds.sql @@ -0,0 +1,13 @@ +-- Custom test: All geocoded schools should have coordinates within the UK + +select + urn, + easting, + northing +from {{ ref('dim_location') }} +where easting is not null + and northing is not null + and ( + easting < 0 or easting > 700000 + or northing < 0 or northing > 1300000 + )