chore: remove Kestra and integrator legacy services
Some checks failed
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 35s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m11s
Build and Push Docker Images / Build Integrator (push) Failing after 30s
Build and Push Docker Images / Build Kestra Init (push) Failing after 29s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 30s
Build and Push Docker Images / Trigger Portainer Update (push) Has been skipped

Migration to Airflow + Meltano pipeline is complete. Remove:
- kestra, kestra-init, integrator services from docker-compose.portainer.yml
- kestra_storage and supplementary_data volumes
- KESTRA_USER/KESTRA_PASSWORD env var references
- integrator/ directory (Kestra flows, scripts, Dockerfiles)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-27 15:03:34 +00:00
parent 668e234eb2
commit 62284e7a94
30 changed files with 1 additions and 2453 deletions

View File

@@ -8,8 +8,6 @@
# TYPESENSE_API_KEY — Typesense admin API key
# TYPESENSE_SEARCH_KEY — Typesense search-only key (exposed to frontend)
# AIRFLOW_ADMIN_USER — Airflow admin username (password auto-generated, see api-server logs)
# KESTRA_USER — Kestra UI username (optional)
# KESTRA_PASSWORD — Kestra UI password (optional)
services:
@@ -103,87 +101,6 @@ services:
retries: 5
start_period: 10s
# ── Kestra — workflow orchestrator (legacy, kept during migration) ────
kestra:
image: kestra/kestra:latest
container_name: schoolcompare_kestra
command: server standalone
ports:
- "8090:8080"
volumes:
- kestra_storage:/app/storage
environment:
KESTRA_CONFIGURATION: |
datasources:
postgres:
url: jdbc:postgresql://sc_database:5432/kestra
driverClassName: org.postgresql.Driver
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
kestra:
repository:
type: postgres
queue:
type: postgres
storage:
type: local
local:
base-path: /app/storage
depends_on:
sc_database:
condition: service_healthy
networks:
- backend
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "curl -sf http://localhost:8081/health | grep -q '\"status\":\"UP\"'"]
interval: 15s
timeout: 10s
retries: 10
start_period: 60s
# ── Kestra init (legacy, kept during migration) ──────────────────────
kestra-init:
image: privaterepo.sitaru.org/tudor/school_compare-kestra-init:latest
container_name: schoolcompare_kestra_init
environment:
KESTRA_URL: http://kestra:8080
KESTRA_USER: ${KESTRA_USER:-}
KESTRA_PASSWORD: ${KESTRA_PASSWORD:-}
depends_on:
kestra:
condition: service_healthy
networks:
- backend
restart: "no"
# ── Data integrator (legacy, kept during migration) ──────────────────
integrator:
image: privaterepo.sitaru.org/tudor/school_compare-integrator:latest
container_name: schoolcompare_integrator
ports:
- "8001:8001"
environment:
DATABASE_URL: postgresql://${DB_USERNAME}:${DB_PASSWORD}@sc_database:5432/${DB_DATABASE_NAME}
DATA_DIR: /data
BACKEND_URL: http://backend:80
ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme}
PYTHONUNBUFFERED: 1
volumes:
- supplementary_data:/data
depends_on:
sc_database:
condition: service_healthy
networks:
- backend
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 15s
# ── Airflow API Server + UI ───────────────────────────────────────────
airflow-api-server:
image: privaterepo.sitaru.org/tudor/school_compare-pipeline:latest
@@ -282,7 +199,5 @@ networks:
volumes:
postgres_data:
kestra_storage:
supplementary_data:
typesense_data:
airflow_logs:

View File

@@ -1,15 +0,0 @@
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY scripts/ ./scripts/
COPY server.py .
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8001"]

View File

@@ -1,6 +0,0 @@
FROM alpine:3.19
RUN apk add --no-cache curl
COPY flows/ /flows/
COPY docker/kestra-init.sh /kestra-init.sh
RUN chmod +x /kestra-init.sh
CMD ["/kestra-init.sh"]

View File

@@ -1,59 +0,0 @@
#!/bin/sh
set -e
KESTRA_URL="${KESTRA_URL:-http://kestra:8080}"
MAX_WAIT=120
# Basic auth — set KESTRA_USER / KESTRA_PASSWORD if authentication is enabled
AUTH=""
if [ -n "$KESTRA_USER" ] && [ -n "$KESTRA_PASSWORD" ]; then
AUTH="-u ${KESTRA_USER}:${KESTRA_PASSWORD}"
fi
echo "Waiting for Kestra API at ${KESTRA_URL}..."
elapsed=0
until curl -sf $AUTH "${KESTRA_URL}/api/v1/flows/search" > /dev/null 2>&1; do
if [ "$elapsed" -ge "$MAX_WAIT" ]; then
echo "ERROR: Kestra API not reachable after ${MAX_WAIT}s"
exit 1
fi
sleep 5
elapsed=$((elapsed + 5))
done
echo "Kestra API is ready."
echo "Importing flows..."
for f in /flows/*.yml; do
name="$(basename "$f")"
echo " -> $name"
http_code=$(curl -s $AUTH -o /tmp/kestra_resp -w "%{http_code}" \
-X POST "${KESTRA_URL}/api/v1/flows" \
-H "Content-Type: application/x-yaml" \
--data-binary "@${f}")
if [ "$http_code" = "200" ] || [ "$http_code" = "201" ]; then
echo " created"
elif [ "$http_code" = "409" ]; then
ns=$(grep '^namespace:' "$f" | awk '{print $2}')
id=$(grep '^id:' "$f" | awk '{print $2}')
http_code2=$(curl -s $AUTH -o /tmp/kestra_resp -w "%{http_code}" \
-X PUT "${KESTRA_URL}/api/v1/flows/${ns}/${id}" \
-H "Content-Type: application/x-yaml" \
--data-binary "@${f}")
if [ "$http_code2" = "200" ] || [ "$http_code2" = "201" ]; then
echo " updated"
else
echo " ERROR updating $name: HTTP $http_code2"
cat /tmp/kestra_resp; echo
exit 1
fi
else
echo " ERROR importing $name: HTTP $http_code"
cat /tmp/kestra_resp; echo
exit 1
fi
done
echo "All flows imported."

View File

@@ -1,26 +0,0 @@
id: admissions-annual-update
namespace: schoolcompare.data
description: Download and load school admissions data via EES API
triggers:
- id: annual-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 4 1 7 *" # 1 July annually at 04:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/admissions?action=download
method: POST
timeout: PT20M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/admissions?action=load
method: POST
timeout: PT30M
retry:
type: constant
maxAttempts: 3
interval: PT15M

View File

@@ -1,26 +0,0 @@
id: census-annual-update
namespace: schoolcompare.data
description: Download and load School Census (SPC) data via EES API
triggers:
- id: annual-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 4 1 9 *" # 1 September annually at 04:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/census?action=download
method: POST
timeout: PT20M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/census?action=load
method: POST
timeout: PT30M
retry:
type: constant
maxAttempts: 3
interval: PT15M

View File

@@ -1,26 +0,0 @@
id: finance-annual-update
namespace: schoolcompare.data
description: Fetch FBIT financial benchmarking data from DfE API for all schools
triggers:
- id: annual-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 4 1 12 *" # 1 December annually at 04:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/finance?action=download
method: POST
timeout: PT120M # Fetches per-school from API — ~20k schools
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/finance?action=load
method: POST
timeout: PT30M
retry:
type: constant
maxAttempts: 2
interval: PT30M

View File

@@ -1,31 +0,0 @@
id: gias-weekly-update
namespace: schoolcompare.data
description: Download and load GIAS (Get Information About Schools) bulk CSV
triggers:
- id: weekly-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 3 * * 0" # Every Sunday at 03:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/gias?action=download
method: POST
timeout: PT30M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/gias?action=load
method: POST
timeout: PT30M
errors:
- id: notify-failure
type: io.kestra.plugin.core.log.Log
message: "GIAS update FAILED: {{ error.message }}"
retry:
type: constant
maxAttempts: 3
interval: PT10M

View File

@@ -1,26 +0,0 @@
id: idaci-annual-check
namespace: schoolcompare.data
description: Download IoD2019 IDACI file and compute deprivation scores for all schools
triggers:
- id: annual-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 5 1 1 *" # 1 January annually at 05:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/idaci?action=download
method: POST
timeout: PT10M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/idaci?action=load
method: POST
timeout: PT60M
retry:
type: constant
maxAttempts: 2
interval: PT30M

View File

@@ -1,23 +0,0 @@
id: ks2-reimport
namespace: schoolcompare.data
description: Re-import KS2 attainment data from bundled CSV files (use after DB wipe)
# No scheduled trigger — run manually from the Kestra UI when needed.
tasks:
- id: reimport
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/ks2?action=load
method: POST
allowFailed: false
timeout: PT30S # fire-and-forget; backend runs migration in background
errors:
- id: notify-failure
type: io.kestra.plugin.core.log.Log
message: "KS2 re-import FAILED: {{ error.message }}"
retry:
type: constant
maxAttempts: 2
interval: PT5M

View File

@@ -1,33 +0,0 @@
id: ofsted-monthly-update
namespace: schoolcompare.data
description: Download and load Ofsted Monthly Management Information CSV
triggers:
- id: monthly-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 2 1 * *" # 1st of each month at 02:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/ofsted?action=download
method: POST
allowFailed: false
timeout: PT10M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/ofsted?action=load
method: POST
allowFailed: false
timeout: PT30M
errors:
- id: notify-failure
type: io.kestra.plugin.core.log.Log
message: "Ofsted update FAILED: {{ error.message }}"
retry:
type: constant
maxAttempts: 3
interval: PT10M

View File

@@ -1,31 +0,0 @@
id: parent-view-monthly-check
namespace: schoolcompare.data
description: Download and load Ofsted Parent View open data (released ~3x/year)
triggers:
- id: monthly-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 3 1 * *" # 1st of each month at 03:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/parent_view?action=download
method: POST
timeout: PT10M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/parent_view?action=load
method: POST
timeout: PT20M
errors:
- id: notify-failure
type: io.kestra.plugin.core.log.Log
message: "Parent View update FAILED: {{ error.message }}"
retry:
type: constant
maxAttempts: 3
interval: PT10M

View File

@@ -1,26 +0,0 @@
id: phonics-annual-update
namespace: schoolcompare.data
description: Download and load Phonics Screening Check data via EES API
triggers:
- id: annual-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 5 1 9 *" # 1 September annually at 05:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/phonics?action=download
method: POST
timeout: PT20M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/phonics?action=load
method: POST
timeout: PT30M
retry:
type: constant
maxAttempts: 3
interval: PT15M

View File

@@ -1,26 +0,0 @@
id: sen-detail-annual-update
namespace: schoolcompare.data
description: Download and load SEN primary need breakdown via EES API
triggers:
- id: annual-schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 4 15 9 *" # 15 September annually at 04:00
tasks:
- id: download
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/sen_detail?action=download
method: POST
timeout: PT20M
- id: load
type: io.kestra.plugin.core.http.Request
uri: http://integrator:8001/run/sen_detail?action=load
method: POST
timeout: PT30M
retry:
type: constant
maxAttempts: 3
interval: PT15M

View File

@@ -1,7 +0,0 @@
fastapi==0.115.0
uvicorn[standard]==0.30.6
requests==2.32.3
pandas==2.2.3
openpyxl==3.1.5
psycopg2-binary==2.9.9
sqlalchemy==2.0.35

View File

@@ -1,14 +0,0 @@
"""Configuration for the data integrator."""
import os
from pathlib import Path
DATABASE_URL = os.environ.get(
"DATABASE_URL",
"postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare",
)
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
SUPPLEMENTARY_DIR = DATA_DIR / "supplementary"
BACKEND_URL = os.environ.get("BACKEND_URL", "http://backend:80")
ADMIN_API_KEY = os.environ.get("ADMIN_API_KEY", "changeme")

View File

@@ -1,23 +0,0 @@
"""Database connection for the integrator."""
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config import DATABASE_URL
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@contextmanager
def get_session():
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()

View File

@@ -1,184 +0,0 @@
"""
School Admissions data downloader and loader.
Source: EES publication "primary-and-secondary-school-applications-and-offers"
Content API release ZIP → supporting-files/AppsandOffers_*_SchoolLevel*.csv
Update: Annual (June/July post-offer round)
"""
import argparse
import re
import sys
from pathlib import Path
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
from sources.ees import download_release_zip_csv
DEST_DIR = SUPPLEMENTARY_DIR / "admissions"
PUBLICATION_SLUG = "primary-and-secondary-school-applications-and-offers"
NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", "X", "Z", ""}
# Maps actual CSV column names → internal field names
COLUMN_MAP = {
# School identifier
"school_urn": "urn",
# Year — e.g. 202526 → 2025
"time_period": "time_period_raw",
# PAN (places offered)
"total_number_places_offered": "pan",
# Applications (total times put as any preference)
"times_put_as_any_preferred_school": "total_applications",
# 1st-preference applications
"times_put_as_1st_preference": "times_1st_pref",
# 1st-preference offers
"number_1st_preference_offers": "offers_1st_pref",
}
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "admissions") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
dest_file = dest / "admissions_school_level_latest.csv"
return download_release_zip_csv(
PUBLICATION_SLUG,
dest_file,
zip_member_keyword="schoollevel",
)
def _parse_int(val) -> int | None:
if pd.isna(val):
return None
s = str(val).strip().upper().replace(",", "")
if s in NULL_VALUES:
return None
try:
return int(float(s))
except ValueError:
return None
def _parse_pct(val) -> float | None:
if pd.isna(val):
return None
s = str(val).strip().upper().replace("%", "")
if s in NULL_VALUES:
return None
try:
return float(s)
except ValueError:
return None
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "admissions") if data_dir else DEST_DIR
files = sorted(dest.glob("*.csv"))
if not files:
raise FileNotFoundError(f"No admissions CSV found in {dest}")
path = files[-1]
print(f" Admissions: loading {path} ...")
df = pd.read_csv(path, encoding="utf-8-sig", low_memory=False)
# Rename columns we care about
df.rename(columns=COLUMN_MAP, inplace=True)
if "urn" not in df.columns:
raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}")
# Filter to primary schools only
if "school_phase" in df.columns:
df = df[df["school_phase"].str.lower() == "primary"]
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
# Derive year from time_period (e.g. 202526 → 2025)
def _extract_year(val) -> int | None:
s = str(val).strip()
m = re.match(r"(\d{4})\d{2}", s)
if m:
return int(m.group(1))
m2 = re.search(r"20(\d{2})", s)
if m2:
return int("20" + m2.group(1))
return None
if "time_period_raw" in df.columns:
df["year"] = df["time_period_raw"].apply(_extract_year)
else:
year_m = re.search(r"20(\d{2})", path.stem)
df["year"] = int("20" + year_m.group(1)) if year_m else None
df = df.dropna(subset=["year"])
df["year"] = df["year"].astype(int)
# Keep most recent year per school (file may contain multiple years)
df = df.sort_values("year", ascending=False).groupby("urn").first().reset_index()
inserted = 0
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
year = int(row["year"])
pan = _parse_int(row.get("pan"))
total_apps = _parse_int(row.get("total_applications"))
times_1st = _parse_int(row.get("times_1st_pref"))
offers_1st = _parse_int(row.get("offers_1st_pref"))
# % of 1st-preference applicants who received an offer
if times_1st and times_1st > 0 and offers_1st is not None:
pct_1st = round(offers_1st / times_1st * 100, 1)
else:
pct_1st = None
oversubscribed = (
True if (pan and times_1st and times_1st > pan) else
False if (pan and times_1st and times_1st <= pan) else
None
)
session.execute(
text("""
INSERT INTO school_admissions
(urn, year, published_admission_number, total_applications,
first_preference_offers_pct, oversubscribed)
VALUES (:urn, :year, :pan, :total_apps, :pct_1st, :oversubscribed)
ON CONFLICT (urn, year) DO UPDATE SET
published_admission_number = EXCLUDED.published_admission_number,
total_applications = EXCLUDED.total_applications,
first_preference_offers_pct = EXCLUDED.first_preference_offers_pct,
oversubscribed = EXCLUDED.oversubscribed
"""),
{
"urn": urn, "year": year, "pan": pan,
"total_apps": total_apps, "pct_1st": pct_1st,
"oversubscribed": oversubscribed,
},
)
inserted += 1
if inserted % 5000 == 0:
session.flush()
print(f" Processed {inserted} records...")
print(f" Admissions: upserted {inserted} records")
return {"inserted": inserted, "updated": 0, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,148 +0,0 @@
"""
School Census (SPC) downloader and loader.
Source: EES publication "schools-pupils-and-their-characteristics"
Update: Annual (June)
Adds: class_size_avg, ethnicity breakdown by school
"""
import argparse
import re
import sys
from pathlib import Path
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
from sources.ees import get_latest_csv_url, download_csv
DEST_DIR = SUPPLEMENTARY_DIR / "census"
PUBLICATION_SLUG = "schools-pupils-and-their-characteristics"
NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", "X", ""}
COLUMN_MAP = {
"URN": "urn",
"urn": "urn",
"YEAR": "year",
"Year": "year",
# Class size
"average_class_size": "class_size_avg",
"AVCLAS": "class_size_avg",
"avg_class_size": "class_size_avg",
# Ethnicity — DfE uses ethnicity major group percentages
"perc_white": "ethnicity_white_pct",
"perc_asian": "ethnicity_asian_pct",
"perc_black": "ethnicity_black_pct",
"perc_mixed": "ethnicity_mixed_pct",
"perc_other_ethnic": "ethnicity_other_pct",
"PTWHITE": "ethnicity_white_pct",
"PTASIAN": "ethnicity_asian_pct",
"PTBLACK": "ethnicity_black_pct",
"PTMIXED": "ethnicity_mixed_pct",
"PTOTHER": "ethnicity_other_pct",
}
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "census") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
url = get_latest_csv_url(PUBLICATION_SLUG, keyword="school")
if not url:
raise RuntimeError(f"Could not find CSV URL for census publication")
filename = url.split("/")[-1].split("?")[0] or "census_latest.csv"
return download_csv(url, dest / filename)
def _parse_pct(val) -> float | None:
if pd.isna(val):
return None
s = str(val).strip().upper().replace("%", "")
if s in NULL_VALUES:
return None
try:
return float(s)
except ValueError:
return None
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "census") if data_dir else DEST_DIR
files = sorted(dest.glob("*.csv"))
if not files:
raise FileNotFoundError(f"No census CSV found in {dest}")
path = files[-1]
print(f" Census: loading {path} ...")
df = pd.read_csv(path, encoding="latin-1", low_memory=False)
df.rename(columns=COLUMN_MAP, inplace=True)
if "urn" not in df.columns:
raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}")
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
year = None
m = re.search(r"20(\d{2})", path.stem)
if m:
year = int("20" + m.group(1))
inserted = 0
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
row_year = int(row["year"]) if "year" in df.columns and pd.notna(row.get("year")) else year
if not row_year:
continue
session.execute(
text("""
INSERT INTO school_census
(urn, year, class_size_avg,
ethnicity_white_pct, ethnicity_asian_pct, ethnicity_black_pct,
ethnicity_mixed_pct, ethnicity_other_pct)
VALUES (:urn, :year, :class_size_avg,
:white, :asian, :black, :mixed, :other)
ON CONFLICT (urn, year) DO UPDATE SET
class_size_avg = EXCLUDED.class_size_avg,
ethnicity_white_pct = EXCLUDED.ethnicity_white_pct,
ethnicity_asian_pct = EXCLUDED.ethnicity_asian_pct,
ethnicity_black_pct = EXCLUDED.ethnicity_black_pct,
ethnicity_mixed_pct = EXCLUDED.ethnicity_mixed_pct,
ethnicity_other_pct = EXCLUDED.ethnicity_other_pct
"""),
{
"urn": urn,
"year": row_year,
"class_size_avg": _parse_pct(row.get("class_size_avg")),
"white": _parse_pct(row.get("ethnicity_white_pct")),
"asian": _parse_pct(row.get("ethnicity_asian_pct")),
"black": _parse_pct(row.get("ethnicity_black_pct")),
"mixed": _parse_pct(row.get("ethnicity_mixed_pct")),
"other": _parse_pct(row.get("ethnicity_other_pct")),
},
)
inserted += 1
if inserted % 5000 == 0:
session.flush()
print(f" Census: upserted {inserted} records")
return {"inserted": inserted, "updated": 0, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,111 +0,0 @@
"""
Shared EES (Explore Education Statistics) API client.
Two APIs are available:
- Statistics API: https://api.education.gov.uk/statistics/v1 (only ~13 publications)
- Content API: https://content.explore-education-statistics.service.gov.uk/api
Covers all publications; use this for admissions and other data not in the stats API.
Download all files for a release as a ZIP from /api/releases/{id}/files.
"""
import io
import zipfile
from pathlib import Path
from typing import Optional
import requests
STATS_API_BASE = "https://api.education.gov.uk/statistics/v1"
CONTENT_API_BASE = "https://content.explore-education-statistics.service.gov.uk/api"
TIMEOUT = 60
def get_publication_files(publication_slug: str) -> list[dict]:
"""Return list of data-set file descriptors for a publication (statistics API)."""
url = f"{STATS_API_BASE}/publications/{publication_slug}/data-set-files"
resp = requests.get(url, timeout=TIMEOUT)
resp.raise_for_status()
return resp.json().get("results", [])
def get_latest_csv_url(publication_slug: str, keyword: str = "") -> Optional[str]:
"""
Find the most recent CSV download URL for a publication (statistics API).
Optionally filter by a keyword in the file name.
"""
files = get_publication_files(publication_slug)
for entry in files:
name = entry.get("name", "").lower()
if keyword and keyword.lower() not in name:
continue
csv_url = entry.get("csvDownloadUrl") or entry.get("file", {}).get("url")
if csv_url:
return csv_url
return None
def get_content_release_id(publication_slug: str) -> str:
"""Return the latest release ID for a publication via the content API."""
url = f"{CONTENT_API_BASE}/publications/{publication_slug}/releases/latest"
resp = requests.get(url, timeout=TIMEOUT)
resp.raise_for_status()
return resp.json()["id"]
def download_release_zip_csv(
publication_slug: str,
dest_path: Path,
zip_member_keyword: str = "",
) -> Path:
"""
Download the full-release ZIP from the EES content API and extract one CSV.
If zip_member_keyword is given, the first member whose path contains that
keyword (case-insensitive) is extracted; otherwise the first .csv found is used.
Returns dest_path (the extracted CSV file).
"""
if dest_path.exists():
print(f" EES: {dest_path.name} already exists, skipping.")
return dest_path
release_id = get_content_release_id(publication_slug)
zip_url = f"{CONTENT_API_BASE}/releases/{release_id}/files"
print(f" EES: downloading release ZIP for '{publication_slug}' ...")
resp = requests.get(zip_url, timeout=300, stream=True)
resp.raise_for_status()
data = b"".join(resp.iter_content(chunk_size=65536))
with zipfile.ZipFile(io.BytesIO(data)) as z:
members = z.namelist()
target = None
kw = zip_member_keyword.lower()
for m in members:
if m.endswith(".csv") and (not kw or kw in m.lower()):
target = m
break
if not target:
raise ValueError(
f"No CSV matching '{zip_member_keyword}' in ZIP. Members: {members}"
)
print(f" EES: extracting '{target}' ...")
dest_path.parent.mkdir(parents=True, exist_ok=True)
with z.open(target) as src, open(dest_path, "wb") as dst:
dst.write(src.read())
print(f" EES: saved {dest_path} ({dest_path.stat().st_size // 1024} KB)")
return dest_path
def download_csv(url: str, dest_path: Path) -> Path:
"""Download a CSV from EES to dest_path."""
if dest_path.exists():
print(f" EES: {dest_path.name} already exists, skipping.")
return dest_path
print(f" EES: downloading {url} ...")
resp = requests.get(url, timeout=300, stream=True)
resp.raise_for_status()
dest_path.parent.mkdir(parents=True, exist_ok=True)
with open(dest_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=65536):
f.write(chunk)
print(f" EES: saved {dest_path} ({dest_path.stat().st_size // 1024} KB)")
return dest_path

View File

@@ -1,143 +0,0 @@
"""
FBIT (Financial Benchmarking and Insights Tool) financial data loader.
Source: https://schools-financial-benchmarking.service.gov.uk/api/
Update: Annual (December — data for the prior financial year)
"""
import argparse
import sys
import time
from pathlib import Path
import pandas as pd
import requests
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
DEST_DIR = SUPPLEMENTARY_DIR / "finance"
API_BASE = "https://schools-financial-benchmarking.service.gov.uk/api"
RATE_LIMIT_DELAY = 0.1 # seconds between requests
def download(data_dir: Path | None = None) -> Path:
"""
Fetch per-URN financial data from FBIT API and save as CSV.
Batches all school URNs from the database.
"""
dest = (data_dir / "supplementary" / "finance") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
# Determine year from API (use current year minus 1 for completed financials)
from datetime import date
year = date.today().year - 1
dest_file = dest / f"fbit_{year}.csv"
if dest_file.exists():
print(f" Finance: {dest_file.name} already exists, skipping download.")
return dest_file
# Get all URNs from the database
with get_session() as session:
from sqlalchemy import text
rows = session.execute(text("SELECT urn FROM schools")).fetchall()
urns = [r[0] for r in rows]
print(f" Finance: fetching FBIT data for {len(urns)} schools (year {year}) ...")
records = []
errors = 0
for i, urn in enumerate(urns):
if i % 500 == 0:
print(f" {i}/{len(urns)} ...")
try:
resp = requests.get(
f"{API_BASE}/schoolFinancialDataObject/{urn}",
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
if data:
records.append({
"urn": urn,
"year": year,
"per_pupil_spend": data.get("totalExpenditure") and
data.get("numberOfPupils") and
round(data["totalExpenditure"] / data["numberOfPupils"], 2),
"staff_cost_pct": data.get("staffCostPercent"),
"teacher_cost_pct": data.get("teachingStaffCostPercent"),
"support_staff_cost_pct": data.get("educationSupportStaffCostPercent"),
"premises_cost_pct": data.get("premisesStaffCostPercent"),
})
elif resp.status_code not in (404, 400):
errors += 1
except Exception:
errors += 1
time.sleep(RATE_LIMIT_DELAY)
df = pd.DataFrame(records)
df.to_csv(dest_file, index=False)
print(f" Finance: saved {len(records)} records to {dest_file} ({errors} errors)")
return dest_file
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "finance") if data_dir else DEST_DIR
files = sorted(dest.glob("fbit_*.csv"))
if not files:
raise FileNotFoundError(f"No finance CSV found in {dest}")
path = files[-1]
print(f" Finance: loading {path} ...")
df = pd.read_csv(path)
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
inserted = 0
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
session.execute(
text("""
INSERT INTO school_finance
(urn, year, per_pupil_spend, staff_cost_pct, teacher_cost_pct,
support_staff_cost_pct, premises_cost_pct)
VALUES (:urn, :year, :per_pupil, :staff, :teacher, :support, :premises)
ON CONFLICT (urn, year) DO UPDATE SET
per_pupil_spend = EXCLUDED.per_pupil_spend,
staff_cost_pct = EXCLUDED.staff_cost_pct,
teacher_cost_pct = EXCLUDED.teacher_cost_pct,
support_staff_cost_pct = EXCLUDED.support_staff_cost_pct,
premises_cost_pct = EXCLUDED.premises_cost_pct
"""),
{
"urn": int(row["urn"]),
"year": int(row["year"]),
"per_pupil": float(row["per_pupil_spend"]) if pd.notna(row.get("per_pupil_spend")) else None,
"staff": float(row["staff_cost_pct"]) if pd.notna(row.get("staff_cost_pct")) else None,
"teacher": float(row["teacher_cost_pct"]) if pd.notna(row.get("teacher_cost_pct")) else None,
"support": float(row["support_staff_cost_pct"]) if pd.notna(row.get("support_staff_cost_pct")) else None,
"premises": float(row["premises_cost_pct"]) if pd.notna(row.get("premises_cost_pct")) else None,
},
)
inserted += 1
if inserted % 2000 == 0:
session.flush()
print(f" Finance: upserted {inserted} records")
return {"inserted": inserted, "updated": 0, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,159 +0,0 @@
"""
GIAS (Get Information About Schools) bulk CSV downloader and loader.
Source: https://get-information-schools.service.gov.uk/Downloads
Update: Daily; we refresh weekly.
Adds: website, headteacher_name, capacity, trust_name, trust_uid, gender, nursery_provision
"""
import argparse
import sys
from datetime import date
from pathlib import Path
import pandas as pd
import requests
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
DEST_DIR = SUPPLEMENTARY_DIR / "gias"
# GIAS bulk download URL — date is injected at runtime
GIAS_URL_TEMPLATE = "https://ea-edubase-api-prod.azurewebsites.net/edubase/downloads/public/edubasealldata{date}.csv"
COLUMN_MAP = {
"URN": "urn",
"SchoolWebsite": "website",
"SchoolCapacity": "capacity",
"TrustName": "trust_name",
"TrustUID": "trust_uid",
"Gender (name)": "gender",
"NurseryProvision (name)": "nursery_provision_raw",
"HeadTitle": "head_title",
"HeadFirstName": "head_first",
"HeadLastName": "head_last",
}
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "gias") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
today = date.today().strftime("%Y%m%d")
url = GIAS_URL_TEMPLATE.format(date=today)
filename = f"gias_{today}.csv"
dest_file = dest / filename
if dest_file.exists():
print(f" GIAS: {filename} already exists, skipping download.")
return dest_file
print(f" GIAS: downloading {url} ...")
resp = requests.get(url, timeout=300, stream=True)
# GIAS may not have today's file yet — fall back to yesterday
if resp.status_code == 404:
from datetime import timedelta
yesterday = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
url = GIAS_URL_TEMPLATE.format(date=yesterday)
filename = f"gias_{yesterday}.csv"
dest_file = dest / filename
if dest_file.exists():
print(f" GIAS: {filename} already exists, skipping download.")
return dest_file
resp = requests.get(url, timeout=300, stream=True)
resp.raise_for_status()
with open(dest_file, "wb") as f:
for chunk in resp.iter_content(chunk_size=65536):
f.write(chunk)
print(f" GIAS: saved {dest_file} ({dest_file.stat().st_size // 1024} KB)")
return dest_file
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "gias") if data_dir else DEST_DIR
files = sorted(dest.glob("gias_*.csv"))
if not files:
raise FileNotFoundError(f"No GIAS CSV found in {dest}")
path = files[-1]
print(f" GIAS: loading {path} ...")
df = pd.read_csv(path, encoding="latin-1", low_memory=False)
df.rename(columns=COLUMN_MAP, inplace=True)
if "urn" not in df.columns:
raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}")
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
# Build headteacher_name from parts
def build_name(row):
parts = [
str(row.get("head_title", "") or "").strip(),
str(row.get("head_first", "") or "").strip(),
str(row.get("head_last", "") or "").strip(),
]
return " ".join(p for p in parts if p) or None
df["headteacher_name"] = df.apply(build_name, axis=1)
df["nursery_provision"] = df.get("nursery_provision_raw", pd.Series()).apply(
lambda v: True if str(v).strip().lower().startswith("has") else False if pd.notna(v) else None
)
def clean_str(val):
s = str(val).strip() if pd.notna(val) else None
return s if s and s.lower() not in ("nan", "none", "") else None
updated = 0
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
session.execute(
text("""
UPDATE schools SET
website = :website,
headteacher_name = :headteacher_name,
capacity = :capacity,
trust_name = :trust_name,
trust_uid = :trust_uid,
gender = :gender,
nursery_provision = :nursery_provision
WHERE urn = :urn
"""),
{
"urn": urn,
"website": clean_str(row.get("website")),
"headteacher_name": row.get("headteacher_name"),
"capacity": int(row["capacity"]) if pd.notna(row.get("capacity")) and str(row.get("capacity")).strip().isdigit() else None,
"trust_name": clean_str(row.get("trust_name")),
"trust_uid": clean_str(row.get("trust_uid")),
"gender": clean_str(row.get("gender")),
"nursery_provision": row.get("nursery_provision"),
},
)
updated += 1
if updated % 5000 == 0:
session.flush()
print(f" Updated {updated} schools...")
print(f" GIAS: updated {updated} school records")
return {"inserted": 0, "updated": updated, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
path = download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,176 +0,0 @@
"""
IDACI (Income Deprivation Affecting Children Index) loader.
Source: English Indices of Deprivation 2019
https://www.gov.uk/government/statistics/english-indices-of-deprivation-2019
This is a one-time download (5-yearly release). We join school postcodes to LSOAs
via postcodes.io, then look up IDACI scores from the IoD2019 file.
Update: ~5-yearly (next release expected 2025/26)
"""
import argparse
import sys
from pathlib import Path
import pandas as pd
import requests
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
DEST_DIR = SUPPLEMENTARY_DIR / "idaci"
# IoD 2019 supplementary data — "Income Deprivation Affecting Children Index (IDACI)"
IOD_2019_URL = (
"https://assets.publishing.service.gov.uk/government/uploads/system/uploads/"
"attachment_data/file/833970/File_1_-_IMD2019_Index_of_Multiple_Deprivation.xlsx"
)
POSTCODES_IO_BATCH = "https://api.postcodes.io/postcodes"
BATCH_SIZE = 100
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "idaci") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
filename = "iod2019_idaci.xlsx"
dest_file = dest / filename
if dest_file.exists():
print(f" IDACI: {filename} already exists, skipping download.")
return dest_file
print(f" IDACI: downloading IoD2019 file ...")
resp = requests.get(IOD_2019_URL, timeout=300, stream=True)
resp.raise_for_status()
with open(dest_file, "wb") as f:
for chunk in resp.iter_content(chunk_size=65536):
f.write(chunk)
print(f" IDACI: saved {dest_file}")
return dest_file
def _postcode_to_lsoa(postcodes: list[str]) -> dict[str, str]:
"""Batch-resolve postcodes to LSOA codes via postcodes.io."""
result = {}
valid = [p.strip().upper() for p in postcodes if p and len(str(p).strip()) >= 5]
valid = list(set(valid))
for i in range(0, len(valid), BATCH_SIZE):
batch = valid[i:i + BATCH_SIZE]
try:
resp = requests.post(POSTCODES_IO_BATCH, json={"postcodes": batch}, timeout=30)
if resp.status_code == 200:
for item in resp.json().get("result", []):
if item and item.get("result"):
lsoa = item["result"].get("lsoa")
if lsoa:
result[item["query"].upper()] = lsoa
except Exception as e:
print(f" Warning: postcodes.io batch failed: {e}")
return result
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
dest = (data_dir / "supplementary" / "idaci") if data_dir else DEST_DIR
if path is None:
files = sorted(dest.glob("*.xlsx"))
if not files:
raise FileNotFoundError(f"No IDACI file found in {dest}")
path = files[-1]
print(f" IDACI: loading IoD2019 from {path} ...")
# IoD2019 File 1 — sheet "IoD2019 IDACI" or similar
try:
iod_df = pd.read_excel(path, sheet_name=None)
# Find sheet with IDACI data
idaci_sheet = None
for name, df in iod_df.items():
if "IDACI" in name.upper() or "IDACI" in str(df.columns.tolist()).upper():
idaci_sheet = name
break
if idaci_sheet is None:
idaci_sheet = list(iod_df.keys())[0]
df_iod = iod_df[idaci_sheet]
except Exception as e:
raise RuntimeError(f"Could not read IoD2019 file: {e}")
# Normalise column names — IoD2019 uses specific headers
col_lsoa = next((c for c in df_iod.columns if "LSOA" in str(c).upper() and "code" in str(c).lower()), None)
col_score = next((c for c in df_iod.columns if "IDACI" in str(c).upper() and "score" in str(c).lower()), None)
col_rank = next((c for c in df_iod.columns if "IDACI" in str(c).upper() and "rank" in str(c).lower()), None)
if not col_lsoa or not col_score:
print(f" IDACI columns available: {list(df_iod.columns)[:20]}")
raise ValueError("Could not find LSOA code or IDACI score columns")
df_iod = df_iod[[col_lsoa, col_score]].copy()
df_iod.columns = ["lsoa_code", "idaci_score"]
df_iod = df_iod.dropna()
# Compute decile from rank (or from score distribution)
total = len(df_iod)
df_iod = df_iod.sort_values("idaci_score", ascending=False)
df_iod["idaci_decile"] = (pd.qcut(df_iod["idaci_score"], 10, labels=False) + 1).astype(int)
# Decile 1 = most deprived (highest IDACI score)
df_iod["idaci_decile"] = 11 - df_iod["idaci_decile"]
lsoa_lookup = df_iod.set_index("lsoa_code")[["idaci_score", "idaci_decile"]].to_dict("index")
print(f" IDACI: loaded {len(lsoa_lookup)} LSOA records")
# Fetch all school postcodes from the database
with get_session() as session:
from sqlalchemy import text
rows = session.execute(text("SELECT urn, postcode FROM schools WHERE postcode IS NOT NULL")).fetchall()
postcodes = [r[1] for r in rows]
print(f" IDACI: resolving {len(postcodes)} postcodes via postcodes.io ...")
pc_to_lsoa = _postcode_to_lsoa(postcodes)
print(f" IDACI: resolved {len(pc_to_lsoa)} postcodes to LSOAs")
inserted = skipped = 0
with get_session() as session:
from sqlalchemy import text
for urn, postcode in rows:
lsoa = pc_to_lsoa.get(str(postcode).strip().upper())
if not lsoa:
skipped += 1
continue
iod = lsoa_lookup.get(lsoa)
if not iod:
skipped += 1
continue
session.execute(
text("""
INSERT INTO school_deprivation (urn, lsoa_code, idaci_score, idaci_decile)
VALUES (:urn, :lsoa, :score, :decile)
ON CONFLICT (urn) DO UPDATE SET
lsoa_code = EXCLUDED.lsoa_code,
idaci_score = EXCLUDED.idaci_score,
idaci_decile = EXCLUDED.idaci_decile
"""),
{"urn": urn, "lsoa": lsoa, "score": float(iod["idaci_score"]), "decile": int(iod["idaci_decile"])},
)
inserted += 1
if inserted % 2000 == 0:
session.flush()
print(f" IDACI: upserted {inserted}, skipped {skipped}")
return {"inserted": inserted, "updated": 0, "skipped": skipped}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,49 +0,0 @@
"""
KS2 attainment data re-importer.
Triggers a full re-import of the KS2 CSV data by calling the backend's
admin endpoint. The backend owns the migration logic and CSV column mappings;
this module is a thin trigger so the re-import can be orchestrated via Kestra
like all other data sources.
The CSV files must already be present in the data volume under
/data/{year}/england_ks2final.csv
(populated at deploy time from the repo's data/ directory).
"""
import requests
from config import BACKEND_URL, ADMIN_API_KEY
HEADERS = {"X-API-Key": ADMIN_API_KEY}
def download():
"""No download step — CSVs are shipped with the repo."""
print("KS2 CSVs are bundled in the data volume; no download needed.")
return {"skipped": True}
def load():
"""Trigger KS2 re-import on the backend and return immediately.
The migration (including geocoding) runs as a background thread on the
backend and can take up to an hour. Poll GET /api/admin/reimport-ks2/status
to check progress, or simply wait for schools to appear in the UI.
"""
url = f"{BACKEND_URL}/api/admin/reimport-ks2?geocode=true"
print(f"POST {url}")
resp = requests.post(url, headers=HEADERS, timeout=30)
resp.raise_for_status()
result = resp.json()
print(f"Result: {result}")
return result
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
args = parser.parse_args()
if args.action in ("download", "all"):
download()
if args.action in ("load", "all"):
load()

View File

@@ -1,418 +0,0 @@
"""
Ofsted Monthly Management Information CSV downloader and loader.
Source: https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes
Update: Monthly (released ~2 weeks into each month)
"""
import argparse
import re
import sys
from datetime import date, datetime
from pathlib import Path
import pandas as pd
import requests
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
# Current Ofsted MI download URL — update this when Ofsted releases a new file.
# The URL follows a predictable pattern; we attempt to discover it from the GOV.UK page.
GOV_UK_PAGE = "https://www.gov.uk/government/statistical-data-sets/monthly-management-information-ofsteds-school-inspections-outcomes"
# Column name → internal field, listed in priority order per field.
# First matching column wins; later entries are fallbacks for older file formats.
COLUMN_PRIORITY = {
"urn": ["URN", "Urn", "urn"],
"inspection_date": [
"Inspection start date of latest OEIF graded inspection",
"Inspection start date",
"Inspection date",
"InspectionDate",
],
"publication_date": [
"Publication date of latest OEIF graded inspection",
"Publication date",
"PublicationDate",
],
"inspection_type": [
"Inspection type of latest OEIF graded inspection",
"Inspection type",
"InspectionType",
],
"overall_effectiveness": [
"Latest OEIF overall effectiveness",
"Overall effectiveness",
"OverallEffectiveness",
],
"quality_of_education": [
"Latest OEIF quality of education",
"Quality of education",
"QualityOfEducation",
],
"behaviour_attitudes": [
"Latest OEIF behaviour and attitudes",
"Behaviour and attitudes",
"BehaviourAndAttitudes",
],
"personal_development": [
"Latest OEIF personal development",
"Personal development",
"PersonalDevelopment",
],
"leadership_management": [
"Latest OEIF effectiveness of leadership and management",
"Leadership and management",
"LeadershipAndManagement",
],
"early_years_provision": [
"Latest OEIF early years provision (where applicable)",
"Early years provision",
"EarlyYearsProvision",
],
}
GRADE_MAP = {
"Outstanding": 1, "1": 1, 1: 1,
"Good": 2, "2": 2, 2: 2,
"Requires improvement": 3, "3": 3, 3: 3,
"Requires Improvement": 3,
"Inadequate": 4, "4": 4, 4: 4,
}
# Report Card grade text → integer (1=Exceptional … 5=Urgent improvement)
RC_GRADE_MAP = {
"exceptional": 1,
"strong standard": 2,
"strong": 2,
"expected standard": 3,
"expected": 3,
"needs attention": 4,
"urgent improvement": 5,
}
# Column name priority for Report Card fields (best-guess names; Ofsted may vary)
RC_COLUMN_PRIORITY = {
"rc_safeguarding": [
"Safeguarding",
"safeguarding",
"Safeguarding standards",
],
"rc_inclusion": [
"Inclusion",
"inclusion",
],
"rc_curriculum_teaching": [
"Curriculum and teaching",
"curriculum_and_teaching",
"Curriculum & teaching",
],
"rc_achievement": [
"Achievement",
"achievement",
],
"rc_attendance_behaviour": [
"Attendance and behaviour",
"attendance_and_behaviour",
"Attendance & behaviour",
],
"rc_personal_development": [
"Personal development and well-being",
"Personal development and wellbeing",
"personal_development_and_wellbeing",
"Personal development & well-being",
],
"rc_leadership_governance": [
"Leadership and governance",
"leadership_and_governance",
"Leadership & governance",
],
"rc_early_years": [
"Early years",
"early_years",
"Early years provision",
],
"rc_sixth_form": [
"Sixth form",
"sixth_form",
"Sixth form in schools",
],
}
DEST_DIR = SUPPLEMENTARY_DIR / "ofsted"
def _discover_csv_url() -> str | None:
"""Scrape the GOV.UK page for the most recent CSV/ZIP link."""
try:
resp = requests.get(GOV_UK_PAGE, timeout=30)
resp.raise_for_status()
# Look for links to assets.publishing.service.gov.uk CSV or ZIP files
pattern = r'href="(https://assets\.publishing\.service\.gov\.uk[^"]+\.(?:csv|zip))"'
urls = re.findall(pattern, resp.text, re.IGNORECASE)
if urls:
return urls[0]
except Exception as e:
print(f" Warning: could not scrape GOV.UK page: {e}")
return None
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "ofsted") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
url = _discover_csv_url()
if not url:
raise RuntimeError(
"Could not discover Ofsted MI download URL. "
"Visit https://www.gov.uk/government/statistical-data-sets/"
"monthly-management-information-ofsteds-school-inspections-outcomes "
"to get the latest URL and update MANUAL_URL in ofsted.py"
)
filename = url.split("/")[-1]
dest_file = dest / filename
if dest_file.exists():
print(f" Ofsted: {filename} already exists, skipping download.")
return dest_file
print(f" Ofsted: downloading {url} ...")
resp = requests.get(url, timeout=120, stream=True)
resp.raise_for_status()
with open(dest_file, "wb") as f:
for chunk in resp.iter_content(chunk_size=65536):
f.write(chunk)
print(f" Ofsted: saved {dest_file} ({dest_file.stat().st_size // 1024} KB)")
return dest_file
def _parse_grade(val) -> int | None:
if pd.isna(val):
return None
key = str(val).strip()
return GRADE_MAP.get(key)
def _parse_rc_grade(val) -> int | None:
"""Parse a Report Card grade text to integer 15."""
if pd.isna(val):
return None
key = str(val).strip().lower()
return RC_GRADE_MAP.get(key)
def _parse_safeguarding(val) -> bool | None:
"""Parse safeguarding 'Met'/'Not met' to boolean."""
if pd.isna(val):
return None
s = str(val).strip().lower()
if s == "met":
return True
if s in ("not met", "not_met"):
return False
return None
def _parse_date(val) -> date | None:
if pd.isna(val):
return None
for fmt in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y", "%d %B %Y"):
try:
return datetime.strptime(str(val).strip(), fmt).date()
except ValueError:
pass
return None
def _framework_for_row(row) -> str | None:
"""Determine inspection framework for a single school row.
Check RC columns first — if any have a value, it's a Report Card inspection.
Fall back to OEIF columns. If neither has data, the school has no graded
inspection on record (return None).
"""
rc_check_cols = [
"rc_inclusion", "rc_curriculum_teaching", "rc_achievement",
"rc_attendance_behaviour", "rc_personal_development",
"rc_leadership_governance", "rc_safeguarding",
]
for col in rc_check_cols:
val = row.get(col)
if val is not None and not (isinstance(val, float) and pd.isna(val)):
return "ReportCard"
oeif_check_cols = ["overall_effectiveness", "quality_of_education"]
for col in oeif_check_cols:
val = row.get(col)
if val is not None and not (isinstance(val, float) and pd.isna(val)):
return "OEIF"
return None
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "ofsted") if data_dir else DEST_DIR
files = sorted(dest.glob("*.csv")) + sorted(dest.glob("*.zip"))
if not files:
raise FileNotFoundError(f"No Ofsted MI file found in {dest}")
path = files[-1]
print(f" Ofsted: loading {path} ...")
def _find_header_row(filepath, encoding="latin-1"):
"""Scan up to 10 rows to find the one containing a URN column."""
for i in range(10):
peek = pd.read_csv(filepath, encoding=encoding, header=i, nrows=0)
if any(str(c).strip() in ("URN", "Urn", "urn") for c in peek.columns):
return i
return 0
if str(path).endswith(".zip"):
import zipfile, io
with zipfile.ZipFile(path) as z:
csv_names = [n for n in z.namelist() if n.endswith(".csv")]
if not csv_names:
raise ValueError("No CSV found inside Ofsted ZIP")
# Extract to a temp file so we can scan for the header row
import tempfile, os
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
tmp.write(z.read(csv_names[0]))
tmp_path = tmp.name
try:
hdr = _find_header_row(tmp_path)
df = pd.read_csv(tmp_path, encoding="latin-1", low_memory=False, header=hdr)
finally:
os.unlink(tmp_path)
else:
hdr = _find_header_row(path)
df = pd.read_csv(path, encoding="latin-1", low_memory=False, header=hdr)
# Normalise OEIF column names: for each target field pick the first source column present
available = set(df.columns)
for target, sources in COLUMN_PRIORITY.items():
for src in sources:
if src in available:
df.rename(columns={src: target}, inplace=True)
break
# Normalise Report Card column names (if present)
available = set(df.columns)
for target, sources in RC_COLUMN_PRIORITY.items():
for src in sources:
if src in available:
df.rename(columns={src: target}, inplace=True)
break
if "urn" not in df.columns:
raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}")
# Only keep rows with a valid URN
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
inserted = updated = skipped = 0
with get_session() as session:
# Keep only the most recent inspection per URN
if "inspection_date" in df.columns:
df["_date_parsed"] = df["inspection_date"].apply(_parse_date)
df = df.sort_values("_date_parsed", ascending=False).groupby("urn").first().reset_index()
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
record = {
"urn": urn,
"framework": _framework_for_row(row),
"inspection_date": _parse_date(row.get("inspection_date")),
"publication_date": _parse_date(row.get("publication_date")),
"inspection_type": str(row.get("inspection_type", "")).strip() or None,
# OEIF fields
"overall_effectiveness": _parse_grade(row.get("overall_effectiveness")),
"quality_of_education": _parse_grade(row.get("quality_of_education")),
"behaviour_attitudes": _parse_grade(row.get("behaviour_attitudes")),
"personal_development": _parse_grade(row.get("personal_development")),
"leadership_management": _parse_grade(row.get("leadership_management")),
"early_years_provision": _parse_grade(row.get("early_years_provision")),
"previous_overall": None,
# Report Card fields
"rc_safeguarding_met": _parse_safeguarding(row.get("rc_safeguarding")),
"rc_inclusion": _parse_rc_grade(row.get("rc_inclusion")),
"rc_curriculum_teaching": _parse_rc_grade(row.get("rc_curriculum_teaching")),
"rc_achievement": _parse_rc_grade(row.get("rc_achievement")),
"rc_attendance_behaviour": _parse_rc_grade(row.get("rc_attendance_behaviour")),
"rc_personal_development": _parse_rc_grade(row.get("rc_personal_development")),
"rc_leadership_governance": _parse_rc_grade(row.get("rc_leadership_governance")),
"rc_early_years": _parse_rc_grade(row.get("rc_early_years")),
"rc_sixth_form": _parse_rc_grade(row.get("rc_sixth_form")),
}
session.execute(
text("""
INSERT INTO ofsted_inspections
(urn, framework, inspection_date, publication_date, inspection_type,
overall_effectiveness, quality_of_education, behaviour_attitudes,
personal_development, leadership_management, early_years_provision,
previous_overall,
rc_safeguarding_met, rc_inclusion, rc_curriculum_teaching,
rc_achievement, rc_attendance_behaviour, rc_personal_development,
rc_leadership_governance, rc_early_years, rc_sixth_form)
VALUES
(:urn, :framework, :inspection_date, :publication_date, :inspection_type,
:overall_effectiveness, :quality_of_education, :behaviour_attitudes,
:personal_development, :leadership_management, :early_years_provision,
:previous_overall,
:rc_safeguarding_met, :rc_inclusion, :rc_curriculum_teaching,
:rc_achievement, :rc_attendance_behaviour, :rc_personal_development,
:rc_leadership_governance, :rc_early_years, :rc_sixth_form)
ON CONFLICT (urn) DO UPDATE SET
previous_overall = ofsted_inspections.overall_effectiveness,
framework = EXCLUDED.framework,
inspection_date = EXCLUDED.inspection_date,
publication_date = EXCLUDED.publication_date,
inspection_type = EXCLUDED.inspection_type,
overall_effectiveness = EXCLUDED.overall_effectiveness,
quality_of_education = EXCLUDED.quality_of_education,
behaviour_attitudes = EXCLUDED.behaviour_attitudes,
personal_development = EXCLUDED.personal_development,
leadership_management = EXCLUDED.leadership_management,
early_years_provision = EXCLUDED.early_years_provision,
rc_safeguarding_met = EXCLUDED.rc_safeguarding_met,
rc_inclusion = EXCLUDED.rc_inclusion,
rc_curriculum_teaching = EXCLUDED.rc_curriculum_teaching,
rc_achievement = EXCLUDED.rc_achievement,
rc_attendance_behaviour = EXCLUDED.rc_attendance_behaviour,
rc_personal_development = EXCLUDED.rc_personal_development,
rc_leadership_governance = EXCLUDED.rc_leadership_governance,
rc_early_years = EXCLUDED.rc_early_years,
rc_sixth_form = EXCLUDED.rc_sixth_form
"""),
record,
)
inserted += 1
if inserted % 5000 == 0:
session.flush()
print(f" Processed {inserted} records...")
print(f" Ofsted: upserted {inserted} records")
return {"inserted": inserted, "updated": updated, "skipped": skipped}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
path = download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,229 +0,0 @@
"""
Ofsted Parent View open data downloader and loader.
Source: https://parentview.ofsted.gov.uk/open-data
Update: ~3 times/year (Spring, Autumn, Summer)
"""
import argparse
import re
import sys
from datetime import date, datetime
from pathlib import Path
import pandas as pd
import requests
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
DEST_DIR = SUPPLEMENTARY_DIR / "parent_view"
OPEN_DATA_PAGE = "https://parentview.ofsted.gov.uk/open-data"
# Question column mapping — Parent View open data uses descriptive column headers
# Map any variant to our internal field names
QUESTION_MAP = {
# Q1 — happiness
"My child is happy at this school": "q_happy_pct",
"Happy": "q_happy_pct",
# Q2 — safety
"My child feels safe at this school": "q_safe_pct",
"Safe": "q_safe_pct",
# Q3 — bullying
"The school makes sure its pupils are well behaved": "q_behaviour_pct",
"Well Behaved": "q_behaviour_pct",
# Q4 — bullying dealt with (sometimes separate)
"My child has been bullied and the school dealt with the bullying quickly and effectively": "q_bullying_pct",
"Bullying": "q_bullying_pct",
# Q5 — curriculum info
"The school makes me aware of what my child will learn during the year": "q_communication_pct",
"Aware of learning": "q_communication_pct",
# Q6 — concerns dealt with
"When I have raised concerns with the school, they have been dealt with properly": "q_communication_pct",
# Q7 — child does well
"My child does well at this school": "q_progress_pct",
"Does well": "q_progress_pct",
# Q8 — teaching
"The teaching is good at this school": "q_teaching_pct",
"Good teaching": "q_teaching_pct",
# Q9 — progress info
"I receive valuable information from the school about my child's progress": "q_information_pct",
"Progress information": "q_information_pct",
# Q10 — curriculum breadth
"My child is taught a broad range of subjects": "q_curriculum_pct",
"Broad subjects": "q_curriculum_pct",
# Q11 — prepares for future
"The school prepares my child well for the future": "q_future_pct",
"Prepared for future": "q_future_pct",
# Q12 — leadership
"The school is led and managed effectively": "q_leadership_pct",
"Led well": "q_leadership_pct",
# Q13 — wellbeing
"The school supports my child's wider personal development": "q_wellbeing_pct",
"Personal development": "q_wellbeing_pct",
# Q14 — recommendation
"I would recommend this school to another parent": "q_recommend_pct",
"Recommend": "q_recommend_pct",
}
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "parent_view") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
# Scrape the open data page for the download link
try:
resp = requests.get(OPEN_DATA_PAGE, timeout=30)
resp.raise_for_status()
pattern = r'href="([^"]+\.(?:xlsx|csv|zip))"'
urls = re.findall(pattern, resp.text, re.IGNORECASE)
if not urls:
raise RuntimeError("No download link found on Parent View open data page")
url = urls[0] if urls[0].startswith("http") else "https://parentview.ofsted.gov.uk" + urls[0]
except Exception as e:
raise RuntimeError(f"Could not discover Parent View download URL: {e}")
filename = url.split("/")[-1].split("?")[0]
dest_file = dest / filename
if dest_file.exists():
print(f" ParentView: {filename} already exists, skipping download.")
return dest_file
print(f" ParentView: downloading {url} ...")
resp = requests.get(url, timeout=120, stream=True)
resp.raise_for_status()
with open(dest_file, "wb") as f:
for chunk in resp.iter_content(chunk_size=65536):
f.write(chunk)
print(f" ParentView: saved {dest_file}")
return dest_file
def _positive_pct(row: pd.Series, q_col_base: str) -> float | None:
"""Sum 'Strongly agree' + 'Agree' percentages for a question."""
# Parent View open data has columns like "Q1 - Strongly agree %", "Q1 - Agree %"
strongly = row.get(f"{q_col_base} - Strongly agree %") or row.get(f"{q_col_base} - Strongly Agree %")
agree = row.get(f"{q_col_base} - Agree %")
try:
total = 0.0
if pd.notna(strongly):
total += float(strongly)
if pd.notna(agree):
total += float(agree)
return round(total, 1) if total > 0 else None
except (TypeError, ValueError):
return None
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "parent_view") if data_dir else DEST_DIR
files = sorted(dest.glob("*.xlsx")) + sorted(dest.glob("*.csv"))
if not files:
raise FileNotFoundError(f"No Parent View file found in {dest}")
path = files[-1]
print(f" ParentView: loading {path} ...")
if str(path).endswith(".xlsx"):
df = pd.read_excel(path)
else:
df = pd.read_csv(path, encoding="latin-1", low_memory=False)
# Normalise URN column
urn_col = next((c for c in df.columns if c.strip().upper() == "URN"), None)
if not urn_col:
raise ValueError(f"URN column not found. Columns: {list(df.columns)[:20]}")
df.rename(columns={urn_col: "urn"}, inplace=True)
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
# Try to find total responses column
resp_col = next((c for c in df.columns if "total" in c.lower() and "respon" in c.lower()), None)
inserted = 0
today = date.today()
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
total = int(row[resp_col]) if resp_col and pd.notna(row.get(resp_col)) else None
# Try to extract % positive per question from wide-format columns
# Parent View has numbered questions Q1Q12 (or Q1Q14 depending on year)
record = {
"urn": urn,
"survey_date": today,
"total_responses": total,
"q_happy_pct": _positive_pct(row, "Q1"),
"q_safe_pct": _positive_pct(row, "Q2"),
"q_behaviour_pct": _positive_pct(row, "Q3"),
"q_bullying_pct": _positive_pct(row, "Q4"),
"q_communication_pct": _positive_pct(row, "Q5"),
"q_progress_pct": _positive_pct(row, "Q7"),
"q_teaching_pct": _positive_pct(row, "Q8"),
"q_information_pct": _positive_pct(row, "Q9"),
"q_curriculum_pct": _positive_pct(row, "Q10"),
"q_future_pct": _positive_pct(row, "Q11"),
"q_leadership_pct": _positive_pct(row, "Q12"),
"q_wellbeing_pct": _positive_pct(row, "Q13"),
"q_recommend_pct": _positive_pct(row, "Q14"),
"q_sen_pct": None,
}
session.execute(
text("""
INSERT INTO ofsted_parent_view
(urn, survey_date, total_responses,
q_happy_pct, q_safe_pct, q_behaviour_pct, q_bullying_pct,
q_communication_pct, q_progress_pct, q_teaching_pct,
q_information_pct, q_curriculum_pct, q_future_pct,
q_leadership_pct, q_wellbeing_pct, q_recommend_pct, q_sen_pct)
VALUES
(:urn, :survey_date, :total_responses,
:q_happy_pct, :q_safe_pct, :q_behaviour_pct, :q_bullying_pct,
:q_communication_pct, :q_progress_pct, :q_teaching_pct,
:q_information_pct, :q_curriculum_pct, :q_future_pct,
:q_leadership_pct, :q_wellbeing_pct, :q_recommend_pct, :q_sen_pct)
ON CONFLICT (urn) DO UPDATE SET
survey_date = EXCLUDED.survey_date,
total_responses = EXCLUDED.total_responses,
q_happy_pct = EXCLUDED.q_happy_pct,
q_safe_pct = EXCLUDED.q_safe_pct,
q_behaviour_pct = EXCLUDED.q_behaviour_pct,
q_bullying_pct = EXCLUDED.q_bullying_pct,
q_communication_pct = EXCLUDED.q_communication_pct,
q_progress_pct = EXCLUDED.q_progress_pct,
q_teaching_pct = EXCLUDED.q_teaching_pct,
q_information_pct = EXCLUDED.q_information_pct,
q_curriculum_pct = EXCLUDED.q_curriculum_pct,
q_future_pct = EXCLUDED.q_future_pct,
q_leadership_pct = EXCLUDED.q_leadership_pct,
q_wellbeing_pct = EXCLUDED.q_wellbeing_pct,
q_recommend_pct = EXCLUDED.q_recommend_pct,
q_sen_pct = EXCLUDED.q_sen_pct
"""),
record,
)
inserted += 1
if inserted % 2000 == 0:
session.flush()
print(f" ParentView: upserted {inserted} records")
return {"inserted": inserted, "updated": 0, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,132 +0,0 @@
"""
Phonics Screening Check downloader and loader.
Source: EES publication "phonics-screening-check-and-key-stage-1-assessments-england"
Update: Annual (September/October)
"""
import argparse
import sys
from pathlib import Path
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
from sources.ees import get_latest_csv_url, download_csv
DEST_DIR = SUPPLEMENTARY_DIR / "phonics"
PUBLICATION_SLUG = "phonics-screening-check-and-key-stage-1-assessments-england"
# Known column names in the phonics CSV (vary by year)
COLUMN_MAP = {
"URN": "urn",
"urn": "urn",
# Year 1 pass rate
"PPTA1": "year1_phonics_pct", # % meeting expected standard Y1
"PPTA1B": "year1_phonics_pct",
"PT_MET_PHON_Y1": "year1_phonics_pct",
"Y1_MET_EXPECTED_PCT": "year1_phonics_pct",
# Year 2 (re-takers)
"PPTA2": "year2_phonics_pct",
"PT_MET_PHON_Y2": "year2_phonics_pct",
"Y2_MET_EXPECTED_PCT": "year2_phonics_pct",
# Year label
"YEAR": "year",
"Year": "year",
}
NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", ""}
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "phonics") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
url = get_latest_csv_url(PUBLICATION_SLUG, keyword="school")
if not url:
raise RuntimeError(f"Could not find CSV URL for phonics publication")
filename = url.split("/")[-1].split("?")[0] or "phonics_latest.csv"
return download_csv(url, dest / filename)
def _parse_pct(val) -> float | None:
if pd.isna(val):
return None
s = str(val).strip().upper().replace("%", "")
if s in NULL_VALUES:
return None
try:
return float(s)
except ValueError:
return None
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "phonics") if data_dir else DEST_DIR
files = sorted(dest.glob("*.csv"))
if not files:
raise FileNotFoundError(f"No phonics CSV found in {dest}")
path = files[-1]
print(f" Phonics: loading {path} ...")
df = pd.read_csv(path, encoding="latin-1", low_memory=False)
df.rename(columns=COLUMN_MAP, inplace=True)
if "urn" not in df.columns:
raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}")
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
# Infer year from filename if not in data
year = None
import re
m = re.search(r"20(\d{2})", path.stem)
if m:
year = int("20" + m.group(1))
inserted = 0
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
row_year = int(row["year"]) if "year" in df.columns and pd.notna(row.get("year")) else year
if not row_year:
continue
session.execute(
text("""
INSERT INTO phonics (urn, year, year1_phonics_pct, year2_phonics_pct)
VALUES (:urn, :year, :y1, :y2)
ON CONFLICT (urn, year) DO UPDATE SET
year1_phonics_pct = EXCLUDED.year1_phonics_pct,
year2_phonics_pct = EXCLUDED.year2_phonics_pct
"""),
{
"urn": urn,
"year": row_year,
"y1": _parse_pct(row.get("year1_phonics_pct")),
"y2": _parse_pct(row.get("year2_phonics_pct")),
},
)
inserted += 1
if inserted % 5000 == 0:
session.flush()
print(f" Phonics: upserted {inserted} records")
return {"inserted": inserted, "updated": 0, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,150 +0,0 @@
"""
SEN (Special Educational Needs) primary need type breakdown.
Source: EES publication "special-educational-needs-in-england"
Update: Annual (September)
"""
import argparse
import re
import sys
from pathlib import Path
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session
from sources.ees import get_latest_csv_url, download_csv
DEST_DIR = SUPPLEMENTARY_DIR / "sen_detail"
PUBLICATION_SLUG = "special-educational-needs-in-england"
NULL_VALUES = {"SUPP", "NE", "NA", "NP", "NEW", "LOW", "X", ""}
COLUMN_MAP = {
"URN": "urn",
"urn": "urn",
"YEAR": "year",
"Year": "year",
# Primary need types — DfE abbreviated codes
"PT_SPEECH": "primary_need_speech_pct", # SLCN
"PT_ASD": "primary_need_autism_pct", # ASD
"PT_MLD": "primary_need_mld_pct", # Moderate learning difficulty
"PT_SPLD": "primary_need_spld_pct", # Specific learning difficulty
"PT_SEMH": "primary_need_semh_pct", # Social, emotional, mental health
"PT_PHYSICAL": "primary_need_physical_pct", # Physical/sensory
"PT_OTHER": "primary_need_other_pct",
# Alternative naming
"SLCN_PCT": "primary_need_speech_pct",
"ASD_PCT": "primary_need_autism_pct",
"MLD_PCT": "primary_need_mld_pct",
"SPLD_PCT": "primary_need_spld_pct",
"SEMH_PCT": "primary_need_semh_pct",
"PHYSICAL_PCT": "primary_need_physical_pct",
"OTHER_PCT": "primary_need_other_pct",
}
def download(data_dir: Path | None = None) -> Path:
dest = (data_dir / "supplementary" / "sen_detail") if data_dir else DEST_DIR
dest.mkdir(parents=True, exist_ok=True)
url = get_latest_csv_url(PUBLICATION_SLUG, keyword="school")
if not url:
url = get_latest_csv_url(PUBLICATION_SLUG)
if not url:
raise RuntimeError("Could not find CSV URL for SEN publication")
filename = url.split("/")[-1].split("?")[0] or "sen_latest.csv"
return download_csv(url, dest / filename)
def _parse_pct(val) -> float | None:
if pd.isna(val):
return None
s = str(val).strip().upper().replace("%", "")
if s in NULL_VALUES:
return None
try:
return float(s)
except ValueError:
return None
def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
if path is None:
dest = (data_dir / "supplementary" / "sen_detail") if data_dir else DEST_DIR
files = sorted(dest.glob("*.csv"))
if not files:
raise FileNotFoundError(f"No SEN CSV found in {dest}")
path = files[-1]
print(f" SEN Detail: loading {path} ...")
df = pd.read_csv(path, encoding="latin-1", low_memory=False)
df.rename(columns=COLUMN_MAP, inplace=True)
if "urn" not in df.columns:
raise ValueError(f"URN column not found. Available: {list(df.columns)[:20]}")
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
df["urn"] = df["urn"].astype(int)
year = None
m = re.search(r"20(\d{2})", path.stem)
if m:
year = int("20" + m.group(1))
inserted = 0
with get_session() as session:
from sqlalchemy import text
for _, row in df.iterrows():
urn = int(row["urn"])
row_year = int(row["year"]) if "year" in df.columns and pd.notna(row.get("year")) else year
if not row_year:
continue
session.execute(
text("""
INSERT INTO sen_detail
(urn, year, primary_need_speech_pct, primary_need_autism_pct,
primary_need_mld_pct, primary_need_spld_pct, primary_need_semh_pct,
primary_need_physical_pct, primary_need_other_pct)
VALUES (:urn, :year, :speech, :autism, :mld, :spld, :semh, :physical, :other)
ON CONFLICT (urn, year) DO UPDATE SET
primary_need_speech_pct = EXCLUDED.primary_need_speech_pct,
primary_need_autism_pct = EXCLUDED.primary_need_autism_pct,
primary_need_mld_pct = EXCLUDED.primary_need_mld_pct,
primary_need_spld_pct = EXCLUDED.primary_need_spld_pct,
primary_need_semh_pct = EXCLUDED.primary_need_semh_pct,
primary_need_physical_pct = EXCLUDED.primary_need_physical_pct,
primary_need_other_pct = EXCLUDED.primary_need_other_pct
"""),
{
"urn": urn, "year": row_year,
"speech": _parse_pct(row.get("primary_need_speech_pct")),
"autism": _parse_pct(row.get("primary_need_autism_pct")),
"mld": _parse_pct(row.get("primary_need_mld_pct")),
"spld": _parse_pct(row.get("primary_need_spld_pct")),
"semh": _parse_pct(row.get("primary_need_semh_pct")),
"physical": _parse_pct(row.get("primary_need_physical_pct")),
"other": _parse_pct(row.get("primary_need_other_pct")),
},
)
inserted += 1
if inserted % 5000 == 0:
session.flush()
print(f" SEN Detail: upserted {inserted} records")
return {"inserted": inserted, "updated": 0, "skipped": 0}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--action", choices=["download", "load", "all"], default="all")
parser.add_argument("--data-dir", type=Path, default=None)
args = parser.parse_args()
if args.action in ("download", "all"):
download(args.data_dir)
if args.action in ("load", "all"):
load(data_dir=args.data_dir)

View File

@@ -1,70 +0,0 @@
"""
Data integrator HTTP server.
Kestra calls this server via HTTP tasks to trigger download/load operations.
"""
import importlib
import sys
import traceback
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
sys.path.insert(0, "/app/scripts")
app = FastAPI(title="SchoolCompare Data Integrator", version="1.0.0")
SOURCES = {
"ofsted", "gias", "parent_view",
"census", "admissions", "sen_detail",
"phonics", "idaci", "finance", "ks2",
}
@app.get("/health")
def health():
return {"status": "ok"}
@app.post("/run/{source}")
def run_source(source: str, action: str = "all"):
"""
Trigger a data source download and/or load.
action: "download" | "load" | "all"
"""
if source not in SOURCES:
raise HTTPException(status_code=404, detail=f"Unknown source '{source}'. Available: {sorted(SOURCES)}")
if action not in ("download", "load", "all"):
raise HTTPException(status_code=400, detail="action must be 'download', 'load', or 'all'")
try:
mod = importlib.import_module(f"sources.{source}")
result = {}
if action in ("download", "all"):
mod.download()
if action in ("load", "all"):
result = mod.load()
return {"source": source, "action": action, "result": result}
except Exception as e:
tb = traceback.format_exc()
raise HTTPException(status_code=500, detail={"error": str(e), "traceback": tb})
@app.post("/run-all")
def run_all(action: str = "all"):
"""Trigger all sources in sequence."""
results = {}
for source in sorted(SOURCES):
try:
mod = importlib.import_module(f"sources.{source}")
if action in ("download", "all"):
mod.download()
if action in ("load", "all"):
results[source] = mod.load()
except Exception as e:
results[source] = {"error": str(e)}
return results