diff --git a/docker-compose.portainer.yml b/docker-compose.portainer.yml index 5dabfa6..47401cd 100644 --- a/docker-compose.portainer.yml +++ b/docker-compose.portainer.yml @@ -7,9 +7,7 @@ # ADMIN_API_KEY — Backend admin API key # TYPESENSE_API_KEY — Typesense admin API key # TYPESENSE_SEARCH_KEY — Typesense search-only key (exposed to frontend) -# AIRFLOW_ADMIN_USER — Airflow admin username (password auto-generated, see api-server logs) -# KESTRA_USER — Kestra UI username (optional) -# KESTRA_PASSWORD — Kestra UI password (optional) +# AIRFLOW_ADMIN_USER — Airflow admin username (password auto-generated, see api-server logs) services: @@ -103,87 +101,6 @@ services: retries: 5 start_period: 10s - # ── Kestra — workflow orchestrator (legacy, kept during migration) ──── - kestra: - image: kestra/kestra:latest - container_name: schoolcompare_kestra - command: server standalone - ports: - - "8090:8080" - volumes: - - kestra_storage:/app/storage - environment: - KESTRA_CONFIGURATION: | - datasources: - postgres: - url: jdbc:postgresql://sc_database:5432/kestra - driverClassName: org.postgresql.Driver - username: ${DB_USERNAME} - password: ${DB_PASSWORD} - kestra: - repository: - type: postgres - queue: - type: postgres - storage: - type: local - local: - base-path: /app/storage - depends_on: - sc_database: - condition: service_healthy - networks: - - backend - restart: unless-stopped - healthcheck: - test: ["CMD-SHELL", "curl -sf http://localhost:8081/health | grep -q '\"status\":\"UP\"'"] - interval: 15s - timeout: 10s - retries: 10 - start_period: 60s - - # ── Kestra init (legacy, kept during migration) ────────────────────── - kestra-init: - image: privaterepo.sitaru.org/tudor/school_compare-kestra-init:latest - container_name: schoolcompare_kestra_init - environment: - KESTRA_URL: http://kestra:8080 - KESTRA_USER: ${KESTRA_USER:-} - KESTRA_PASSWORD: ${KESTRA_PASSWORD:-} - depends_on: - kestra: - condition: service_healthy - networks: - - backend - restart: "no" - - # ── Data integrator (legacy, kept during migration) ────────────────── - integrator: - image: privaterepo.sitaru.org/tudor/school_compare-integrator:latest - container_name: schoolcompare_integrator - ports: - - "8001:8001" - environment: - DATABASE_URL: postgresql://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME} - DATA_DIR: /data - BACKEND_URL: http://backend:80 - ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme} - PYTHONUNBUFFERED: 1 - volumes: - - supplementary_data:/data - depends_on: - sc_database: - condition: service_healthy - networks: - - backend - restart: unless-stopped - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8001/health"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 15s - # ── Airflow API Server + UI ─────────────────────────────────────────── airflow-api-server: image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest @@ -282,7 +199,5 @@ networks: volumes: postgres_data: - kestra_storage: - supplementary_data: typesense_data: airflow_logs: diff --git a/integrator/Dockerfile b/integrator/Dockerfile deleted file mode 100644 index 99f721e..0000000 --- a/integrator/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM python:3.12-slim - -WORKDIR /app - -# Install dependencies -COPY requirements.txt . -RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/* - -RUN pip install --no-cache-dir -r requirements.txt - -# Copy application code -COPY scripts/ ./scripts/ -COPY server.py . - -CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8001"] diff --git a/integrator/Dockerfile.init b/integrator/Dockerfile.init deleted file mode 100644 index ad248bf..0000000 --- a/integrator/Dockerfile.init +++ /dev/null @@ -1,6 +0,0 @@ -FROM alpine:3.19 -RUN apk add --no-cache curl -COPY flows/ /flows/ -COPY docker/kestra-init.sh /kestra-init.sh -RUN chmod +x /kestra-init.sh -CMD ["/kestra-init.sh"] diff --git a/integrator/docker/kestra-init.sh b/integrator/docker/kestra-init.sh deleted file mode 100644 index 91885c8..0000000 --- a/integrator/docker/kestra-init.sh +++ /dev/null @@ -1,59 +0,0 @@ -#!/bin/sh -set -e - -KESTRA_URL="${KESTRA_URL:-http://kestra:8080}" -MAX_WAIT=120 - -# Basic auth — set KESTRA_USER / KESTRA_PASSWORD if authentication is enabled -AUTH="" -if [ -n "$KESTRA_USER" ] && [ -n "$KESTRA_PASSWORD" ]; then - AUTH="-u ${KESTRA_USER}:${KESTRA_PASSWORD}" -fi - -echo "Waiting for Kestra API at ${KESTRA_URL}..." -elapsed=0 -until curl -sf $AUTH "${KESTRA_URL}/api/v1/flows/search" > /dev/null 2>&1; do - if [ "$elapsed" -ge "$MAX_WAIT" ]; then - echo "ERROR: Kestra API not reachable after ${MAX_WAIT}s" - exit 1 - fi - sleep 5 - elapsed=$((elapsed + 5)) -done -echo "Kestra API is ready." - -echo "Importing flows..." - -for f in /flows/*.yml; do - name="$(basename "$f")" - echo " -> $name" - - http_code=$(curl -s $AUTH -o /tmp/kestra_resp -w "%{http_code}" \ - -X POST "${KESTRA_URL}/api/v1/flows" \ - -H "Content-Type: application/x-yaml" \ - --data-binary "@${f}") - - if [ "$http_code" = "200" ] || [ "$http_code" = "201" ]; then - echo " created" - elif [ "$http_code" = "409" ]; then - ns=$(grep '^namespace:' "$f" | awk '{print $2}') - id=$(grep '^id:' "$f" | awk '{print $2}') - http_code2=$(curl -s $AUTH -o /tmp/kestra_resp -w "%{http_code}" \ - -X PUT "${KESTRA_URL}/api/v1/flows/${ns}/${id}" \ - -H "Content-Type: application/x-yaml" \ - --data-binary "@${f}") - if [ "$http_code2" = "200" ] || [ "$http_code2" = "201" ]; then - echo " updated" - else - echo " ERROR updating $name: HTTP $http_code2" - cat /tmp/kestra_resp; echo - exit 1 - fi - else - echo " ERROR importing $name: HTTP $http_code" - cat /tmp/kestra_resp; echo - exit 1 - fi -done - -echo "All flows imported." diff --git a/integrator/flows/admissions.yml b/integrator/flows/admissions.yml deleted file mode 100644 index 7e61594..0000000 --- a/integrator/flows/admissions.yml +++ /dev/null @@ -1,26 +0,0 @@ -id: admissions-annual-update -namespace: schoolcompare.data -description: Download and load school admissions data via EES API - -triggers: - - id: annual-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 4 1 7 *" # 1 July annually at 04:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/admissions?action=download - method: POST - timeout: PT20M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/admissions?action=load - method: POST - timeout: PT30M - -retry: - type: constant - maxAttempts: 3 - interval: PT15M diff --git a/integrator/flows/census.yml b/integrator/flows/census.yml deleted file mode 100644 index d051b63..0000000 --- a/integrator/flows/census.yml +++ /dev/null @@ -1,26 +0,0 @@ -id: census-annual-update -namespace: schoolcompare.data -description: Download and load School Census (SPC) data via EES API - -triggers: - - id: annual-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 4 1 9 *" # 1 September annually at 04:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/census?action=download - method: POST - timeout: PT20M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/census?action=load - method: POST - timeout: PT30M - -retry: - type: constant - maxAttempts: 3 - interval: PT15M diff --git a/integrator/flows/finance.yml b/integrator/flows/finance.yml deleted file mode 100644 index a7c4c8a..0000000 --- a/integrator/flows/finance.yml +++ /dev/null @@ -1,26 +0,0 @@ -id: finance-annual-update -namespace: schoolcompare.data -description: Fetch FBIT financial benchmarking data from DfE API for all schools - -triggers: - - id: annual-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 4 1 12 *" # 1 December annually at 04:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/finance?action=download - method: POST - timeout: PT120M # Fetches per-school from API — ~20k schools - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/finance?action=load - method: POST - timeout: PT30M - -retry: - type: constant - maxAttempts: 2 - interval: PT30M diff --git a/integrator/flows/gias.yml b/integrator/flows/gias.yml deleted file mode 100644 index 847a2ea..0000000 --- a/integrator/flows/gias.yml +++ /dev/null @@ -1,31 +0,0 @@ -id: gias-weekly-update -namespace: schoolcompare.data -description: Download and load GIAS (Get Information About Schools) bulk CSV - -triggers: - - id: weekly-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 3 * * 0" # Every Sunday at 03:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/gias?action=download - method: POST - timeout: PT30M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/gias?action=load - method: POST - timeout: PT30M - -errors: - - id: notify-failure - type: io.kestra.plugin.core.log.Log - message: "GIAS update FAILED: {{ error.message }}" - -retry: - type: constant - maxAttempts: 3 - interval: PT10M diff --git a/integrator/flows/idaci.yml b/integrator/flows/idaci.yml deleted file mode 100644 index 9cc9be1..0000000 --- a/integrator/flows/idaci.yml +++ /dev/null @@ -1,26 +0,0 @@ -id: idaci-annual-check -namespace: schoolcompare.data -description: Download IoD2019 IDACI file and compute deprivation scores for all schools - -triggers: - - id: annual-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 5 1 1 *" # 1 January annually at 05:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/idaci?action=download - method: POST - timeout: PT10M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/idaci?action=load - method: POST - timeout: PT60M - -retry: - type: constant - maxAttempts: 2 - interval: PT30M diff --git a/integrator/flows/ks2.yml b/integrator/flows/ks2.yml deleted file mode 100644 index d2b2178..0000000 --- a/integrator/flows/ks2.yml +++ /dev/null @@ -1,23 +0,0 @@ -id: ks2-reimport -namespace: schoolcompare.data -description: Re-import KS2 attainment data from bundled CSV files (use after DB wipe) - -# No scheduled trigger — run manually from the Kestra UI when needed. - -tasks: - - id: reimport - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/ks2?action=load - method: POST - allowFailed: false - timeout: PT30S # fire-and-forget; backend runs migration in background - -errors: - - id: notify-failure - type: io.kestra.plugin.core.log.Log - message: "KS2 re-import FAILED: {{ error.message }}" - -retry: - type: constant - maxAttempts: 2 - interval: PT5M diff --git a/integrator/flows/ofsted.yml b/integrator/flows/ofsted.yml deleted file mode 100644 index 07bd639..0000000 --- a/integrator/flows/ofsted.yml +++ /dev/null @@ -1,33 +0,0 @@ -id: ofsted-monthly-update -namespace: schoolcompare.data -description: Download and load Ofsted Monthly Management Information CSV - -triggers: - - id: monthly-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 2 1 * *" # 1st of each month at 02:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/ofsted?action=download - method: POST - allowFailed: false - timeout: PT10M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/ofsted?action=load - method: POST - allowFailed: false - timeout: PT30M - -errors: - - id: notify-failure - type: io.kestra.plugin.core.log.Log - message: "Ofsted update FAILED: {{ error.message }}" - -retry: - type: constant - maxAttempts: 3 - interval: PT10M diff --git a/integrator/flows/parent_view.yml b/integrator/flows/parent_view.yml deleted file mode 100644 index f555512..0000000 --- a/integrator/flows/parent_view.yml +++ /dev/null @@ -1,31 +0,0 @@ -id: parent-view-monthly-check -namespace: schoolcompare.data -description: Download and load Ofsted Parent View open data (released ~3x/year) - -triggers: - - id: monthly-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 3 1 * *" # 1st of each month at 03:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/parent_view?action=download - method: POST - timeout: PT10M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/parent_view?action=load - method: POST - timeout: PT20M - -errors: - - id: notify-failure - type: io.kestra.plugin.core.log.Log - message: "Parent View update FAILED: {{ error.message }}" - -retry: - type: constant - maxAttempts: 3 - interval: PT10M diff --git a/integrator/flows/phonics.yml b/integrator/flows/phonics.yml deleted file mode 100644 index 59ba5ae..0000000 --- a/integrator/flows/phonics.yml +++ /dev/null @@ -1,26 +0,0 @@ -id: phonics-annual-update -namespace: schoolcompare.data -description: Download and load Phonics Screening Check data via EES API - -triggers: - - id: annual-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 5 1 9 *" # 1 September annually at 05:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/phonics?action=download - method: POST - timeout: PT20M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/phonics?action=load - method: POST - timeout: PT30M - -retry: - type: constant - maxAttempts: 3 - interval: PT15M diff --git a/integrator/flows/sen_detail.yml b/integrator/flows/sen_detail.yml deleted file mode 100644 index a4e6dae..0000000 --- a/integrator/flows/sen_detail.yml +++ /dev/null @@ -1,26 +0,0 @@ -id: sen-detail-annual-update -namespace: schoolcompare.data -description: Download and load SEN primary need breakdown via EES API - -triggers: - - id: annual-schedule - type: io.kestra.plugin.core.trigger.Schedule - cron: "0 4 15 9 *" # 15 September annually at 04:00 - -tasks: - - id: download - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/sen_detail?action=download - method: POST - timeout: PT20M - - - id: load - type: io.kestra.plugin.core.http.Request - uri: http://integrator:8001/run/sen_detail?action=load - method: POST - timeout: PT30M - -retry: - type: constant - maxAttempts: 3 - interval: PT15M diff --git a/integrator/requirements.txt b/integrator/requirements.txt deleted file mode 100644 index 676ff15..0000000 --- a/integrator/requirements.txt +++ /dev/null @@ -1,7 +0,0 @@ -fastapi==0.115.0 -uvicorn[standard]==0.30.6 -requests==2.32.3 -pandas==2.2.3 -openpyxl==3.1.5 -psycopg2-binary==2.9.9 -sqlalchemy==2.0.35 diff --git a/integrator/scripts/config.py b/integrator/scripts/config.py deleted file mode 100644 index c2684ad..0000000 --- a/integrator/scripts/config.py +++ /dev/null @@ -1,14 +0,0 @@ -"""Configuration for the data integrator.""" -import os -from pathlib import Path - -DATABASE_URL = os.environ.get( - "DATABASE_URL", - "postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare", -) - -DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) -SUPPLEMENTARY_DIR = DATA_DIR / "supplementary" - -BACKEND_URL = os.environ.get("BACKEND_URL", "http://backend:80") -ADMIN_API_KEY = os.environ.get("ADMIN_API_KEY", "changeme") diff --git a/integrator/scripts/db.py b/integrator/scripts/db.py deleted file mode 100644 index 2e89b32..0000000 --- a/integrator/scripts/db.py +++ /dev/null @@ -1,23 +0,0 @@ -"""Database connection for the integrator.""" -from contextlib import contextmanager - -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker - -from config import DATABASE_URL - -engine = create_engine(DATABASE_URL, pool_pre_ping=True) -SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) - - -@contextmanager -def get_session(): - session = SessionLocal() - try: - yield session - session.commit() - except Exception: - session.rollback() - raise - finally: - session.close() diff --git a/integrator/scripts/sources/__init__.py b/integrator/scripts/sources/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/integrator/scripts/sources/admissions.py b/integrator/scripts/sources/admissions.py deleted file mode 100644 index 159839e..0000000 --- a/integrator/scripts/sources/admissions.py +++ /dev/null @@ -1,184 +0,0 @@ -""" -School Admissions data downloader and loader. - -Source: EES publication "primary-and-secondary-school-applications-and-offers" - Content API release ZIP → supporting-files/AppsandOffers_*_SchoolLevel*.csv -Update: Annual (June/July post-offer round) -""" -import argparse -import re -import sys -from pathlib import Path - -import pandas as pd - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session -from sources.ees import download_release_zip_csv - -DEST_DIR = SUPPLEMENTARY_DIR / "admissions" -PUBLICATION_SLUG = "primary-and-secondary-school-applications-and-offers" - -NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", "X", "Z", ""} - -# Maps actual CSV column names → internal field names -COLUMN_MAP = { - # School identifier - "school_urn": "urn", - # Year — e.g. 202526 → 2025 - "time_period": "time_period_raw", - # PAN (places offered) - "total_number_places_offered": "pan", - # Applications (total times put as any preference) - "times_put_as_any_preferred_school": "total_applications", - # 1st-preference applications - "times_put_as_1st_preference": "times_1st_pref", - # 1st-preference offers - "number_1st_preference_offers": "offers_1st_pref", -} - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "admissions") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - dest_file = dest / "admissions_school_level_latest.csv" - return download_release_zip_csv( - PUBLICATION_SLUG, - dest_file, - zip_member_keyword="schoollevel", - ) - - -def _parse_int(val) -> int | None: - if pd.isna(val): - return None - s = str(val).strip().upper().replace(",", "") - if s in NULL_VALUES: - return None - try: - return int(float(s)) - except ValueError: - return None - - -def _parse_pct(val) -> float | None: - if pd.isna(val): - return None - s = str(val).strip().upper().replace("%", "") - if s in NULL_VALUES: - return None - try: - return float(s) - except ValueError: - return None - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "admissions") if data_dir else DEST_DIR - files = sorted(dest.glob("*.csv")) - if not files: - raise FileNotFoundError(f"No admissions CSV found in {dest}") - path = files[-1] - - print(f" Admissions: loading {path} ...") - df = pd.read_csv(path, encoding="utf-8-sig", low_memory=False) - - # Rename columns we care about - df.rename(columns=COLUMN_MAP, inplace=True) - - if "urn" not in df.columns: - raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}") - - # Filter to primary schools only - if "school_phase" in df.columns: - df = df[df["school_phase"].str.lower() == "primary"] - - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - # Derive year from time_period (e.g. 202526 → 2025) - def _extract_year(val) -> int | None: - s = str(val).strip() - m = re.match(r"(\d{4})\d{2}", s) - if m: - return int(m.group(1)) - m2 = re.search(r"20(\d{2})", s) - if m2: - return int("20" + m2.group(1)) - return None - - if "time_period_raw" in df.columns: - df["year"] = df["time_period_raw"].apply(_extract_year) - else: - year_m = re.search(r"20(\d{2})", path.stem) - df["year"] = int("20" + year_m.group(1)) if year_m else None - - df = df.dropna(subset=["year"]) - df["year"] = df["year"].astype(int) - - # Keep most recent year per school (file may contain multiple years) - df = df.sort_values("year", ascending=False).groupby("urn").first().reset_index() - - inserted = 0 - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - urn = int(row["urn"]) - year = int(row["year"]) - - pan = _parse_int(row.get("pan")) - total_apps = _parse_int(row.get("total_applications")) - times_1st = _parse_int(row.get("times_1st_pref")) - offers_1st = _parse_int(row.get("offers_1st_pref")) - - # % of 1st-preference applicants who received an offer - if times_1st and times_1st > 0 and offers_1st is not None: - pct_1st = round(offers_1st / times_1st * 100, 1) - else: - pct_1st = None - - oversubscribed = ( - True if (pan and times_1st and times_1st > pan) else - False if (pan and times_1st and times_1st <= pan) else - None - ) - - session.execute( - text(""" - INSERT INTO school_admissions - (urn, year, published_admission_number, total_applications, - first_preference_offers_pct, oversubscribed) - VALUES (:urn, :year, :pan, :total_apps, :pct_1st, :oversubscribed) - ON CONFLICT (urn, year) DO UPDATE SET - published_admission_number = EXCLUDED.published_admission_number, - total_applications = EXCLUDED.total_applications, - first_preference_offers_pct = EXCLUDED.first_preference_offers_pct, - oversubscribed = EXCLUDED.oversubscribed - """), - { - "urn": urn, "year": year, "pan": pan, - "total_apps": total_apps, "pct_1st": pct_1st, - "oversubscribed": oversubscribed, - }, - ) - inserted += 1 - if inserted % 5000 == 0: - session.flush() - print(f" Processed {inserted} records...") - - print(f" Admissions: upserted {inserted} records") - return {"inserted": inserted, "updated": 0, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/census.py b/integrator/scripts/sources/census.py deleted file mode 100644 index a30c9e2..0000000 --- a/integrator/scripts/sources/census.py +++ /dev/null @@ -1,148 +0,0 @@ -""" -School Census (SPC) downloader and loader. - -Source: EES publication "schools-pupils-and-their-characteristics" -Update: Annual (June) -Adds: class_size_avg, ethnicity breakdown by school -""" -import argparse -import re -import sys -from pathlib import Path - -import pandas as pd - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session -from sources.ees import get_latest_csv_url, download_csv - -DEST_DIR = SUPPLEMENTARY_DIR / "census" -PUBLICATION_SLUG = "schools-pupils-and-their-characteristics" - -NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", "X", ""} - -COLUMN_MAP = { - "URN": "urn", - "urn": "urn", - "YEAR": "year", - "Year": "year", - # Class size - "average_class_size": "class_size_avg", - "AVCLAS": "class_size_avg", - "avg_class_size": "class_size_avg", - # Ethnicity — DfE uses ethnicity major group percentages - "perc_white": "ethnicity_white_pct", - "perc_asian": "ethnicity_asian_pct", - "perc_black": "ethnicity_black_pct", - "perc_mixed": "ethnicity_mixed_pct", - "perc_other_ethnic": "ethnicity_other_pct", - "PTWHITE": "ethnicity_white_pct", - "PTASIAN": "ethnicity_asian_pct", - "PTBLACK": "ethnicity_black_pct", - "PTMIXED": "ethnicity_mixed_pct", - "PTOTHER": "ethnicity_other_pct", -} - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "census") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - url = get_latest_csv_url(PUBLICATION_SLUG, keyword="school") - if not url: - raise RuntimeError(f"Could not find CSV URL for census publication") - - filename = url.split("/")[-1].split("?")[0] or "census_latest.csv" - return download_csv(url, dest / filename) - - -def _parse_pct(val) -> float | None: - if pd.isna(val): - return None - s = str(val).strip().upper().replace("%", "") - if s in NULL_VALUES: - return None - try: - return float(s) - except ValueError: - return None - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "census") if data_dir else DEST_DIR - files = sorted(dest.glob("*.csv")) - if not files: - raise FileNotFoundError(f"No census CSV found in {dest}") - path = files[-1] - - print(f" Census: loading {path} ...") - df = pd.read_csv(path, encoding="latin-1", low_memory=False) - df.rename(columns=COLUMN_MAP, inplace=True) - - if "urn" not in df.columns: - raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}") - - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - year = None - m = re.search(r"20(\d{2})", path.stem) - if m: - year = int("20" + m.group(1)) - - inserted = 0 - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - urn = int(row["urn"]) - row_year = int(row["year"]) if "year" in df.columns and pd.notna(row.get("year")) else year - if not row_year: - continue - - session.execute( - text(""" - INSERT INTO school_census - (urn, year, class_size_avg, - ethnicity_white_pct, ethnicity_asian_pct, ethnicity_black_pct, - ethnicity_mixed_pct, ethnicity_other_pct) - VALUES (:urn, :year, :class_size_avg, - :white, :asian, :black, :mixed, :other) - ON CONFLICT (urn, year) DO UPDATE SET - class_size_avg = EXCLUDED.class_size_avg, - ethnicity_white_pct = EXCLUDED.ethnicity_white_pct, - ethnicity_asian_pct = EXCLUDED.ethnicity_asian_pct, - ethnicity_black_pct = EXCLUDED.ethnicity_black_pct, - ethnicity_mixed_pct = EXCLUDED.ethnicity_mixed_pct, - ethnicity_other_pct = EXCLUDED.ethnicity_other_pct - """), - { - "urn": urn, - "year": row_year, - "class_size_avg": _parse_pct(row.get("class_size_avg")), - "white": _parse_pct(row.get("ethnicity_white_pct")), - "asian": _parse_pct(row.get("ethnicity_asian_pct")), - "black": _parse_pct(row.get("ethnicity_black_pct")), - "mixed": _parse_pct(row.get("ethnicity_mixed_pct")), - "other": _parse_pct(row.get("ethnicity_other_pct")), - }, - ) - inserted += 1 - if inserted % 5000 == 0: - session.flush() - - print(f" Census: upserted {inserted} records") - return {"inserted": inserted, "updated": 0, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/ees.py b/integrator/scripts/sources/ees.py deleted file mode 100644 index a614466..0000000 --- a/integrator/scripts/sources/ees.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -Shared EES (Explore Education Statistics) API client. - -Two APIs are available: - - Statistics API: https://api.education.gov.uk/statistics/v1 (only ~13 publications) - - Content API: https://content.explore-education-statistics.service.gov.uk/api - Covers all publications; use this for admissions and other data not in the stats API. - Download all files for a release as a ZIP from /api/releases/{id}/files. -""" -import io -import zipfile -from pathlib import Path -from typing import Optional - -import requests - -STATS_API_BASE = "https://api.education.gov.uk/statistics/v1" -CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api" -TIMEOUT = 60 - - -def get_publication_files(publication_slug: str) -> list[dict]: - """Return list of data-set file descriptors for a publication (statistics API).""" - url = f"{STATS_API_BASE}/publications/{publication_slug}/data-set-files" - resp = requests.get(url, timeout=TIMEOUT) - resp.raise_for_status() - return resp.json().get("results", []) - - -def get_latest_csv_url(publication_slug: str, keyword: str = "") -> Optional[str]: - """ - Find the most recent CSV download URL for a publication (statistics API). - Optionally filter by a keyword in the file name. - """ - files = get_publication_files(publication_slug) - for entry in files: - name = entry.get("name", "").lower() - if keyword and keyword.lower() not in name: - continue - csv_url = entry.get("csvDownloadUrl") or entry.get("file", {}).get("url") - if csv_url: - return csv_url - return None - - -def get_content_release_id(publication_slug: str) -> str: - """Return the latest release ID for a publication via the content API.""" - url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases/latest" - resp = requests.get(url, timeout=TIMEOUT) - resp.raise_for_status() - return resp.json()["id"] - - -def download_release_zip_csv( - publication_slug: str, - dest_path: Path, - zip_member_keyword: str = "", -) -> Path: - """ - Download the full-release ZIP from the EES content API and extract one CSV. - - If zip_member_keyword is given, the first member whose path contains that - keyword (case-insensitive) is extracted; otherwise the first .csv found is used. - Returns dest_path (the extracted CSV file). - """ - if dest_path.exists(): - print(f" EES: {dest_path.name} already exists, skipping.") - return dest_path - - release_id = get_content_release_id(publication_slug) - zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files" - print(f" EES: downloading release ZIP for '{publication_slug}' ...") - resp = requests.get(zip_url, timeout=300, stream=True) - resp.raise_for_status() - - data = b"".join(resp.iter_content(chunk_size=65536)) - with zipfile.ZipFile(io.BytesIO(data)) as z: - members = z.namelist() - target = None - kw = zip_member_keyword.lower() - for m in members: - if m.endswith(".csv") and (not kw or kw in m.lower()): - target = m - break - if not target: - raise ValueError( - f"No CSV matching '{zip_member_keyword}' in ZIP. Members: {members}" - ) - print(f" EES: extracting '{target}' ...") - dest_path.parent.mkdir(parents=True, exist_ok=True) - with z.open(target) as src, open(dest_path, "wb") as dst: - dst.write(src.read()) - - print(f" EES: saved {dest_path} ({dest_path.stat().st_size // 1024} KB)") - return dest_path - - -def download_csv(url: str, dest_path: Path) -> Path: - """Download a CSV from EES to dest_path.""" - if dest_path.exists(): - print(f" EES: {dest_path.name} already exists, skipping.") - return dest_path - print(f" EES: downloading {url} ...") - resp = requests.get(url, timeout=300, stream=True) - resp.raise_for_status() - dest_path.parent.mkdir(parents=True, exist_ok=True) - with open(dest_path, "wb") as f: - for chunk in resp.iter_content(chunk_size=65536): - f.write(chunk) - print(f" EES: saved {dest_path} ({dest_path.stat().st_size // 1024} KB)") - return dest_path diff --git a/integrator/scripts/sources/finance.py b/integrator/scripts/sources/finance.py deleted file mode 100644 index 11bc2d8..0000000 --- a/integrator/scripts/sources/finance.py +++ /dev/null @@ -1,143 +0,0 @@ -""" -FBIT (Financial Benchmarking and Insights Tool) financial data loader. - -Source: https://schools-financial-benchmarking.service.gov.uk/api/ -Update: Annual (December — data for the prior financial year) -""" -import argparse -import sys -import time -from pathlib import Path - -import pandas as pd -import requests - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session - -DEST_DIR = SUPPLEMENTARY_DIR / "finance" -API_BASE = "https://schools-financial-benchmarking.service.gov.uk/api" -RATE_LIMIT_DELAY = 0.1 # seconds between requests - - -def download(data_dir: Path | None = None) -> Path: - """ - Fetch per-URN financial data from FBIT API and save as CSV. - Batches all school URNs from the database. - """ - dest = (data_dir / "supplementary" / "finance") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - # Determine year from API (use current year minus 1 for completed financials) - from datetime import date - year = date.today().year - 1 - dest_file = dest / f"fbit_{year}.csv" - - if dest_file.exists(): - print(f" Finance: {dest_file.name} already exists, skipping download.") - return dest_file - - # Get all URNs from the database - with get_session() as session: - from sqlalchemy import text - rows = session.execute(text("SELECT urn FROM schools")).fetchall() - urns = [r[0] for r in rows] - print(f" Finance: fetching FBIT data for {len(urns)} schools (year {year}) ...") - - records = [] - errors = 0 - for i, urn in enumerate(urns): - if i % 500 == 0: - print(f" {i}/{len(urns)} ...") - try: - resp = requests.get( - f"{API_BASE}/schoolFinancialDataObject/{urn}", - timeout=10, - ) - if resp.status_code == 200: - data = resp.json() - if data: - records.append({ - "urn": urn, - "year": year, - "per_pupil_spend": data.get("totalExpenditure") and - data.get("numberOfPupils") and - round(data["totalExpenditure"] / data["numberOfPupils"], 2), - "staff_cost_pct": data.get("staffCostPercent"), - "teacher_cost_pct": data.get("teachingStaffCostPercent"), - "support_staff_cost_pct": data.get("educationSupportStaffCostPercent"), - "premises_cost_pct": data.get("premisesStaffCostPercent"), - }) - elif resp.status_code not in (404, 400): - errors += 1 - except Exception: - errors += 1 - - time.sleep(RATE_LIMIT_DELAY) - - df = pd.DataFrame(records) - df.to_csv(dest_file, index=False) - print(f" Finance: saved {len(records)} records to {dest_file} ({errors} errors)") - return dest_file - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "finance") if data_dir else DEST_DIR - files = sorted(dest.glob("fbit_*.csv")) - if not files: - raise FileNotFoundError(f"No finance CSV found in {dest}") - path = files[-1] - - print(f" Finance: loading {path} ...") - df = pd.read_csv(path) - - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - inserted = 0 - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - session.execute( - text(""" - INSERT INTO school_finance - (urn, year, per_pupil_spend, staff_cost_pct, teacher_cost_pct, - support_staff_cost_pct, premises_cost_pct) - VALUES (:urn, :year, :per_pupil, :staff, :teacher, :support, :premises) - ON CONFLICT (urn, year) DO UPDATE SET - per_pupil_spend = EXCLUDED.per_pupil_spend, - staff_cost_pct = EXCLUDED.staff_cost_pct, - teacher_cost_pct = EXCLUDED.teacher_cost_pct, - support_staff_cost_pct = EXCLUDED.support_staff_cost_pct, - premises_cost_pct = EXCLUDED.premises_cost_pct - """), - { - "urn": int(row["urn"]), - "year": int(row["year"]), - "per_pupil": float(row["per_pupil_spend"]) if pd.notna(row.get("per_pupil_spend")) else None, - "staff": float(row["staff_cost_pct"]) if pd.notna(row.get("staff_cost_pct")) else None, - "teacher": float(row["teacher_cost_pct"]) if pd.notna(row.get("teacher_cost_pct")) else None, - "support": float(row["support_staff_cost_pct"]) if pd.notna(row.get("support_staff_cost_pct")) else None, - "premises": float(row["premises_cost_pct"]) if pd.notna(row.get("premises_cost_pct")) else None, - }, - ) - inserted += 1 - if inserted % 2000 == 0: - session.flush() - - print(f" Finance: upserted {inserted} records") - return {"inserted": inserted, "updated": 0, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/gias.py b/integrator/scripts/sources/gias.py deleted file mode 100644 index 19b232c..0000000 --- a/integrator/scripts/sources/gias.py +++ /dev/null @@ -1,159 +0,0 @@ -""" -GIAS (Get Information About Schools) bulk CSV downloader and loader. - -Source: https://get-information-schools.service.gov.uk/Downloads -Update: Daily; we refresh weekly. -Adds: website, headteacher_name, capacity, trust_name, trust_uid, gender, nursery_provision -""" -import argparse -import sys -from datetime import date -from pathlib import Path - -import pandas as pd -import requests - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session - -DEST_DIR = SUPPLEMENTARY_DIR / "gias" - -# GIAS bulk download URL — date is injected at runtime -GIAS_URL_TEMPLATE = "https://ea-edubase-api-prod.azurewebsites.net/edubase/downloads/public/edubasealldata{date}.csv" - -COLUMN_MAP = { - "URN": "urn", - "SchoolWebsite": "website", - "SchoolCapacity": "capacity", - "TrustName": "trust_name", - "TrustUID": "trust_uid", - "Gender (name)": "gender", - "NurseryProvision (name)": "nursery_provision_raw", - "HeadTitle": "head_title", - "HeadFirstName": "head_first", - "HeadLastName": "head_last", -} - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "gias") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - today = date.today().strftime("%Y%m%d") - url = GIAS_URL_TEMPLATE.format(date=today) - filename = f"gias_{today}.csv" - dest_file = dest / filename - - if dest_file.exists(): - print(f" GIAS: {filename} already exists, skipping download.") - return dest_file - - print(f" GIAS: downloading {url} ...") - resp = requests.get(url, timeout=300, stream=True) - - # GIAS may not have today's file yet — fall back to yesterday - if resp.status_code == 404: - from datetime import timedelta - yesterday = (date.today() - timedelta(days=1)).strftime("%Y%m%d") - url = GIAS_URL_TEMPLATE.format(date=yesterday) - filename = f"gias_{yesterday}.csv" - dest_file = dest / filename - if dest_file.exists(): - print(f" GIAS: {filename} already exists, skipping download.") - return dest_file - resp = requests.get(url, timeout=300, stream=True) - - resp.raise_for_status() - with open(dest_file, "wb") as f: - for chunk in resp.iter_content(chunk_size=65536): - f.write(chunk) - - print(f" GIAS: saved {dest_file} ({dest_file.stat().st_size // 1024} KB)") - return dest_file - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "gias") if data_dir else DEST_DIR - files = sorted(dest.glob("gias_*.csv")) - if not files: - raise FileNotFoundError(f"No GIAS CSV found in {dest}") - path = files[-1] - - print(f" GIAS: loading {path} ...") - df = pd.read_csv(path, encoding="latin-1", low_memory=False) - df.rename(columns=COLUMN_MAP, inplace=True) - - if "urn" not in df.columns: - raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}") - - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - # Build headteacher_name from parts - def build_name(row): - parts = [ - str(row.get("head_title", "") or "").strip(), - str(row.get("head_first", "") or "").strip(), - str(row.get("head_last", "") or "").strip(), - ] - return " ".join(p for p in parts if p) or None - - df["headteacher_name"] = df.apply(build_name, axis=1) - df["nursery_provision"] = df.get("nursery_provision_raw", pd.Series()).apply( - lambda v: True if str(v).strip().lower().startswith("has") else False if pd.notna(v) else None - ) - - def clean_str(val): - s = str(val).strip() if pd.notna(val) else None - return s if s and s.lower() not in ("nan", "none", "") else None - - updated = 0 - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - urn = int(row["urn"]) - session.execute( - text(""" - UPDATE schools SET - website = :website, - headteacher_name = :headteacher_name, - capacity = :capacity, - trust_name = :trust_name, - trust_uid = :trust_uid, - gender = :gender, - nursery_provision = :nursery_provision - WHERE urn = :urn - """), - { - "urn": urn, - "website": clean_str(row.get("website")), - "headteacher_name": row.get("headteacher_name"), - "capacity": int(row["capacity"]) if pd.notna(row.get("capacity")) and str(row.get("capacity")).strip().isdigit() else None, - "trust_name": clean_str(row.get("trust_name")), - "trust_uid": clean_str(row.get("trust_uid")), - "gender": clean_str(row.get("gender")), - "nursery_provision": row.get("nursery_provision"), - }, - ) - updated += 1 - if updated % 5000 == 0: - session.flush() - print(f" Updated {updated} schools...") - - print(f" GIAS: updated {updated} school records") - return {"inserted": 0, "updated": updated, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - - if args.action in ("download", "all"): - path = download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/idaci.py b/integrator/scripts/sources/idaci.py deleted file mode 100644 index bbcd199..0000000 --- a/integrator/scripts/sources/idaci.py +++ /dev/null @@ -1,176 +0,0 @@ -""" -IDACI (Income Deprivation Affecting Children Index) loader. - -Source: English Indices of Deprivation 2019 -https://www.gov.uk/government/statistics/english-indices-of-deprivation-2019 - -This is a one-time download (5-yearly release). We join school postcodes to LSOAs -via postcodes.io, then look up IDACI scores from the IoD2019 file. - -Update: ~5-yearly (next release expected 2025/26) -""" -import argparse -import sys -from pathlib import Path - -import pandas as pd -import requests - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session - -DEST_DIR = SUPPLEMENTARY_DIR / "idaci" - -# IoD 2019 supplementary data — "Income Deprivation Affecting Children Index (IDACI)" -IOD_2019_URL = ( - "https://assets.publishing.service.gov.uk/government/uploads/system/uploads/" - "attachment_data/file/833970/File_1_-_IMD2019_Index_of_Multiple_Deprivation.xlsx" -) - -POSTCODES_IO_BATCH = "https://api.postcodes.io/postcodes" -BATCH_SIZE = 100 - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "idaci") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - filename = "iod2019_idaci.xlsx" - dest_file = dest / filename - if dest_file.exists(): - print(f" IDACI: {filename} already exists, skipping download.") - return dest_file - - print(f" IDACI: downloading IoD2019 file ...") - resp = requests.get(IOD_2019_URL, timeout=300, stream=True) - resp.raise_for_status() - with open(dest_file, "wb") as f: - for chunk in resp.iter_content(chunk_size=65536): - f.write(chunk) - - print(f" IDACI: saved {dest_file}") - return dest_file - - -def _postcode_to_lsoa(postcodes: list[str]) -> dict[str, str]: - """Batch-resolve postcodes to LSOA codes via postcodes.io.""" - result = {} - valid = [p.strip().upper() for p in postcodes if p and len(str(p).strip()) >= 5] - valid = list(set(valid)) - - for i in range(0, len(valid), BATCH_SIZE): - batch = valid[i:i + BATCH_SIZE] - try: - resp = requests.post(POSTCODES_IO_BATCH, json={"postcodes": batch}, timeout=30) - if resp.status_code == 200: - for item in resp.json().get("result", []): - if item and item.get("result"): - lsoa = item["result"].get("lsoa") - if lsoa: - result[item["query"].upper()] = lsoa - except Exception as e: - print(f" Warning: postcodes.io batch failed: {e}") - - return result - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - dest = (data_dir / "supplementary" / "idaci") if data_dir else DEST_DIR - if path is None: - files = sorted(dest.glob("*.xlsx")) - if not files: - raise FileNotFoundError(f"No IDACI file found in {dest}") - path = files[-1] - - print(f" IDACI: loading IoD2019 from {path} ...") - - # IoD2019 File 1 — sheet "IoD2019 IDACI" or similar - try: - iod_df = pd.read_excel(path, sheet_name=None) - # Find sheet with IDACI data - idaci_sheet = None - for name, df in iod_df.items(): - if "IDACI" in name.upper() or "IDACI" in str(df.columns.tolist()).upper(): - idaci_sheet = name - break - if idaci_sheet is None: - idaci_sheet = list(iod_df.keys())[0] - df_iod = iod_df[idaci_sheet] - except Exception as e: - raise RuntimeError(f"Could not read IoD2019 file: {e}") - - # Normalise column names — IoD2019 uses specific headers - col_lsoa = next((c for c in df_iod.columns if "LSOA" in str(c).upper() and "code" in str(c).lower()), None) - col_score = next((c for c in df_iod.columns if "IDACI" in str(c).upper() and "score" in str(c).lower()), None) - col_rank = next((c for c in df_iod.columns if "IDACI" in str(c).upper() and "rank" in str(c).lower()), None) - - if not col_lsoa or not col_score: - print(f" IDACI columns available: {list(df_iod.columns)[:20]}") - raise ValueError("Could not find LSOA code or IDACI score columns") - - df_iod = df_iod[[col_lsoa, col_score]].copy() - df_iod.columns = ["lsoa_code", "idaci_score"] - df_iod = df_iod.dropna() - - # Compute decile from rank (or from score distribution) - total = len(df_iod) - df_iod = df_iod.sort_values("idaci_score", ascending=False) - df_iod["idaci_decile"] = (pd.qcut(df_iod["idaci_score"], 10, labels=False) + 1).astype(int) - # Decile 1 = most deprived (highest IDACI score) - df_iod["idaci_decile"] = 11 - df_iod["idaci_decile"] - - lsoa_lookup = df_iod.set_index("lsoa_code")[["idaci_score", "idaci_decile"]].to_dict("index") - print(f" IDACI: loaded {len(lsoa_lookup)} LSOA records") - - # Fetch all school postcodes from the database - with get_session() as session: - from sqlalchemy import text - rows = session.execute(text("SELECT urn, postcode FROM schools WHERE postcode IS NOT NULL")).fetchall() - - postcodes = [r[1] for r in rows] - print(f" IDACI: resolving {len(postcodes)} postcodes via postcodes.io ...") - pc_to_lsoa = _postcode_to_lsoa(postcodes) - print(f" IDACI: resolved {len(pc_to_lsoa)} postcodes to LSOAs") - - inserted = skipped = 0 - with get_session() as session: - from sqlalchemy import text - for urn, postcode in rows: - lsoa = pc_to_lsoa.get(str(postcode).strip().upper()) - if not lsoa: - skipped += 1 - continue - iod = lsoa_lookup.get(lsoa) - if not iod: - skipped += 1 - continue - - session.execute( - text(""" - INSERT INTO school_deprivation (urn, lsoa_code, idaci_score, idaci_decile) - VALUES (:urn, :lsoa, :score, :decile) - ON CONFLICT (urn) DO UPDATE SET - lsoa_code = EXCLUDED.lsoa_code, - idaci_score = EXCLUDED.idaci_score, - idaci_decile = EXCLUDED.idaci_decile - """), - {"urn": urn, "lsoa": lsoa, "score": float(iod["idaci_score"]), "decile": int(iod["idaci_decile"])}, - ) - inserted += 1 - if inserted % 2000 == 0: - session.flush() - - print(f" IDACI: upserted {inserted}, skipped {skipped}") - return {"inserted": inserted, "updated": 0, "skipped": skipped} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/ks2.py b/integrator/scripts/sources/ks2.py deleted file mode 100644 index 7bc2bdc..0000000 --- a/integrator/scripts/sources/ks2.py +++ /dev/null @@ -1,49 +0,0 @@ -""" -KS2 attainment data re-importer. - -Triggers a full re-import of the KS2 CSV data by calling the backend's -admin endpoint. The backend owns the migration logic and CSV column mappings; -this module is a thin trigger so the re-import can be orchestrated via Kestra -like all other data sources. - -The CSV files must already be present in the data volume under - /data/{year}/england_ks2final.csv -(populated at deploy time from the repo's data/ directory). -""" -import requests -from config import BACKEND_URL, ADMIN_API_KEY - -HEADERS = {"X-API-Key": ADMIN_API_KEY} - - -def download(): - """No download step — CSVs are shipped with the repo.""" - print("KS2 CSVs are bundled in the data volume; no download needed.") - return {"skipped": True} - - -def load(): - """Trigger KS2 re-import on the backend and return immediately. - - The migration (including geocoding) runs as a background thread on the - backend and can take up to an hour. Poll GET /api/admin/reimport-ks2/status - to check progress, or simply wait for schools to appear in the UI. - """ - url = f"{BACKEND_URL}/api/admin/reimport-ks2?geocode=true" - print(f"POST {url}") - resp = requests.post(url, headers=HEADERS, timeout=30) - resp.raise_for_status() - result = resp.json() - print(f"Result: {result}") - return result - - -if __name__ == "__main__": - import argparse - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - args = parser.parse_args() - if args.action in ("download", "all"): - download() - if args.action in ("load", "all"): - load() diff --git a/integrator/scripts/sources/ofsted.py b/integrator/scripts/sources/ofsted.py deleted file mode 100644 index a04c68a..0000000 --- a/integrator/scripts/sources/ofsted.py +++ /dev/null @@ -1,418 +0,0 @@ -""" -Ofsted Monthly Management Information CSV downloader and loader. - -Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes -Update: Monthly (released ~2 weeks into each month) -""" -import argparse -import re -import sys -from datetime import date, datetime -from pathlib import Path - -import pandas as pd -import requests - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session - -# Current Ofsted MI download URL — update this when Ofsted releases a new file. -# The URL follows a predictable pattern; we attempt to discover it from the GOV.UK page. -GOV_UK_PAGE = "https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes" - -# Column name → internal field, listed in priority order per field. -# First matching column wins; later entries are fallbacks for older file formats. -COLUMN_PRIORITY = { - "urn": ["URN", "Urn", "urn"], - "inspection_date": [ - "Inspection start date of latest OEIF graded inspection", - "Inspection start date", - "Inspection date", - "InspectionDate", - ], - "publication_date": [ - "Publication date of latest OEIF graded inspection", - "Publication date", - "PublicationDate", - ], - "inspection_type": [ - "Inspection type of latest OEIF graded inspection", - "Inspection type", - "InspectionType", - ], - "overall_effectiveness": [ - "Latest OEIF overall effectiveness", - "Overall effectiveness", - "OverallEffectiveness", - ], - "quality_of_education": [ - "Latest OEIF quality of education", - "Quality of education", - "QualityOfEducation", - ], - "behaviour_attitudes": [ - "Latest OEIF behaviour and attitudes", - "Behaviour and attitudes", - "BehaviourAndAttitudes", - ], - "personal_development": [ - "Latest OEIF personal development", - "Personal development", - "PersonalDevelopment", - ], - "leadership_management": [ - "Latest OEIF effectiveness of leadership and management", - "Leadership and management", - "LeadershipAndManagement", - ], - "early_years_provision": [ - "Latest OEIF early years provision (where applicable)", - "Early years provision", - "EarlyYearsProvision", - ], -} - -GRADE_MAP = { - "Outstanding": 1, "1": 1, 1: 1, - "Good": 2, "2": 2, 2: 2, - "Requires improvement": 3, "3": 3, 3: 3, - "Requires Improvement": 3, - "Inadequate": 4, "4": 4, 4: 4, -} - -# Report Card grade text → integer (1=Exceptional … 5=Urgent improvement) -RC_GRADE_MAP = { - "exceptional": 1, - "strong standard": 2, - "strong": 2, - "expected standard": 3, - "expected": 3, - "needs attention": 4, - "urgent improvement": 5, -} - -# Column name priority for Report Card fields (best-guess names; Ofsted may vary) -RC_COLUMN_PRIORITY = { - "rc_safeguarding": [ - "Safeguarding", - "safeguarding", - "Safeguarding standards", - ], - "rc_inclusion": [ - "Inclusion", - "inclusion", - ], - "rc_curriculum_teaching": [ - "Curriculum and teaching", - "curriculum_and_teaching", - "Curriculum & teaching", - ], - "rc_achievement": [ - "Achievement", - "achievement", - ], - "rc_attendance_behaviour": [ - "Attendance and behaviour", - "attendance_and_behaviour", - "Attendance & behaviour", - ], - "rc_personal_development": [ - "Personal development and well-being", - "Personal development and wellbeing", - "personal_development_and_wellbeing", - "Personal development & well-being", - ], - "rc_leadership_governance": [ - "Leadership and governance", - "leadership_and_governance", - "Leadership & governance", - ], - "rc_early_years": [ - "Early years", - "early_years", - "Early years provision", - ], - "rc_sixth_form": [ - "Sixth form", - "sixth_form", - "Sixth form in schools", - ], -} - -DEST_DIR = SUPPLEMENTARY_DIR / "ofsted" - - -def _discover_csv_url() -> str | None: - """Scrape the GOV.UK page for the most recent CSV/ZIP link.""" - try: - resp = requests.get(GOV_UK_PAGE, timeout=30) - resp.raise_for_status() - # Look for links to assets.publishing.service.gov.uk CSV or ZIP files - pattern = r'href="(https://assets\.publishing\.service\.gov\.uk[^"]+\.(?:csv|zip))"' - urls = re.findall(pattern, resp.text, re.IGNORECASE) - if urls: - return urls[0] - except Exception as e: - print(f" Warning: could not scrape GOV.UK page: {e}") - return None - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "ofsted") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - url = _discover_csv_url() - if not url: - raise RuntimeError( - "Could not discover Ofsted MI download URL. " - "Visit https://www.gov.uk/government/statistical-data-sets/" - "monthly-management-information-ofsteds-school-inspections-outcomes " - "to get the latest URL and update MANUAL_URL in ofsted.py" - ) - - filename = url.split("/")[-1] - dest_file = dest / filename - - if dest_file.exists(): - print(f" Ofsted: {filename} already exists, skipping download.") - return dest_file - - print(f" Ofsted: downloading {url} ...") - resp = requests.get(url, timeout=120, stream=True) - resp.raise_for_status() - with open(dest_file, "wb") as f: - for chunk in resp.iter_content(chunk_size=65536): - f.write(chunk) - - print(f" Ofsted: saved {dest_file} ({dest_file.stat().st_size // 1024} KB)") - return dest_file - - -def _parse_grade(val) -> int | None: - if pd.isna(val): - return None - key = str(val).strip() - return GRADE_MAP.get(key) - - -def _parse_rc_grade(val) -> int | None: - """Parse a Report Card grade text to integer 1–5.""" - if pd.isna(val): - return None - key = str(val).strip().lower() - return RC_GRADE_MAP.get(key) - - -def _parse_safeguarding(val) -> bool | None: - """Parse safeguarding 'Met'/'Not met' to boolean.""" - if pd.isna(val): - return None - s = str(val).strip().lower() - if s == "met": - return True - if s in ("not met", "not_met"): - return False - return None - - -def _parse_date(val) -> date | None: - if pd.isna(val): - return None - for fmt in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y", "%d %B %Y"): - try: - return datetime.strptime(str(val).strip(), fmt).date() - except ValueError: - pass - return None - - -def _framework_for_row(row) -> str | None: - """Determine inspection framework for a single school row. - - Check RC columns first — if any have a value, it's a Report Card inspection. - Fall back to OEIF columns. If neither has data, the school has no graded - inspection on record (return None). - """ - rc_check_cols = [ - "rc_inclusion", "rc_curriculum_teaching", "rc_achievement", - "rc_attendance_behaviour", "rc_personal_development", - "rc_leadership_governance", "rc_safeguarding", - ] - for col in rc_check_cols: - val = row.get(col) - if val is not None and not (isinstance(val, float) and pd.isna(val)): - return "ReportCard" - - oeif_check_cols = ["overall_effectiveness", "quality_of_education"] - for col in oeif_check_cols: - val = row.get(col) - if val is not None and not (isinstance(val, float) and pd.isna(val)): - return "OEIF" - - return None - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "ofsted") if data_dir else DEST_DIR - files = sorted(dest.glob("*.csv")) + sorted(dest.glob("*.zip")) - if not files: - raise FileNotFoundError(f"No Ofsted MI file found in {dest}") - path = files[-1] - - print(f" Ofsted: loading {path} ...") - - def _find_header_row(filepath, encoding="latin-1"): - """Scan up to 10 rows to find the one containing a URN column.""" - for i in range(10): - peek = pd.read_csv(filepath, encoding=encoding, header=i, nrows=0) - if any(str(c).strip() in ("URN", "Urn", "urn") for c in peek.columns): - return i - return 0 - - if str(path).endswith(".zip"): - import zipfile, io - with zipfile.ZipFile(path) as z: - csv_names = [n for n in z.namelist() if n.endswith(".csv")] - if not csv_names: - raise ValueError("No CSV found inside Ofsted ZIP") - # Extract to a temp file so we can scan for the header row - import tempfile, os - with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: - tmp.write(z.read(csv_names[0])) - tmp_path = tmp.name - try: - hdr = _find_header_row(tmp_path) - df = pd.read_csv(tmp_path, encoding="latin-1", low_memory=False, header=hdr) - finally: - os.unlink(tmp_path) - else: - hdr = _find_header_row(path) - df = pd.read_csv(path, encoding="latin-1", low_memory=False, header=hdr) - - # Normalise OEIF column names: for each target field pick the first source column present - available = set(df.columns) - for target, sources in COLUMN_PRIORITY.items(): - for src in sources: - if src in available: - df.rename(columns={src: target}, inplace=True) - break - - # Normalise Report Card column names (if present) - available = set(df.columns) - for target, sources in RC_COLUMN_PRIORITY.items(): - for src in sources: - if src in available: - df.rename(columns={src: target}, inplace=True) - break - - if "urn" not in df.columns: - raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}") - - # Only keep rows with a valid URN - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - inserted = updated = skipped = 0 - - with get_session() as session: - # Keep only the most recent inspection per URN - if "inspection_date" in df.columns: - df["_date_parsed"] = df["inspection_date"].apply(_parse_date) - df = df.sort_values("_date_parsed", ascending=False).groupby("urn").first().reset_index() - - from sqlalchemy import text - - for _, row in df.iterrows(): - urn = int(row["urn"]) - - record = { - "urn": urn, - "framework": _framework_for_row(row), - "inspection_date": _parse_date(row.get("inspection_date")), - "publication_date": _parse_date(row.get("publication_date")), - "inspection_type": str(row.get("inspection_type", "")).strip() or None, - # OEIF fields - "overall_effectiveness": _parse_grade(row.get("overall_effectiveness")), - "quality_of_education": _parse_grade(row.get("quality_of_education")), - "behaviour_attitudes": _parse_grade(row.get("behaviour_attitudes")), - "personal_development": _parse_grade(row.get("personal_development")), - "leadership_management": _parse_grade(row.get("leadership_management")), - "early_years_provision": _parse_grade(row.get("early_years_provision")), - "previous_overall": None, - # Report Card fields - "rc_safeguarding_met": _parse_safeguarding(row.get("rc_safeguarding")), - "rc_inclusion": _parse_rc_grade(row.get("rc_inclusion")), - "rc_curriculum_teaching": _parse_rc_grade(row.get("rc_curriculum_teaching")), - "rc_achievement": _parse_rc_grade(row.get("rc_achievement")), - "rc_attendance_behaviour": _parse_rc_grade(row.get("rc_attendance_behaviour")), - "rc_personal_development": _parse_rc_grade(row.get("rc_personal_development")), - "rc_leadership_governance": _parse_rc_grade(row.get("rc_leadership_governance")), - "rc_early_years": _parse_rc_grade(row.get("rc_early_years")), - "rc_sixth_form": _parse_rc_grade(row.get("rc_sixth_form")), - } - - session.execute( - text(""" - INSERT INTO ofsted_inspections - (urn, framework, inspection_date, publication_date, inspection_type, - overall_effectiveness, quality_of_education, behaviour_attitudes, - personal_development, leadership_management, early_years_provision, - previous_overall, - rc_safeguarding_met, rc_inclusion, rc_curriculum_teaching, - rc_achievement, rc_attendance_behaviour, rc_personal_development, - rc_leadership_governance, rc_early_years, rc_sixth_form) - VALUES - (:urn, :framework, :inspection_date, :publication_date, :inspection_type, - :overall_effectiveness, :quality_of_education, :behaviour_attitudes, - :personal_development, :leadership_management, :early_years_provision, - :previous_overall, - :rc_safeguarding_met, :rc_inclusion, :rc_curriculum_teaching, - :rc_achievement, :rc_attendance_behaviour, :rc_personal_development, - :rc_leadership_governance, :rc_early_years, :rc_sixth_form) - ON CONFLICT (urn) DO UPDATE SET - previous_overall = ofsted_inspections.overall_effectiveness, - framework = EXCLUDED.framework, - inspection_date = EXCLUDED.inspection_date, - publication_date = EXCLUDED.publication_date, - inspection_type = EXCLUDED.inspection_type, - overall_effectiveness = EXCLUDED.overall_effectiveness, - quality_of_education = EXCLUDED.quality_of_education, - behaviour_attitudes = EXCLUDED.behaviour_attitudes, - personal_development = EXCLUDED.personal_development, - leadership_management = EXCLUDED.leadership_management, - early_years_provision = EXCLUDED.early_years_provision, - rc_safeguarding_met = EXCLUDED.rc_safeguarding_met, - rc_inclusion = EXCLUDED.rc_inclusion, - rc_curriculum_teaching = EXCLUDED.rc_curriculum_teaching, - rc_achievement = EXCLUDED.rc_achievement, - rc_attendance_behaviour = EXCLUDED.rc_attendance_behaviour, - rc_personal_development = EXCLUDED.rc_personal_development, - rc_leadership_governance = EXCLUDED.rc_leadership_governance, - rc_early_years = EXCLUDED.rc_early_years, - rc_sixth_form = EXCLUDED.rc_sixth_form - """), - record, - ) - inserted += 1 - - if inserted % 5000 == 0: - session.flush() - print(f" Processed {inserted} records...") - - print(f" Ofsted: upserted {inserted} records") - return {"inserted": inserted, "updated": updated, "skipped": skipped} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - - if args.action in ("download", "all"): - path = download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/parent_view.py b/integrator/scripts/sources/parent_view.py deleted file mode 100644 index 535189e..0000000 --- a/integrator/scripts/sources/parent_view.py +++ /dev/null @@ -1,229 +0,0 @@ -""" -Ofsted Parent View open data downloader and loader. - -Source: https://parentview.ofsted.gov.uk/open-data -Update: ~3 times/year (Spring, Autumn, Summer) -""" -import argparse -import re -import sys -from datetime import date, datetime -from pathlib import Path - -import pandas as pd -import requests - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session - -DEST_DIR = SUPPLEMENTARY_DIR / "parent_view" -OPEN_DATA_PAGE = "https://parentview.ofsted.gov.uk/open-data" - -# Question column mapping — Parent View open data uses descriptive column headers -# Map any variant to our internal field names -QUESTION_MAP = { - # Q1 — happiness - "My child is happy at this school": "q_happy_pct", - "Happy": "q_happy_pct", - # Q2 — safety - "My child feels safe at this school": "q_safe_pct", - "Safe": "q_safe_pct", - # Q3 — bullying - "The school makes sure its pupils are well behaved": "q_behaviour_pct", - "Well Behaved": "q_behaviour_pct", - # Q4 — bullying dealt with (sometimes separate) - "My child has been bullied and the school dealt with the bullying quickly and effectively": "q_bullying_pct", - "Bullying": "q_bullying_pct", - # Q5 — curriculum info - "The school makes me aware of what my child will learn during the year": "q_communication_pct", - "Aware of learning": "q_communication_pct", - # Q6 — concerns dealt with - "When I have raised concerns with the school, they have been dealt with properly": "q_communication_pct", - # Q7 — child does well - "My child does well at this school": "q_progress_pct", - "Does well": "q_progress_pct", - # Q8 — teaching - "The teaching is good at this school": "q_teaching_pct", - "Good teaching": "q_teaching_pct", - # Q9 — progress info - "I receive valuable information from the school about my child's progress": "q_information_pct", - "Progress information": "q_information_pct", - # Q10 — curriculum breadth - "My child is taught a broad range of subjects": "q_curriculum_pct", - "Broad subjects": "q_curriculum_pct", - # Q11 — prepares for future - "The school prepares my child well for the future": "q_future_pct", - "Prepared for future": "q_future_pct", - # Q12 — leadership - "The school is led and managed effectively": "q_leadership_pct", - "Led well": "q_leadership_pct", - # Q13 — wellbeing - "The school supports my child's wider personal development": "q_wellbeing_pct", - "Personal development": "q_wellbeing_pct", - # Q14 — recommendation - "I would recommend this school to another parent": "q_recommend_pct", - "Recommend": "q_recommend_pct", -} - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "parent_view") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - # Scrape the open data page for the download link - try: - resp = requests.get(OPEN_DATA_PAGE, timeout=30) - resp.raise_for_status() - pattern = r'href="([^"]+\.(?:xlsx|csv|zip))"' - urls = re.findall(pattern, resp.text, re.IGNORECASE) - if not urls: - raise RuntimeError("No download link found on Parent View open data page") - url = urls[0] if urls[0].startswith("http") else "https://parentview.ofsted.gov.uk" + urls[0] - except Exception as e: - raise RuntimeError(f"Could not discover Parent View download URL: {e}") - - filename = url.split("/")[-1].split("?")[0] - dest_file = dest / filename - - if dest_file.exists(): - print(f" ParentView: {filename} already exists, skipping download.") - return dest_file - - print(f" ParentView: downloading {url} ...") - resp = requests.get(url, timeout=120, stream=True) - resp.raise_for_status() - with open(dest_file, "wb") as f: - for chunk in resp.iter_content(chunk_size=65536): - f.write(chunk) - - print(f" ParentView: saved {dest_file}") - return dest_file - - -def _positive_pct(row: pd.Series, q_col_base: str) -> float | None: - """Sum 'Strongly agree' + 'Agree' percentages for a question.""" - # Parent View open data has columns like "Q1 - Strongly agree %", "Q1 - Agree %" - strongly = row.get(f"{q_col_base} - Strongly agree %") or row.get(f"{q_col_base} - Strongly Agree %") - agree = row.get(f"{q_col_base} - Agree %") - try: - total = 0.0 - if pd.notna(strongly): - total += float(strongly) - if pd.notna(agree): - total += float(agree) - return round(total, 1) if total > 0 else None - except (TypeError, ValueError): - return None - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "parent_view") if data_dir else DEST_DIR - files = sorted(dest.glob("*.xlsx")) + sorted(dest.glob("*.csv")) - if not files: - raise FileNotFoundError(f"No Parent View file found in {dest}") - path = files[-1] - - print(f" ParentView: loading {path} ...") - - if str(path).endswith(".xlsx"): - df = pd.read_excel(path) - else: - df = pd.read_csv(path, encoding="latin-1", low_memory=False) - - # Normalise URN column - urn_col = next((c for c in df.columns if c.strip().upper() == "URN"), None) - if not urn_col: - raise ValueError(f"URN column not found. Columns: {list(df.columns)[:20]}") - df.rename(columns={urn_col: "urn"}, inplace=True) - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - # Try to find total responses column - resp_col = next((c for c in df.columns if "total" in c.lower() and "respon" in c.lower()), None) - - inserted = 0 - today = date.today() - - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - urn = int(row["urn"]) - total = int(row[resp_col]) if resp_col and pd.notna(row.get(resp_col)) else None - - # Try to extract % positive per question from wide-format columns - # Parent View has numbered questions Q1–Q12 (or Q1–Q14 depending on year) - record = { - "urn": urn, - "survey_date": today, - "total_responses": total, - "q_happy_pct": _positive_pct(row, "Q1"), - "q_safe_pct": _positive_pct(row, "Q2"), - "q_behaviour_pct": _positive_pct(row, "Q3"), - "q_bullying_pct": _positive_pct(row, "Q4"), - "q_communication_pct": _positive_pct(row, "Q5"), - "q_progress_pct": _positive_pct(row, "Q7"), - "q_teaching_pct": _positive_pct(row, "Q8"), - "q_information_pct": _positive_pct(row, "Q9"), - "q_curriculum_pct": _positive_pct(row, "Q10"), - "q_future_pct": _positive_pct(row, "Q11"), - "q_leadership_pct": _positive_pct(row, "Q12"), - "q_wellbeing_pct": _positive_pct(row, "Q13"), - "q_recommend_pct": _positive_pct(row, "Q14"), - "q_sen_pct": None, - } - - session.execute( - text(""" - INSERT INTO ofsted_parent_view - (urn, survey_date, total_responses, - q_happy_pct, q_safe_pct, q_behaviour_pct, q_bullying_pct, - q_communication_pct, q_progress_pct, q_teaching_pct, - q_information_pct, q_curriculum_pct, q_future_pct, - q_leadership_pct, q_wellbeing_pct, q_recommend_pct, q_sen_pct) - VALUES - (:urn, :survey_date, :total_responses, - :q_happy_pct, :q_safe_pct, :q_behaviour_pct, :q_bullying_pct, - :q_communication_pct, :q_progress_pct, :q_teaching_pct, - :q_information_pct, :q_curriculum_pct, :q_future_pct, - :q_leadership_pct, :q_wellbeing_pct, :q_recommend_pct, :q_sen_pct) - ON CONFLICT (urn) DO UPDATE SET - survey_date = EXCLUDED.survey_date, - total_responses = EXCLUDED.total_responses, - q_happy_pct = EXCLUDED.q_happy_pct, - q_safe_pct = EXCLUDED.q_safe_pct, - q_behaviour_pct = EXCLUDED.q_behaviour_pct, - q_bullying_pct = EXCLUDED.q_bullying_pct, - q_communication_pct = EXCLUDED.q_communication_pct, - q_progress_pct = EXCLUDED.q_progress_pct, - q_teaching_pct = EXCLUDED.q_teaching_pct, - q_information_pct = EXCLUDED.q_information_pct, - q_curriculum_pct = EXCLUDED.q_curriculum_pct, - q_future_pct = EXCLUDED.q_future_pct, - q_leadership_pct = EXCLUDED.q_leadership_pct, - q_wellbeing_pct = EXCLUDED.q_wellbeing_pct, - q_recommend_pct = EXCLUDED.q_recommend_pct, - q_sen_pct = EXCLUDED.q_sen_pct - """), - record, - ) - inserted += 1 - if inserted % 2000 == 0: - session.flush() - - print(f" ParentView: upserted {inserted} records") - return {"inserted": inserted, "updated": 0, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/phonics.py b/integrator/scripts/sources/phonics.py deleted file mode 100644 index 4a5264b..0000000 --- a/integrator/scripts/sources/phonics.py +++ /dev/null @@ -1,132 +0,0 @@ -""" -Phonics Screening Check downloader and loader. - -Source: EES publication "phonics-screening-check-and-key-stage-1-assessments-england" -Update: Annual (September/October) -""" -import argparse -import sys -from pathlib import Path - -import pandas as pd - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session -from sources.ees import get_latest_csv_url, download_csv - -DEST_DIR = SUPPLEMENTARY_DIR / "phonics" -PUBLICATION_SLUG = "phonics-screening-check-and-key-stage-1-assessments-england" - -# Known column names in the phonics CSV (vary by year) -COLUMN_MAP = { - "URN": "urn", - "urn": "urn", - # Year 1 pass rate - "PPTA1": "year1_phonics_pct", # % meeting expected standard Y1 - "PPTA1B": "year1_phonics_pct", - "PT_MET_PHON_Y1": "year1_phonics_pct", - "Y1_MET_EXPECTED_PCT": "year1_phonics_pct", - # Year 2 (re-takers) - "PPTA2": "year2_phonics_pct", - "PT_MET_PHON_Y2": "year2_phonics_pct", - "Y2_MET_EXPECTED_PCT": "year2_phonics_pct", - # Year label - "YEAR": "year", - "Year": "year", -} - -NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", ""} - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "phonics") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - url = get_latest_csv_url(PUBLICATION_SLUG, keyword="school") - if not url: - raise RuntimeError(f"Could not find CSV URL for phonics publication") - - filename = url.split("/")[-1].split("?")[0] or "phonics_latest.csv" - return download_csv(url, dest / filename) - - -def _parse_pct(val) -> float | None: - if pd.isna(val): - return None - s = str(val).strip().upper().replace("%", "") - if s in NULL_VALUES: - return None - try: - return float(s) - except ValueError: - return None - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "phonics") if data_dir else DEST_DIR - files = sorted(dest.glob("*.csv")) - if not files: - raise FileNotFoundError(f"No phonics CSV found in {dest}") - path = files[-1] - - print(f" Phonics: loading {path} ...") - df = pd.read_csv(path, encoding="latin-1", low_memory=False) - df.rename(columns=COLUMN_MAP, inplace=True) - - if "urn" not in df.columns: - raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}") - - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - # Infer year from filename if not in data - year = None - import re - m = re.search(r"20(\d{2})", path.stem) - if m: - year = int("20" + m.group(1)) - - inserted = 0 - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - urn = int(row["urn"]) - row_year = int(row["year"]) if "year" in df.columns and pd.notna(row.get("year")) else year - if not row_year: - continue - - session.execute( - text(""" - INSERT INTO phonics (urn, year, year1_phonics_pct, year2_phonics_pct) - VALUES (:urn, :year, :y1, :y2) - ON CONFLICT (urn, year) DO UPDATE SET - year1_phonics_pct = EXCLUDED.year1_phonics_pct, - year2_phonics_pct = EXCLUDED.year2_phonics_pct - """), - { - "urn": urn, - "year": row_year, - "y1": _parse_pct(row.get("year1_phonics_pct")), - "y2": _parse_pct(row.get("year2_phonics_pct")), - }, - ) - inserted += 1 - if inserted % 5000 == 0: - session.flush() - - print(f" Phonics: upserted {inserted} records") - return {"inserted": inserted, "updated": 0, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/scripts/sources/sen_detail.py b/integrator/scripts/sources/sen_detail.py deleted file mode 100644 index 8f20ecd..0000000 --- a/integrator/scripts/sources/sen_detail.py +++ /dev/null @@ -1,150 +0,0 @@ -""" -SEN (Special Educational Needs) primary need type breakdown. - -Source: EES publication "special-educational-needs-in-england" -Update: Annual (September) -""" -import argparse -import re -import sys -from pathlib import Path - -import pandas as pd - -sys.path.insert(0, str(Path(__file__).parent.parent)) -from config import SUPPLEMENTARY_DIR -from db import get_session -from sources.ees import get_latest_csv_url, download_csv - -DEST_DIR = SUPPLEMENTARY_DIR / "sen_detail" -PUBLICATION_SLUG = "special-educational-needs-in-england" - -NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", "X", ""} - -COLUMN_MAP = { - "URN": "urn", - "urn": "urn", - "YEAR": "year", - "Year": "year", - # Primary need types — DfE abbreviated codes - "PT_SPEECH": "primary_need_speech_pct", # SLCN - "PT_ASD": "primary_need_autism_pct", # ASD - "PT_MLD": "primary_need_mld_pct", # Moderate learning difficulty - "PT_SPLD": "primary_need_spld_pct", # Specific learning difficulty - "PT_SEMH": "primary_need_semh_pct", # Social, emotional, mental health - "PT_PHYSICAL": "primary_need_physical_pct", # Physical/sensory - "PT_OTHER": "primary_need_other_pct", - # Alternative naming - "SLCN_PCT": "primary_need_speech_pct", - "ASD_PCT": "primary_need_autism_pct", - "MLD_PCT": "primary_need_mld_pct", - "SPLD_PCT": "primary_need_spld_pct", - "SEMH_PCT": "primary_need_semh_pct", - "PHYSICAL_PCT": "primary_need_physical_pct", - "OTHER_PCT": "primary_need_other_pct", -} - - -def download(data_dir: Path | None = None) -> Path: - dest = (data_dir / "supplementary" / "sen_detail") if data_dir else DEST_DIR - dest.mkdir(parents=True, exist_ok=True) - - url = get_latest_csv_url(PUBLICATION_SLUG, keyword="school") - if not url: - url = get_latest_csv_url(PUBLICATION_SLUG) - if not url: - raise RuntimeError("Could not find CSV URL for SEN publication") - - filename = url.split("/")[-1].split("?")[0] or "sen_latest.csv" - return download_csv(url, dest / filename) - - -def _parse_pct(val) -> float | None: - if pd.isna(val): - return None - s = str(val).strip().upper().replace("%", "") - if s in NULL_VALUES: - return None - try: - return float(s) - except ValueError: - return None - - -def load(path: Path | None = None, data_dir: Path | None = None) -> dict: - if path is None: - dest = (data_dir / "supplementary" / "sen_detail") if data_dir else DEST_DIR - files = sorted(dest.glob("*.csv")) - if not files: - raise FileNotFoundError(f"No SEN CSV found in {dest}") - path = files[-1] - - print(f" SEN Detail: loading {path} ...") - df = pd.read_csv(path, encoding="latin-1", low_memory=False) - df.rename(columns=COLUMN_MAP, inplace=True) - - if "urn" not in df.columns: - raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}") - - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - year = None - m = re.search(r"20(\d{2})", path.stem) - if m: - year = int("20" + m.group(1)) - - inserted = 0 - with get_session() as session: - from sqlalchemy import text - for _, row in df.iterrows(): - urn = int(row["urn"]) - row_year = int(row["year"]) if "year" in df.columns and pd.notna(row.get("year")) else year - if not row_year: - continue - - session.execute( - text(""" - INSERT INTO sen_detail - (urn, year, primary_need_speech_pct, primary_need_autism_pct, - primary_need_mld_pct, primary_need_spld_pct, primary_need_semh_pct, - primary_need_physical_pct, primary_need_other_pct) - VALUES (:urn, :year, :speech, :autism, :mld, :spld, :semh, :physical, :other) - ON CONFLICT (urn, year) DO UPDATE SET - primary_need_speech_pct = EXCLUDED.primary_need_speech_pct, - primary_need_autism_pct = EXCLUDED.primary_need_autism_pct, - primary_need_mld_pct = EXCLUDED.primary_need_mld_pct, - primary_need_spld_pct = EXCLUDED.primary_need_spld_pct, - primary_need_semh_pct = EXCLUDED.primary_need_semh_pct, - primary_need_physical_pct = EXCLUDED.primary_need_physical_pct, - primary_need_other_pct = EXCLUDED.primary_need_other_pct - """), - { - "urn": urn, "year": row_year, - "speech": _parse_pct(row.get("primary_need_speech_pct")), - "autism": _parse_pct(row.get("primary_need_autism_pct")), - "mld": _parse_pct(row.get("primary_need_mld_pct")), - "spld": _parse_pct(row.get("primary_need_spld_pct")), - "semh": _parse_pct(row.get("primary_need_semh_pct")), - "physical": _parse_pct(row.get("primary_need_physical_pct")), - "other": _parse_pct(row.get("primary_need_other_pct")), - }, - ) - inserted += 1 - if inserted % 5000 == 0: - session.flush() - - print(f" SEN Detail: upserted {inserted} records") - return {"inserted": inserted, "updated": 0, "skipped": 0} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--action", choices=["download", "load", "all"], default="all") - parser.add_argument("--data-dir", type=Path, default=None) - args = parser.parse_args() - if args.action in ("download", "all"): - download(args.data_dir) - if args.action in ("load", "all"): - load(data_dir=args.data_dir) diff --git a/integrator/server.py b/integrator/server.py deleted file mode 100644 index a0b1c2f..0000000 --- a/integrator/server.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -Data integrator HTTP server. -Kestra calls this server via HTTP tasks to trigger download/load operations. -""" -import importlib -import sys -import traceback -from pathlib import Path - -from fastapi import FastAPI, HTTPException -from fastapi.responses import JSONResponse - -sys.path.insert(0, "/app/scripts") - -app = FastAPI(title="SchoolCompare Data Integrator", version="1.0.0") - -SOURCES = { - "ofsted", "gias", "parent_view", - "census", "admissions", "sen_detail", - "phonics", "idaci", "finance", "ks2", -} - - -@app.get("/health") -def health(): - return {"status": "ok"} - - -@app.post("/run/{source}") -def run_source(source: str, action: str = "all"): - """ - Trigger a data source download and/or load. - action: "download" | "load" | "all" - """ - if source not in SOURCES: - raise HTTPException(status_code=404, detail=f"Unknown source '{source}'. Available: {sorted(SOURCES)}") - if action not in ("download", "load", "all"): - raise HTTPException(status_code=400, detail="action must be 'download', 'load', or 'all'") - - try: - mod = importlib.import_module(f"sources.{source}") - result = {} - - if action in ("download", "all"): - mod.download() - - if action in ("load", "all"): - result = mod.load() - - return {"source": source, "action": action, "result": result} - - except Exception as e: - tb = traceback.format_exc() - raise HTTPException(status_code=500, detail={"error": str(e), "traceback": tb}) - - -@app.post("/run-all") -def run_all(action: str = "all"): - """Trigger all sources in sequence.""" - results = {} - for source in sorted(SOURCES): - try: - mod = importlib.import_module(f"sources.{source}") - if action in ("download", "all"): - mod.download() - if action in ("load", "all"): - results[source] = mod.load() - except Exception as e: - results[source] = {"error": str(e)} - return results