Compare commits

..

2 Commits

Author SHA1 Message Date
f4f0257447 fix(ees-tap): add latin-1 encoding for census/admissions, default utf-8 for others
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 52s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m8s
Build and Push Docker Images / Build Integrator (push) Successful in 55s
Build and Push Docker Images / Build Kestra Init (push) Successful in 31s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m40s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
DfE supporting-files CSVs (spc_school_level_underlying_data, AppsandOffers
SchoolLevel) are Latin-1 encoded. Add _encoding class attribute to base
stream class and override to 'latin-1' for census and admissions streams.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 09:41:40 +00:00
ca351e9d73 feat: migrate backend to marts schema, update EES tap for verified datasets
Pipeline:
- EES tap: split KS4 into performance + info streams, fix admissions filename
  (SchoolLevel keyword match), fix census filename (yearly suffix), remove
  phonics (no school-level data on EES), change endswith → in for matching
- stg_ees_ks4: rewrite to filter long-format data and extract Attainment 8,
  Progress 8, EBacc, English/Maths metrics; join KS4 info for context
- stg_ees_admissions: map real CSV columns (total_number_places_offered, etc.)
- stg_ees_census: update source reference, stub with TODO for data columns
- Remove stg_ees_phonics, fact_phonics (no school-level EES data)
- Add ees_ks4_performance + ees_ks4_info sources, remove ees_ks4 + ees_phonics
- Update int_ks4_with_lineage + fact_ks4_performance with new KS4 columns
- Annual EES DAG: remove stg_ees_phonics+ from selector

Backend:
- models.py: replace all models to point at marts.* tables with schema='marts'
  (DimSchool, DimLocation, KS2Performance, FactOfstedInspection, etc.)
- data_loader.py: rewrite load_school_data_as_dataframe() using raw SQL joining
  dim_school + dim_location + fact_ks2_performance; update get_supplementary_data()
- database.py: remove migration machinery, keep only connection setup
- app.py: remove check_and_migrate_if_needed, remove /api/admin/reimport-ks2
  endpoints (pipeline handles all imports)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 09:29:27 +00:00
18 changed files with 809 additions and 1246 deletions

View File

@@ -28,8 +28,6 @@ from .data_loader import (
get_supplementary_data,
)
from .data_loader import get_data_info as get_db_info
from .database import check_and_migrate_if_needed
from .migration import run_full_migration
from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS
from .utils import clean_for_json
@@ -138,20 +136,15 @@ def validate_postcode(postcode: Optional[str]) -> Optional[str]:
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan - startup and shutdown events."""
# Startup: check schema version and migrate if needed
print("Starting up: Checking database schema...")
check_and_migrate_if_needed()
print("Loading school data from database...")
print("Loading school data from marts...")
df = load_school_data()
if df.empty:
print("Warning: No data in database. Check CSV files in data/ folder.")
print("Warning: No data in marts. Run the annual EES pipeline to populate KS2 data.")
else:
print(f"Data loaded successfully: {len(df)} records.")
yield # Application runs here
yield
# Shutdown: cleanup if needed
print("Shutting down...")
@@ -585,7 +578,7 @@ async def get_data_info(request: Request):
if db_info["total_schools"] == 0:
return {
"status": "no_data",
"message": "No data in database. Run the migration script: python scripts/migrate_csv_to_db.py",
"message": "No data in marts. Run the annual EES pipeline to load KS2 data.",
"data_source": "PostgreSQL",
}
@@ -635,56 +628,6 @@ async def reload_data(
return {"status": "reloaded"}
_reimport_status: dict = {"running": False, "done": False, "error": None}
@app.post("/api/admin/reimport-ks2")
@limiter.limit("2/minute")
async def reimport_ks2(
request: Request,
geocode: bool = True,
_: bool = Depends(verify_admin_api_key)
):
"""
Start a full KS2 CSV migration in the background and return immediately.
Poll GET /api/admin/reimport-ks2/status to check progress.
Pass ?geocode=false to skip postcode → lat/lng resolution.
Requires X-API-Key header with valid admin API key.
"""
global _reimport_status
if _reimport_status["running"]:
return {"status": "already_running"}
_reimport_status = {"running": True, "done": False, "error": None}
def _run():
global _reimport_status
try:
success = run_full_migration(geocode=geocode)
if not success:
_reimport_status = {"running": False, "done": False, "error": "No CSV data found"}
return
clear_cache()
load_school_data()
_reimport_status = {"running": False, "done": True, "error": None}
except Exception as exc:
_reimport_status = {"running": False, "done": False, "error": str(exc)}
import threading
threading.Thread(target=_run, daemon=True).start()
return {"status": "started"}
@app.get("/api/admin/reimport-ks2/status")
async def reimport_ks2_status(
request: Request,
_: bool = Depends(verify_admin_api_key)
):
"""Poll this endpoint to check reimport progress."""
s = _reimport_status
if s["error"]:
raise HTTPException(status_code=500, detail=s["error"])
return {"running": s["running"], "done": s["done"]}
# =============================================================================

View File

@@ -1,29 +1,24 @@
"""
Data loading module that queries from PostgreSQL database.
Provides efficient queries with caching and lazy loading.
Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py).
Only user search postcodes are geocoded on-demand via geocode_single_postcode().
Data loading module — reads from marts.* tables built by dbt.
Provides efficient queries with caching.
"""
import pandas as pd
import numpy as np
from functools import lru_cache
from typing import Optional, Dict, Tuple, List
import requests
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload, Session
from sqlalchemy import text
from sqlalchemy.orm import Session
from .config import settings
from .database import SessionLocal, get_db_session
from .database import SessionLocal, engine
from .models import (
School, SchoolResult,
OfstedInspection, OfstedParentView, SchoolCensus,
SchoolAdmissions, SenDetail, Phonics, SchoolDeprivation, SchoolFinance,
DimSchool, DimLocation, KS2Performance,
FactOfstedInspection, FactParentView, FactAdmissions,
FactDeprivation, FactFinance,
)
from .schemas import SCHOOL_TYPE_MAP
# Cache for user search postcode geocoding (not for school data)
_postcode_cache: Dict[str, Tuple[float, float]] = {}
@@ -31,515 +26,165 @@ def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
"""Convert cryptic school type codes to user-friendly names."""
if not school_type:
return None
# Check if it's a code that needs mapping
code = school_type.strip().upper()
if code in SCHOOL_TYPE_MAP:
return SCHOOL_TYPE_MAP[code]
# Return original if already a friendly name or unknown code
return school_type
def get_school_type_codes_for_filter(school_type: str) -> List[str]:
"""Get all database codes that map to a given friendly name."""
if not school_type:
return []
school_type_lower = school_type.lower()
# Collect all codes that map to this friendly name
codes = []
for code, friendly_name in SCHOOL_TYPE_MAP.items():
if friendly_name.lower() == school_type_lower:
codes.append(code.lower())
# Also include the school_type itself (case-insensitive) in case it's stored as-is
codes.append(school_type_lower)
return codes
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
"""Geocode a single postcode using postcodes.io API."""
if not postcode:
return None
postcode = postcode.strip().upper()
# Check cache first
if postcode in _postcode_cache:
return _postcode_cache[postcode]
try:
response = requests.get(
f'https://api.postcodes.io/postcodes/{postcode}',
timeout=10
f"https://api.postcodes.io/postcodes/{postcode}",
timeout=10,
)
if response.status_code == 200:
data = response.json()
if data.get('result'):
lat = data['result'].get('latitude')
lon = data['result'].get('longitude')
if data.get("result"):
lat = data["result"].get("latitude")
lon = data["result"].get("longitude")
if lat and lon:
_postcode_cache[postcode] = (lat, lon)
return (lat, lon)
except Exception:
pass
return None
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Calculate the great circle distance between two points on Earth (in miles).
"""
"""Calculate great-circle distance between two points (miles)."""
from math import radians, cos, sin, asin, sqrt
# Convert to radians
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
# Earth's radius in miles
r = 3956
return c * r
a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
return 2 * asin(sqrt(a)) * 3956
# =============================================================================
# DATABASE QUERY FUNCTIONS
# MAIN DATA LOAD — joins dim_school + dim_location + fact_ks2_performance
# =============================================================================
def get_db():
"""Get a database session."""
return SessionLocal()
_MAIN_QUERY = text("""
SELECT
s.urn,
s.school_name,
s.phase,
s.school_type,
s.academy_trust_name AS trust_name,
s.academy_trust_uid AS trust_uid,
s.religious_character AS religious_denomination,
s.gender,
s.age_range,
s.capacity,
s.headteacher_name,
s.website,
s.ofsted_grade,
s.ofsted_date,
s.ofsted_framework,
l.local_authority_name AS local_authority,
l.local_authority_code,
l.address_line1 AS address1,
l.address_line2 AS address2,
l.town,
l.postcode,
l.latitude,
l.longitude,
-- KS2 performance
k.year,
k.source_urn,
k.total_pupils,
k.eligible_pupils,
k.rwm_expected_pct,
k.rwm_high_pct,
k.reading_expected_pct,
k.reading_high_pct,
k.reading_avg_score,
k.reading_progress,
k.writing_expected_pct,
k.writing_high_pct,
k.writing_progress,
k.maths_expected_pct,
k.maths_high_pct,
k.maths_avg_score,
k.maths_progress,
k.gps_expected_pct,
k.gps_high_pct,
k.gps_avg_score,
k.science_expected_pct,
k.reading_absence_pct,
k.writing_absence_pct,
k.maths_absence_pct,
k.gps_absence_pct,
k.science_absence_pct,
k.rwm_expected_boys_pct,
k.rwm_high_boys_pct,
k.rwm_expected_girls_pct,
k.rwm_high_girls_pct,
k.rwm_expected_disadvantaged_pct,
k.rwm_expected_non_disadvantaged_pct,
k.disadvantaged_gap,
k.disadvantaged_pct,
k.eal_pct,
k.sen_support_pct,
k.sen_ehcp_pct,
k.stability_pct
FROM marts.dim_school s
JOIN marts.dim_location l ON s.urn = l.urn
JOIN marts.fact_ks2_performance k ON s.urn = k.urn
ORDER BY s.school_name, k.year
""")
def get_available_years(db: Session = None) -> List[int]:
"""Get list of available years in the database."""
close_db = db is None
if db is None:
db = get_db()
def load_school_data_as_dataframe() -> pd.DataFrame:
"""Load all school + KS2 data as a pandas DataFrame."""
try:
result = db.query(SchoolResult.year).distinct().order_by(SchoolResult.year).all()
return [r[0] for r in result]
finally:
if close_db:
db.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
"""Get list of available local authorities."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(School.local_authority)\
.filter(School.local_authority.isnot(None))\
.distinct()\
.order_by(School.local_authority)\
.all()
return [r[0] for r in result if r[0]]
finally:
if close_db:
db.close()
def get_available_school_types(db: Session = None) -> List[str]:
"""Get list of available school types (normalized to user-friendly names)."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(School.school_type)\
.filter(School.school_type.isnot(None))\
.distinct()\
.all()
# Normalize codes to friendly names and deduplicate
normalized = set()
for r in result:
if r[0]:
friendly_name = normalize_school_type(r[0])
if friendly_name:
normalized.add(friendly_name)
return sorted(normalized)
finally:
if close_db:
db.close()
def get_schools_count(db: Session = None) -> int:
"""Get total number of schools."""
close_db = db is None
if db is None:
db = get_db()
try:
return db.query(School).count()
finally:
if close_db:
db.close()
def get_schools(
db: Session,
search: Optional[str] = None,
local_authority: Optional[str] = None,
school_type: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[School], int]:
"""
Get paginated list of schools with optional filters.
Returns (schools, total_count).
"""
query = db.query(School)
# Apply filters
if search:
search_lower = f"%{search.lower()}%"
query = query.filter(
or_(
func.lower(School.school_name).like(search_lower),
func.lower(School.postcode).like(search_lower),
func.lower(School.town).like(search_lower),
)
)
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
if school_type:
# Filter by all codes that map to this friendly name
type_codes = get_school_type_codes_for_filter(school_type)
if type_codes:
query = query.filter(func.lower(School.school_type).in_(type_codes))
# Get total count
total = query.count()
# Apply pagination
offset = (page - 1) * page_size
schools = query.order_by(School.school_name).offset(offset).limit(page_size).all()
return schools, total
def get_schools_near_location(
db: Session,
latitude: float,
longitude: float,
radius_miles: float = 5.0,
search: Optional[str] = None,
local_authority: Optional[str] = None,
school_type: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
"""
Get schools near a location, sorted by distance.
Returns list of (school, distance) tuples and total count.
"""
# Get all schools with coordinates
query = db.query(School).filter(
School.latitude.isnot(None),
School.longitude.isnot(None)
)
# Apply text filters
if search:
search_lower = f"%{search.lower()}%"
query = query.filter(
or_(
func.lower(School.school_name).like(search_lower),
func.lower(School.postcode).like(search_lower),
func.lower(School.town).like(search_lower),
)
)
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
if school_type:
# Filter by all codes that map to this friendly name
type_codes = get_school_type_codes_for_filter(school_type)
if type_codes:
query = query.filter(func.lower(School.school_type).in_(type_codes))
# Get all matching schools and calculate distances
all_schools = query.all()
schools_with_distance = []
for school in all_schools:
if school.latitude and school.longitude:
dist = haversine_distance(latitude, longitude, school.latitude, school.longitude)
if dist <= radius_miles:
schools_with_distance.append((school, dist))
# Sort by distance
schools_with_distance.sort(key=lambda x: x[1])
total = len(schools_with_distance)
# Paginate
offset = (page - 1) * page_size
paginated = schools_with_distance[offset:offset + page_size]
return paginated, total
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
"""Get a single school by URN."""
return db.query(School).filter(School.urn == urn).first()
def get_school_results(
db: Session,
urn: int,
years: Optional[List[int]] = None
) -> List[SchoolResult]:
"""Get all results for a school, optionally filtered by years."""
query = db.query(SchoolResult)\
.join(School)\
.filter(School.urn == urn)\
.order_by(SchoolResult.year)
if years:
query = query.filter(SchoolResult.year.in_(years))
return query.all()
def get_rankings(
db: Session,
metric: str,
year: int,
local_authority: Optional[str] = None,
limit: int = 20,
ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
"""
Get school rankings for a specific metric and year.
Returns list of (school, result) tuples.
"""
# Build the query
query = db.query(School, SchoolResult)\
.join(SchoolResult)\
.filter(SchoolResult.year == year)
# Filter by local authority
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
# Get the metric column
metric_column = getattr(SchoolResult, metric, None)
if metric_column is None:
return []
# Filter out nulls and order
query = query.filter(metric_column.isnot(None))
if ascending:
query = query.order_by(metric_column.asc())
else:
query = query.order_by(metric_column.desc())
return query.limit(limit).all()
def get_data_info(db: Session = None) -> dict:
"""Get information about the data in the database."""
close_db = db is None
if db is None:
db = get_db()
try:
school_count = db.query(School).count()
result_count = db.query(SchoolResult).count()
years = get_available_years(db)
local_authorities = get_available_local_authorities(db)
return {
"total_schools": school_count,
"total_results": result_count,
"years_available": years,
"local_authorities_count": len(local_authorities),
"data_source": "PostgreSQL",
}
finally:
if close_db:
db.close()
def school_to_dict(school: School, include_results: bool = False) -> dict:
"""Convert a School model to dictionary."""
data = {
"urn": school.urn,
"school_name": school.school_name,
"local_authority": school.local_authority,
"school_type": normalize_school_type(school.school_type),
"address": school.address,
"town": school.town,
"postcode": school.postcode,
"latitude": school.latitude,
"longitude": school.longitude,
# GIAS fields
"website": school.website,
"headteacher_name": school.headteacher_name,
"capacity": school.capacity,
"trust_name": school.trust_name,
"gender": school.gender,
}
if include_results and school.results:
data["results"] = [result_to_dict(r) for r in school.results]
return data
def result_to_dict(result: SchoolResult) -> dict:
"""Convert a SchoolResult model to dictionary."""
return {
"year": result.year,
"total_pupils": result.total_pupils,
"eligible_pupils": result.eligible_pupils,
# Expected Standard
"rwm_expected_pct": result.rwm_expected_pct,
"reading_expected_pct": result.reading_expected_pct,
"writing_expected_pct": result.writing_expected_pct,
"maths_expected_pct": result.maths_expected_pct,
"gps_expected_pct": result.gps_expected_pct,
"science_expected_pct": result.science_expected_pct,
# Higher Standard
"rwm_high_pct": result.rwm_high_pct,
"reading_high_pct": result.reading_high_pct,
"writing_high_pct": result.writing_high_pct,
"maths_high_pct": result.maths_high_pct,
"gps_high_pct": result.gps_high_pct,
# Progress
"reading_progress": result.reading_progress,
"writing_progress": result.writing_progress,
"maths_progress": result.maths_progress,
# Averages
"reading_avg_score": result.reading_avg_score,
"maths_avg_score": result.maths_avg_score,
"gps_avg_score": result.gps_avg_score,
# Context
"disadvantaged_pct": result.disadvantaged_pct,
"eal_pct": result.eal_pct,
"sen_support_pct": result.sen_support_pct,
"sen_ehcp_pct": result.sen_ehcp_pct,
"stability_pct": result.stability_pct,
# Gender
"rwm_expected_boys_pct": result.rwm_expected_boys_pct,
"rwm_expected_girls_pct": result.rwm_expected_girls_pct,
"rwm_high_boys_pct": result.rwm_high_boys_pct,
"rwm_high_girls_pct": result.rwm_high_girls_pct,
# Disadvantaged
"rwm_expected_disadvantaged_pct": result.rwm_expected_disadvantaged_pct,
"rwm_expected_non_disadvantaged_pct": result.rwm_expected_non_disadvantaged_pct,
"disadvantaged_gap": result.disadvantaged_gap,
# 3-Year
"rwm_expected_3yr_pct": result.rwm_expected_3yr_pct,
"reading_avg_3yr": result.reading_avg_3yr,
"maths_avg_3yr": result.maths_avg_3yr,
}
# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
"""
Load all school data as a pandas DataFrame.
For compatibility with existing code that expects DataFrames.
"""
close_db = db is None
if db is None:
db = get_db()
try:
# Query all schools with their results
schools = db.query(School).options(joinedload(School.results)).all()
# Load Ofsted data into a lookup dict (urn → grade, date)
ofsted_lookup: Dict[int, dict] = {}
try:
ofsted_rows = db.query(
OfstedInspection.urn,
OfstedInspection.overall_effectiveness,
OfstedInspection.inspection_date,
).all()
for o in ofsted_rows:
ofsted_lookup[o.urn] = {
"ofsted_grade": o.overall_effectiveness,
"ofsted_date": o.inspection_date.isoformat() if o.inspection_date else None,
}
except Exception:
pass # Table may not exist yet on first run
rows = []
for school in schools:
ofsted = ofsted_lookup.get(school.urn, {})
for result in school.results:
row = {
"urn": school.urn,
"school_name": school.school_name,
"local_authority": school.local_authority,
"school_type": normalize_school_type(school.school_type),
"address": school.address,
"town": school.town,
"postcode": school.postcode,
"latitude": school.latitude,
"longitude": school.longitude,
# GIAS fields
"website": school.website,
"headteacher_name": school.headteacher_name,
"capacity": school.capacity,
"trust_name": school.trust_name,
"gender": school.gender,
# Ofsted (for list view)
"ofsted_grade": ofsted.get("ofsted_grade"),
"ofsted_date": ofsted.get("ofsted_date"),
**result_to_dict(result)
}
rows.append(row)
if rows:
return pd.DataFrame(rows)
df = pd.read_sql(_MAIN_QUERY, engine)
except Exception as exc:
print(f"Warning: Could not load school data from marts: {exc}")
return pd.DataFrame()
finally:
if close_db:
db.close()
if df.empty:
return df
# Build address string
df["address"] = df.apply(
lambda r: ", ".join(
p for p in [r.get("address1"), r.get("address2"), r.get("town"), r.get("postcode")]
if p and str(p) != "None"
),
axis=1,
)
# Normalize school type
df["school_type"] = df["school_type"].apply(normalize_school_type)
return df
# Cache for DataFrame (legacy compatibility)
# Cache for DataFrame
_df_cache: Optional[pd.DataFrame] = None
def load_school_data() -> pd.DataFrame:
"""
Legacy function to load school data as DataFrame.
Uses caching for performance.
"""
"""Load school data with caching."""
global _df_cache
if _df_cache is not None:
return _df_cache
print("Loading school data from database...")
print("Loading school data from marts...")
_df_cache = load_school_data_as_dataframe()
if not _df_cache.empty:
print(f"Total records loaded: {len(_df_cache)}")
print(f"Unique schools: {_df_cache['urn'].nunique()}")
print(f"Years: {sorted(_df_cache['year'].unique())}")
else:
print("No data found in database")
print("No data found in marts (EES data may not have been loaded yet)")
return _df_cache
@@ -549,136 +194,198 @@ def clear_cache():
_df_cache = None
# =============================================================================
# METADATA QUERIES
# =============================================================================
def get_available_years(db: Session = None) -> List[int]:
close_db = db is None
if db is None:
db = SessionLocal()
try:
result = db.query(KS2Performance.year).distinct().order_by(KS2Performance.year).all()
return [r[0] for r in result]
except Exception:
return []
finally:
if close_db:
db.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
close_db = db is None
if db is None:
db = SessionLocal()
try:
result = (
db.query(DimLocation.local_authority_name)
.filter(DimLocation.local_authority_name.isnot(None))
.distinct()
.order_by(DimLocation.local_authority_name)
.all()
)
return [r[0] for r in result if r[0]]
except Exception:
return []
finally:
if close_db:
db.close()
def get_schools_count(db: Session = None) -> int:
close_db = db is None
if db is None:
db = SessionLocal()
try:
return db.query(DimSchool).count()
except Exception:
return 0
finally:
if close_db:
db.close()
def get_data_info(db: Session = None) -> dict:
close_db = db is None
if db is None:
db = SessionLocal()
try:
school_count = get_schools_count(db)
years = get_available_years(db)
local_authorities = get_available_local_authorities(db)
return {
"total_schools": school_count,
"years_available": years,
"local_authorities_count": len(local_authorities),
"data_source": "PostgreSQL (marts)",
}
finally:
if close_db:
db.close()
# =============================================================================
# SUPPLEMENTARY DATA — per-school detail page
# =============================================================================
def get_supplementary_data(db: Session, urn: int) -> dict:
"""
Fetch all supplementary data for a single school URN.
Returns a dict with keys: ofsted, parent_view, census, admissions, sen_detail,
phonics, deprivation, finance. Values are dicts or None.
"""
"""Fetch all supplementary data for a single school URN."""
result = {}
def safe_query(model, pk_field, latest_year_field=None):
def safe_query(model, pk_field, latest_field=None):
try:
if latest_year_field:
row = (
db.query(model)
.filter(getattr(model, pk_field) == urn)
.order_by(getattr(model, latest_year_field).desc())
.first()
)
else:
row = db.query(model).filter(getattr(model, pk_field) == urn).first()
return row
q = db.query(model).filter(getattr(model, pk_field) == urn)
if latest_field:
q = q.order_by(getattr(model, latest_field).desc())
return q.first()
except Exception:
return None
# Ofsted inspection
o = safe_query(OfstedInspection, "urn")
result["ofsted"] = {
"framework": o.framework,
"inspection_date": o.inspection_date.isoformat() if o.inspection_date else None,
"inspection_type": o.inspection_type,
# OEIF fields (old framework)
"overall_effectiveness": o.overall_effectiveness,
"quality_of_education": o.quality_of_education,
"behaviour_attitudes": o.behaviour_attitudes,
"personal_development": o.personal_development,
"leadership_management": o.leadership_management,
"early_years_provision": o.early_years_provision,
"previous_overall": o.previous_overall,
# Report Card fields (new framework, from Nov 2025)
"rc_safeguarding_met": o.rc_safeguarding_met,
"rc_inclusion": o.rc_inclusion,
"rc_curriculum_teaching": o.rc_curriculum_teaching,
"rc_achievement": o.rc_achievement,
"rc_attendance_behaviour": o.rc_attendance_behaviour,
"rc_personal_development": o.rc_personal_development,
"rc_leadership_governance": o.rc_leadership_governance,
"rc_early_years": o.rc_early_years,
"rc_sixth_form": o.rc_sixth_form,
} if o else None
# Latest Ofsted inspection
o = safe_query(FactOfstedInspection, "urn", "inspection_date")
result["ofsted"] = (
{
"framework": o.framework,
"inspection_date": o.inspection_date.isoformat() if o.inspection_date else None,
"inspection_type": o.inspection_type,
"overall_effectiveness": o.overall_effectiveness,
"quality_of_education": o.quality_of_education,
"behaviour_attitudes": o.behaviour_attitudes,
"personal_development": o.personal_development,
"leadership_management": o.leadership_management,
"early_years_provision": o.early_years_provision,
"sixth_form_provision": o.sixth_form_provision,
"previous_overall": None, # Not available in new schema
"rc_safeguarding_met": o.rc_safeguarding_met,
"rc_inclusion": o.rc_inclusion,
"rc_curriculum_teaching": o.rc_curriculum_teaching,
"rc_achievement": o.rc_achievement,
"rc_attendance_behaviour": o.rc_attendance_behaviour,
"rc_personal_development": o.rc_personal_development,
"rc_leadership_governance": o.rc_leadership_governance,
"rc_early_years": o.rc_early_years,
"rc_sixth_form": o.rc_sixth_form,
"report_url": o.report_url,
}
if o
else None
)
# Parent View
pv = safe_query(OfstedParentView, "urn")
result["parent_view"] = {
"survey_date": pv.survey_date.isoformat() if pv.survey_date else None,
"total_responses": pv.total_responses,
"q_happy_pct": pv.q_happy_pct,
"q_safe_pct": pv.q_safe_pct,
"q_behaviour_pct": pv.q_behaviour_pct,
"q_bullying_pct": pv.q_bullying_pct,
"q_communication_pct": pv.q_communication_pct,
"q_progress_pct": pv.q_progress_pct,
"q_teaching_pct": pv.q_teaching_pct,
"q_information_pct": pv.q_information_pct,
"q_curriculum_pct": pv.q_curriculum_pct,
"q_future_pct": pv.q_future_pct,
"q_leadership_pct": pv.q_leadership_pct,
"q_wellbeing_pct": pv.q_wellbeing_pct,
"q_recommend_pct": pv.q_recommend_pct,
"q_sen_pct": pv.q_sen_pct,
} if pv else None
pv = safe_query(FactParentView, "urn")
result["parent_view"] = (
{
"survey_date": pv.survey_date.isoformat() if pv.survey_date else None,
"total_responses": pv.total_responses,
"q_happy_pct": pv.q_happy_pct,
"q_safe_pct": pv.q_safe_pct,
"q_behaviour_pct": pv.q_behaviour_pct,
"q_bullying_pct": pv.q_bullying_pct,
"q_communication_pct": pv.q_communication_pct,
"q_progress_pct": pv.q_progress_pct,
"q_teaching_pct": pv.q_teaching_pct,
"q_information_pct": pv.q_information_pct,
"q_curriculum_pct": pv.q_curriculum_pct,
"q_future_pct": pv.q_future_pct,
"q_leadership_pct": pv.q_leadership_pct,
"q_wellbeing_pct": pv.q_wellbeing_pct,
"q_recommend_pct": pv.q_recommend_pct,
}
if pv
else None
)
# School Census (latest year)
c = safe_query(SchoolCensus, "urn", "year")
result["census"] = {
"year": c.year,
"class_size_avg": c.class_size_avg,
"ethnicity_white_pct": c.ethnicity_white_pct,
"ethnicity_asian_pct": c.ethnicity_asian_pct,
"ethnicity_black_pct": c.ethnicity_black_pct,
"ethnicity_mixed_pct": c.ethnicity_mixed_pct,
"ethnicity_other_pct": c.ethnicity_other_pct,
} if c else None
# Census (fact_pupil_characteristics — minimal until census columns are verified)
result["census"] = None
# Admissions (latest year)
a = safe_query(SchoolAdmissions, "urn", "year")
result["admissions"] = {
"year": a.year,
"published_admission_number": a.published_admission_number,
"total_applications": a.total_applications,
"first_preference_offers_pct": a.first_preference_offers_pct,
"oversubscribed": a.oversubscribed,
} if a else None
a = safe_query(FactAdmissions, "urn", "year")
result["admissions"] = (
{
"year": a.year,
"school_phase": a.school_phase,
"published_admission_number": a.published_admission_number,
"total_applications": a.total_applications,
"first_preference_applications": a.first_preference_applications,
"first_preference_offers": a.first_preference_offers,
"first_preference_offer_pct": a.first_preference_offer_pct,
"oversubscribed": a.oversubscribed,
}
if a
else None
)
# SEN Detail (latest year)
s = safe_query(SenDetail, "urn", "year")
result["sen_detail"] = {
"year": s.year,
"primary_need_speech_pct": s.primary_need_speech_pct,
"primary_need_autism_pct": s.primary_need_autism_pct,
"primary_need_mld_pct": s.primary_need_mld_pct,
"primary_need_spld_pct": s.primary_need_spld_pct,
"primary_need_semh_pct": s.primary_need_semh_pct,
"primary_need_physical_pct": s.primary_need_physical_pct,
"primary_need_other_pct": s.primary_need_other_pct,
} if s else None
# SEN detail — not available in current marts
result["sen_detail"] = None
# Phonics (latest year)
ph = safe_query(Phonics, "urn", "year")
result["phonics"] = {
"year": ph.year,
"year1_phonics_pct": ph.year1_phonics_pct,
"year2_phonics_pct": ph.year2_phonics_pct,
} if ph else None
# Phonics — no school-level data on EES
result["phonics"] = None
# Deprivation
d = safe_query(SchoolDeprivation, "urn")
result["deprivation"] = {
"lsoa_code": d.lsoa_code,
"idaci_score": d.idaci_score,
"idaci_decile": d.idaci_decile,
} if d else None
d = safe_query(FactDeprivation, "urn")
result["deprivation"] = (
{
"lsoa_code": d.lsoa_code,
"idaci_score": d.idaci_score,
"idaci_decile": d.idaci_decile,
}
if d
else None
)
# Finance (latest year)
f = safe_query(SchoolFinance, "urn", "year")
result["finance"] = {
"year": f.year,
"per_pupil_spend": f.per_pupil_spend,
"staff_cost_pct": f.staff_cost_pct,
"teacher_cost_pct": f.teacher_cost_pct,
"support_staff_cost_pct": f.support_staff_cost_pct,
"premises_cost_pct": f.premises_cost_pct,
} if f else None
f = safe_query(FactFinance, "urn", "year")
result["finance"] = (
{
"year": f.year,
"per_pupil_spend": f.per_pupil_spend,
"staff_cost_pct": f.staff_cost_pct,
"teacher_cost_pct": f.teacher_cost_pct,
"support_staff_cost_pct": f.support_staff_cost_pct,
"premises_cost_pct": f.premises_cost_pct,
}
if f
else None
)
return result

View File

@@ -1,36 +1,30 @@
"""
Database connection setup using SQLAlchemy.
The schema is managed by dbt — the backend only reads from marts.* tables.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker, declarative_base
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from .config import settings
# Create engine
engine = create_engine(
settings.database_url,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Verify connections before use
echo=False, # Set to True for SQL debugging
pool_pre_ping=True,
echo=False,
)
# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class for models
Base = declarative_base()
def get_db():
"""
Dependency for FastAPI routes to get a database session.
"""
"""Dependency for FastAPI routes."""
db = SessionLocal()
try:
yield db
@@ -40,10 +34,7 @@ def get_db():
@contextmanager
def get_db_session():
"""
Context manager for database sessions.
Use in non-FastAPI contexts (scripts, etc).
"""
"""Context manager for non-FastAPI contexts."""
db = SessionLocal()
try:
yield db
@@ -53,95 +44,3 @@ def get_db_session():
raise
finally:
db.close()
def init_db():
    """Create every table registered on Base.metadata (no-op for tables that exist)."""
    Base.metadata.create_all(bind=engine)
def drop_db():
    """Drop every table registered on Base.metadata. Destructive — use with caution!"""
    Base.metadata.drop_all(bind=engine)
def get_db_schema_version() -> Optional[int]:
    """Read the current schema version stored in the database.

    Returns None when the schema_version table is missing, has no row,
    or the query fails for any reason — a best-effort read used at startup.
    """
    from .models import SchemaVersion  # local import breaks the circular dependency

    # Guard clause: no table means no version has ever been recorded.
    inspector = inspect(engine)
    if "schema_version" not in inspector.get_table_names():
        return None

    try:
        with get_db_session() as session:
            record = session.query(SchemaVersion).first()
            return record.version if record else None
    except Exception:
        # Deliberate catch-all: a failed read is treated as "unknown version".
        return None
def set_db_schema_version(version: int):
    """Persist *version* as the current schema version.

    Upserts the single schema_version row (id=1), stamping migrated_at
    with the time of the call.
    """
    from .models import SchemaVersion  # local import breaks the circular dependency

    now = datetime.utcnow()
    with get_db_session() as session:
        existing = session.query(SchemaVersion).first()
        if existing is None:
            session.add(SchemaVersion(id=1, version=version, migrated_at=now))
        else:
            existing.version = version
            existing.migrated_at = now
def check_and_migrate_if_needed():
    """
    Check schema version and run migration if needed.
    Called during application startup.

    Fast path: if the stored version equals the code's SCHEMA_VERSION, only
    ensure tables exist and return. Otherwise (missing or stale version) run
    the full migration; any failure is re-raised so startup aborts.
    """
    # Local imports avoid circular dependencies at module import time.
    from .version import SCHEMA_VERSION
    from .migration import run_full_migration
    db_version = get_db_schema_version()
    if db_version == SCHEMA_VERSION:
        print(f"Schema version {SCHEMA_VERSION} matches. Fast startup.")
        # Still ensure tables exist (they should if version matches)
        init_db()
        return
    # Version missing (fresh database) or stale — either way a full migration runs.
    if db_version is None:
        print(f"No schema version found. Running initial migration (v{SCHEMA_VERSION})...")
    else:
        print(f"Schema mismatch: DB has v{db_version}, code expects v{SCHEMA_VERSION}")
    print("Running full migration...")
    try:
        # Set schema version BEFORE migration so a crash mid-migration
        # doesn't cause an infinite re-migration loop on every restart.
        # NOTE(review): trade-off — a crash leaves the version marked current
        # with possibly partial data; this appears deliberate.
        init_db()
        set_db_schema_version(SCHEMA_VERSION)
        success = run_full_migration(geocode=False)  # geocoding disabled during startup migration
        if success:
            print(f"Migration complete. Schema version {SCHEMA_VERSION}.")
        else:
            print("Warning: Migration completed but no data was imported.")
    except Exception as e:
        print(f"FATAL: Migration failed: {e}")
        print("Application cannot start. Please check database and CSV files.")
        raise  # propagate so the application fails fast instead of serving empty data

View File

@@ -1,408 +1,216 @@
"""
SQLAlchemy database models for school data.
Normalized schema with separate tables for schools and yearly results.
SQLAlchemy models — all tables live in the marts schema, built by dbt.
Read-only: the pipeline writes to these tables; the backend only reads.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Float, Boolean, Date, Text, Index
from sqlalchemy import (
Column, Integer, String, Float, ForeignKey, Index, UniqueConstraint,
Text, Boolean, DateTime, Date
)
from sqlalchemy.orm import relationship
from .database import Base
MARTS = {"schema": "marts"}
class School(Base):
"""
Core school information - relatively static data.
"""
__tablename__ = "schools"
id = Column(Integer, primary_key=True, autoincrement=True)
urn = Column(Integer, unique=True, nullable=False, index=True)
class DimSchool(Base):
"""Canonical school dimension — one row per active URN."""
__tablename__ = "dim_school"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
school_name = Column(String(255), nullable=False)
local_authority = Column(String(100))
local_authority_code = Column(Integer)
phase = Column(String(100))
school_type = Column(String(100))
school_type_code = Column(String(10))
religious_denomination = Column(String(100))
academy_trust_name = Column(String(255))
academy_trust_uid = Column(String(20))
religious_character = Column(String(100))
gender = Column(String(20))
age_range = Column(String(20))
# Address
address1 = Column(String(255))
address2 = Column(String(255))
capacity = Column(Integer)
total_pupils = Column(Integer)
headteacher_name = Column(String(200))
website = Column(String(255))
telephone = Column(String(30))
status = Column(String(50))
nursery_provision = Column(Boolean)
admissions_policy = Column(String(50))
# Denormalised Ofsted summary (updated by monthly pipeline)
ofsted_grade = Column(Integer)
ofsted_date = Column(Date)
ofsted_framework = Column(String(20))
class DimLocation(Base):
"""School location — address, lat/lng from easting/northing (BNG→WGS84)."""
__tablename__ = "dim_location"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
address_line1 = Column(String(255))
address_line2 = Column(String(255))
town = Column(String(100))
postcode = Column(String(20), index=True)
# Geocoding (cached)
county = Column(String(100))
postcode = Column(String(20))
local_authority_code = Column(Integer)
local_authority_name = Column(String(100))
parliamentary_constituency = Column(String(100))
urban_rural = Column(String(50))
easting = Column(Integer)
northing = Column(Integer)
latitude = Column(Float)
longitude = Column(Float)
# GIAS enrichment fields
website = Column(String(255))
headteacher_name = Column(String(200))
capacity = Column(Integer)
trust_name = Column(String(255))
trust_uid = Column(String(20))
gender = Column(String(20)) # Mixed / Girls / Boys
nursery_provision = Column(Boolean)
# Relationships
results = relationship("SchoolResult", back_populates="school", cascade="all, delete-orphan")
def __repr__(self):
return f"<School(urn={self.urn}, name='{self.school_name}')>"
@property
def address(self):
"""Combine address fields into single string."""
parts = [self.address1, self.address2, self.town, self.postcode]
return ", ".join(p for p in parts if p)
# geom is a PostGIS geometry — not mapped to SQLAlchemy (accessed via raw SQL)
class SchoolResult(Base):
"""
Yearly KS2 results for a school.
Each school can have multiple years of results.
"""
__tablename__ = "school_results"
id = Column(Integer, primary_key=True, autoincrement=True)
school_id = Column(Integer, ForeignKey("schools.id", ondelete="CASCADE"), nullable=False)
year = Column(Integer, nullable=False, index=True)
# Pupil numbers
class KS2Performance(Base):
"""KS2 attainment — one row per URN per year (includes predecessor data)."""
__tablename__ = "fact_ks2_performance"
__table_args__ = (
Index("ix_ks2_urn_year", "urn", "year"),
MARTS,
)
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
source_urn = Column(Integer)
total_pupils = Column(Integer)
eligible_pupils = Column(Integer)
# Core KS2 metrics - Expected Standard
# Core attainment
rwm_expected_pct = Column(Float)
reading_expected_pct = Column(Float)
writing_expected_pct = Column(Float)
maths_expected_pct = Column(Float)
gps_expected_pct = Column(Float)
science_expected_pct = Column(Float)
# Higher Standard
rwm_high_pct = Column(Float)
reading_expected_pct = Column(Float)
reading_high_pct = Column(Float)
writing_high_pct = Column(Float)
maths_high_pct = Column(Float)
gps_high_pct = Column(Float)
# Progress Scores
reading_progress = Column(Float)
writing_progress = Column(Float)
maths_progress = Column(Float)
# Average Scores
reading_avg_score = Column(Float)
reading_progress = Column(Float)
writing_expected_pct = Column(Float)
writing_high_pct = Column(Float)
writing_progress = Column(Float)
maths_expected_pct = Column(Float)
maths_high_pct = Column(Float)
maths_avg_score = Column(Float)
maths_progress = Column(Float)
gps_expected_pct = Column(Float)
gps_high_pct = Column(Float)
gps_avg_score = Column(Float)
# School Context
science_expected_pct = Column(Float)
# Absence
reading_absence_pct = Column(Float)
writing_absence_pct = Column(Float)
maths_absence_pct = Column(Float)
gps_absence_pct = Column(Float)
science_absence_pct = Column(Float)
# Gender
rwm_expected_boys_pct = Column(Float)
rwm_high_boys_pct = Column(Float)
rwm_expected_girls_pct = Column(Float)
rwm_high_girls_pct = Column(Float)
# Disadvantaged
rwm_expected_disadvantaged_pct = Column(Float)
rwm_expected_non_disadvantaged_pct = Column(Float)
disadvantaged_gap = Column(Float)
# Context
disadvantaged_pct = Column(Float)
eal_pct = Column(Float)
sen_support_pct = Column(Float)
sen_ehcp_pct = Column(Float)
stability_pct = Column(Float)
# Pupil Absence from Tests
reading_absence_pct = Column(Float)
gps_absence_pct = Column(Float)
maths_absence_pct = Column(Float)
writing_absence_pct = Column(Float)
science_absence_pct = Column(Float)
# Gender Breakdown
rwm_expected_boys_pct = Column(Float)
rwm_expected_girls_pct = Column(Float)
rwm_high_boys_pct = Column(Float)
rwm_high_girls_pct = Column(Float)
# Disadvantaged Performance
rwm_expected_disadvantaged_pct = Column(Float)
rwm_expected_non_disadvantaged_pct = Column(Float)
disadvantaged_gap = Column(Float)
# 3-Year Averages
rwm_expected_3yr_pct = Column(Float)
reading_avg_3yr = Column(Float)
maths_avg_3yr = Column(Float)
# Relationship
school = relationship("School", back_populates="results")
# Constraints
class FactOfstedInspection(Base):
"""Full Ofsted inspection history — one row per inspection."""
__tablename__ = "fact_ofsted_inspection"
__table_args__ = (
UniqueConstraint('school_id', 'year', name='uq_school_year'),
Index('ix_school_results_school_year', 'school_id', 'year'),
Index("ix_ofsted_urn_date", "urn", "inspection_date"),
MARTS,
)
def __repr__(self):
return f"<SchoolResult(school_id={self.school_id}, year={self.year})>"
class SchemaVersion(Base):
    """
    Tracks database schema version for automatic migrations.
    Single-row table that stores the current schema version.
    """
    __tablename__ = "schema_version"
    # id defaults to 1 — the table is intended to hold exactly one row.
    id = Column(Integer, primary_key=True, default=1)
    # version is compared against the code's SCHEMA_VERSION at startup.
    version = Column(Integer, nullable=False)
    # migrated_at is stamped on insert and refreshed on every update.
    migrated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    def __repr__(self):
        return f"<SchemaVersion(version={self.version}, migrated_at={self.migrated_at})>"
# ---------------------------------------------------------------------------
# Supplementary data tables (populated by the Kestra data integrator)
# ---------------------------------------------------------------------------
class OfstedInspection(Base):
"""Latest Ofsted inspection judgement per school."""
__tablename__ = "ofsted_inspections"
urn = Column(Integer, primary_key=True)
inspection_date = Column(Date)
publication_date = Column(Date)
inspection_type = Column(String(100)) # Section 5 / Section 8 etc.
# Which inspection framework was used: 'OEIF' or 'ReportCard'
inspection_date = Column(Date, primary_key=True)
inspection_type = Column(String(100))
framework = Column(String(20))
# --- OEIF grades (old framework, pre-Nov 2025) ---
# 1=Outstanding 2=Good 3=Requires improvement 4=Inadequate
overall_effectiveness = Column(Integer)
quality_of_education = Column(Integer)
behaviour_attitudes = Column(Integer)
personal_development = Column(Integer)
leadership_management = Column(Integer)
early_years_provision = Column(Integer) # nullable — not all schools
previous_overall = Column(Integer) # for trend display
# --- Report Card grades (new framework, from Nov 2025) ---
# 1=Exceptional 2=Strong 3=Expected standard 4=Needs attention 5=Urgent improvement
rc_safeguarding_met = Column(Boolean) # True=Met, False=Not met
early_years_provision = Column(Integer)
sixth_form_provision = Column(Integer)
rc_safeguarding_met = Column(Boolean)
rc_inclusion = Column(Integer)
rc_curriculum_teaching = Column(Integer)
rc_achievement = Column(Integer)
rc_attendance_behaviour = Column(Integer)
rc_personal_development = Column(Integer)
rc_leadership_governance = Column(Integer)
rc_early_years = Column(Integer) # nullable — not all schools
rc_sixth_form = Column(Integer) # nullable — secondary only
def __repr__(self):
return f"<OfstedInspection(urn={self.urn}, framework={self.framework}, overall={self.overall_effectiveness})>"
rc_early_years = Column(Integer)
rc_sixth_form = Column(Integer)
report_url = Column(Text)
class OfstedParentView(Base):
"""Ofsted Parent View survey — latest per school. 14 questions, % saying Yes."""
__tablename__ = "ofsted_parent_view"
class FactParentView(Base):
"""Ofsted Parent View survey — latest per school."""
__tablename__ = "fact_parent_view"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
survey_date = Column(Date)
total_responses = Column(Integer)
q_happy_pct = Column(Float) # My child is happy at this school
q_safe_pct = Column(Float) # My child feels safe at this school
q_bullying_pct = Column(Float) # School deals with bullying well
q_communication_pct = Column(Float) # School keeps me informed
q_progress_pct = Column(Float) # My child does well / good progress
q_teaching_pct = Column(Float) # Teaching is good
q_information_pct = Column(Float) # I receive valuable info about progress
q_curriculum_pct = Column(Float) # Broad range of subjects taught
q_future_pct = Column(Float) # Prepares child well for the future
q_leadership_pct = Column(Float) # Led and managed effectively
q_wellbeing_pct = Column(Float) # Supports wider personal development
q_behaviour_pct = Column(Float) # Pupils are well behaved
q_recommend_pct = Column(Float) # I would recommend this school
q_sen_pct = Column(Float) # Good information about child's SEN (where applicable)
def __repr__(self):
return f"<OfstedParentView(urn={self.urn}, responses={self.total_responses})>"
q_happy_pct = Column(Float)
q_safe_pct = Column(Float)
q_behaviour_pct = Column(Float)
q_bullying_pct = Column(Float)
q_communication_pct = Column(Float)
q_progress_pct = Column(Float)
q_teaching_pct = Column(Float)
q_information_pct = Column(Float)
q_curriculum_pct = Column(Float)
q_future_pct = Column(Float)
q_leadership_pct = Column(Float)
q_wellbeing_pct = Column(Float)
q_recommend_pct = Column(Float)
class SchoolCensus(Base):
"""Annual school census snapshot — class sizes and ethnicity breakdown."""
__tablename__ = "school_census"
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
class_size_avg = Column(Float)
ethnicity_white_pct = Column(Float)
ethnicity_asian_pct = Column(Float)
ethnicity_black_pct = Column(Float)
ethnicity_mixed_pct = Column(Float)
ethnicity_other_pct = Column(Float)
class FactAdmissions(Base):
"""School admissions — one row per URN per year."""
__tablename__ = "fact_admissions"
__table_args__ = (
Index('ix_school_census_urn_year', 'urn', 'year'),
Index("ix_admissions_urn_year", "urn", "year"),
MARTS,
)
def __repr__(self):
return f"<SchoolCensus(urn={self.urn}, year={self.year})>"
class SchoolAdmissions(Base):
"""Annual admissions statistics per school."""
__tablename__ = "school_admissions"
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
published_admission_number = Column(Integer) # PAN
school_phase = Column(String(50))
published_admission_number = Column(Integer)
total_applications = Column(Integer)
first_preference_offers_pct = Column(Float) # % receiving 1st choice
first_preference_applications = Column(Integer)
first_preference_offers = Column(Integer)
first_preference_offer_pct = Column(Float)
oversubscribed = Column(Boolean)
__table_args__ = (
Index('ix_school_admissions_urn_year', 'urn', 'year'),
)
def __repr__(self):
return f"<SchoolAdmissions(urn={self.urn}, year={self.year})>"
admissions_policy = Column(String(100))
class SenDetail(Base):
    """SEN primary need type breakdown — more granular than school_results context fields.

    One row per school (urn) per academic year; each column is the percentage
    of pupils whose primary SEN need falls in that category.
    """
    __tablename__ = "sen_detail"
    # Composite primary key: (urn, year).
    urn = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)
    primary_need_speech_pct = Column(Float)  # SLCN
    primary_need_autism_pct = Column(Float)  # ASD
    primary_need_mld_pct = Column(Float)  # Moderate learning difficulty
    primary_need_spld_pct = Column(Float)  # Specific learning difficulty (dyslexia etc.)
    primary_need_semh_pct = Column(Float)  # Social, emotional, mental health
    primary_need_physical_pct = Column(Float)  # Physical/sensory
    primary_need_other_pct = Column(Float)
    __table_args__ = (
        # Composite index on (urn, year) for per-school, per-year lookups.
        Index('ix_sen_detail_urn_year', 'urn', 'year'),
    )
    def __repr__(self):
        return f"<SenDetail(urn={self.urn}, year={self.year})>"
class Phonics(Base):
    """Phonics Screening Check pass rates — one row per school (urn) per year."""
    __tablename__ = "phonics"
    # Composite primary key: (urn, year).
    urn = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)
    year1_phonics_pct = Column(Float)  # % reaching expected standard in Year 1
    year2_phonics_pct = Column(Float)  # % reaching standard in Year 2 (re-takers)
    __table_args__ = (
        # Composite index on (urn, year) for per-school, per-year lookups.
        Index('ix_phonics_urn_year', 'urn', 'year'),
    )
    def __repr__(self):
        return f"<Phonics(urn={self.urn}, year={self.year})>"
class SchoolDeprivation(Base):
"""IDACI deprivation index — derived via postcode → LSOA lookup."""
__tablename__ = "school_deprivation"
class FactDeprivation(Base):
"""IDACI deprivation index — one row per URN."""
__tablename__ = "fact_deprivation"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
lsoa_code = Column(String(20))
idaci_score = Column(Float) # 01, higher = more deprived
idaci_decile = Column(Integer) # 1 = most deprived, 10 = least deprived
def __repr__(self):
return f"<SchoolDeprivation(urn={self.urn}, decile={self.idaci_decile})>"
idaci_score = Column(Float)
idaci_decile = Column(Integer)
class SchoolFinance(Base):
"""FBIT financial benchmarking data."""
__tablename__ = "school_finance"
class FactFinance(Base):
"""FBIT financial benchmarking — one row per URN per year."""
__tablename__ = "fact_finance"
__table_args__ = (
Index("ix_finance_urn_year", "urn", "year"),
MARTS,
)
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
per_pupil_spend = Column(Float) # £ total expenditure per pupil
staff_cost_pct = Column(Float) # % of budget on all staff
teacher_cost_pct = Column(Float) # % on teachers specifically
per_pupil_spend = Column(Float)
staff_cost_pct = Column(Float)
teacher_cost_pct = Column(Float)
support_staff_cost_pct = Column(Float)
premises_cost_pct = Column(Float)
__table_args__ = (
Index('ix_school_finance_urn_year', 'urn', 'year'),
)
def __repr__(self):
return f"<SchoolFinance(urn={self.urn}, year={self.year})>"
# Mapping from CSV columns to model fields.
# Every current mapping is an identity (CSV header == model attribute), so
# the dicts are generated from ordered name tuples. If a CSV column is ever
# renamed, replace its generated entry with an explicit
# 'csv_name': 'field_name' pair.
_SCHOOL_FIELDS = (
    'urn',
    'school_name',
    'local_authority',
    'local_authority_code',
    'school_type',
    'school_type_code',
    'religious_denomination',
    'age_range',
    'address1',
    'address2',
    'town',
    'postcode',
)
SCHOOL_FIELD_MAPPING = {field: field for field in _SCHOOL_FIELDS}

_RESULT_FIELDS = (
    'year',
    'total_pupils',
    'eligible_pupils',
    # Expected Standard
    'rwm_expected_pct',
    'reading_expected_pct',
    'writing_expected_pct',
    'maths_expected_pct',
    'gps_expected_pct',
    'science_expected_pct',
    # Higher Standard
    'rwm_high_pct',
    'reading_high_pct',
    'writing_high_pct',
    'maths_high_pct',
    'gps_high_pct',
    # Progress
    'reading_progress',
    'writing_progress',
    'maths_progress',
    # Averages
    'reading_avg_score',
    'maths_avg_score',
    'gps_avg_score',
    # Context
    'disadvantaged_pct',
    'eal_pct',
    'sen_support_pct',
    'sen_ehcp_pct',
    'stability_pct',
    # Absence
    'reading_absence_pct',
    'gps_absence_pct',
    'maths_absence_pct',
    'writing_absence_pct',
    'science_absence_pct',
    # Gender
    'rwm_expected_boys_pct',
    'rwm_expected_girls_pct',
    'rwm_high_boys_pct',
    'rwm_high_girls_pct',
    # Disadvantaged
    'rwm_expected_disadvantaged_pct',
    'rwm_expected_non_disadvantaged_pct',
    'disadvantaged_gap',
    # 3-Year
    'rwm_expected_3yr_pct',
    'reading_avg_3yr',
    'maths_avg_3yr',
)
RESULT_FIELD_MAPPING = {field: field for field in _RESULT_FIELDS}

View File

@@ -120,12 +120,12 @@ with DAG(
extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted
# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ───────────
# ── Annual DAG (EES: KS2, KS4, Census, Admissions) ───────────────────
with DAG(
dag_id="school_data_annual_ees",
default_args=default_args,
description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)",
description="Annual EES data extraction (KS2, KS4, Census, Admissions)",
schedule=None, # Triggered manually when new releases are published
start_date=datetime(2025, 1, 1),
catchup=False,
@@ -140,7 +140,7 @@ with DAG(
dbt_build_ees = BashOperator(
task_id="dbt_build",
bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+ stg_ees_phonics+",
bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+",
)
sync_typesense_ees = BashOperator(

View File

@@ -1,9 +1,11 @@
"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data.
"""EES Singer tap — extracts KS2, KS4, Census, Admissions data.
Each stream targets a specific CSV file within an EES release ZIP.
The EES data uses 'school_urn' for school-level records and 'z' for
suppressed values. Column names vary by file — schemas declare all
columns needed by downstream dbt staging models.
Phonics has no school-level data on EES and is not included.
"""
from __future__ import annotations
@@ -38,12 +40,17 @@ def download_release_zip(release_id: str) -> zipfile.ZipFile:
class EESDatasetStream(Stream):
"""Base stream for an EES dataset extracted from a release ZIP."""
"""Base stream for an EES dataset extracted from a release ZIP.
Subclasses set _target_filename to a keyword that appears in the
target CSV path inside the ZIP (substring match, not exact).
"""
replication_key = None
_publication_slug: str = ""
_target_filename: str = "" # exact filename within the ZIP
_target_filename: str = "" # keyword that appears in the CSV path
_urn_column: str = "school_urn" # column name for URN in the CSV
_encoding: str = "utf-8" # CSV file encoding (some DfE files use latin-1)
def get_records(self, context):
import pandas as pd
@@ -56,17 +63,17 @@ class EESDatasetStream(Stream):
)
zf = download_release_zip(release_id)
# Find the target file
# Find the target file (substring match)
all_files = zf.namelist()
target = None
for name in all_files:
if name.endswith(self._target_filename):
if self._target_filename in name and name.endswith(".csv"):
target = name
break
if not target:
self.logger.error(
"File '%s' not found in ZIP. Available: %s",
"File matching '%s' not found in ZIP. Available: %s",
self._target_filename,
[n for n in all_files if n.endswith(".csv")],
)
@@ -74,7 +81,7 @@ class EESDatasetStream(Stream):
self.logger.info("Reading %s from ZIP", target)
with zf.open(target) as f:
df = pd.read_csv(f, dtype=str, keep_default_na=False)
df = pd.read_csv(f, dtype=str, keep_default_na=False, encoding=self._encoding)
# Filter to school-level data if the column exists
if "geographic_level" in df.columns:
@@ -96,7 +103,7 @@ class EESKS2AttainmentStream(EESDatasetStream):
name = "ees_ks2_attainment"
primary_keys = ["school_urn", "time_period", "subject", "breakdown_topic", "breakdown"]
_publication_slug = "key-stage-2-attainment"
_target_filename = "ks2_school_attainment_data.csv"
_target_filename = "ks2_school_attainment_data"
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
@@ -126,7 +133,7 @@ class EESKS2InfoStream(EESDatasetStream):
name = "ees_ks2_info"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "key-stage-2-attainment"
_target_filename = "ks2_school_information_data.csv"
_target_filename = "ks2_school_information_data"
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
@@ -150,60 +157,174 @@ class EESKS2InfoStream(EESDatasetStream):
).to_dict()
# ── KS4 Attainment ──────────────────────────────────────────────────────────
# ── KS4 Performance (long format: one row per school × breakdown × sex) ─────
# File: 202425_performance_tables_schools_revised.csv (156 cols)
# Dimensions: breakdown_topic, breakdown, sex, disadvantage_status, etc.
# Metrics are already in separate columns (attainment8_average, progress8_average, etc.)
class EESKS4Stream(EESDatasetStream):
name = "ees_ks4"
primary_keys = ["school_urn", "time_period"]
class EESKS4PerformanceStream(EESDatasetStream):
name = "ees_ks4_performance"
primary_keys = ["school_urn", "time_period", "breakdown_topic", "breakdown", "sex"]
_publication_slug = "key-stage-4-performance"
_target_filename = "school" # Will be refined once we see the actual ZIP contents
_target_filename = "performance_tables_schools"
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
th.Property("school_laestab", th.StringType),
th.Property("school_name", th.StringType),
th.Property("establishment_type_group", th.StringType),
th.Property("breakdown_topic", th.StringType, required=True),
th.Property("breakdown", th.StringType, required=True),
th.Property("sex", th.StringType, required=True),
th.Property("disadvantage_status", th.StringType),
th.Property("first_language", th.StringType),
th.Property("prior_attainment", th.StringType),
th.Property("mobility", th.StringType),
# Pupil counts
th.Property("pupil_count", th.StringType),
th.Property("pupil_percent", th.StringType),
# Attainment 8
th.Property("attainment8_sum", th.StringType),
th.Property("attainment8_average", th.StringType),
# English & Maths
th.Property("engmath_entering_total", th.StringType),
th.Property("engmath_entering_percent", th.StringType),
th.Property("engmath_95_total", th.StringType),
th.Property("engmath_95_percent", th.StringType),
th.Property("engmath_94_total", th.StringType),
th.Property("engmath_94_percent", th.StringType),
# EBacc
th.Property("ebacc_entering_total", th.StringType),
th.Property("ebacc_entering_percent", th.StringType),
th.Property("ebacc_95_total", th.StringType),
th.Property("ebacc_95_percent", th.StringType),
th.Property("ebacc_94_total", th.StringType),
th.Property("ebacc_94_percent", th.StringType),
th.Property("ebacc_aps_sum", th.StringType),
th.Property("ebacc_aps_average", th.StringType),
# Progress 8
th.Property("progress8_pupil_count", th.StringType),
th.Property("progress8_sum", th.StringType),
th.Property("progress8_average", th.StringType),
th.Property("progress8_lower_95_ci", th.StringType),
th.Property("progress8_upper_95_ci", th.StringType),
# Progress 8 elements
th.Property("progress8eng_average", th.StringType),
th.Property("progress8mat_average", th.StringType),
th.Property("progress8ebacc_average", th.StringType),
th.Property("progress8open_average", th.StringType),
# GCSE grades
th.Property("gcse_91_total", th.StringType),
th.Property("gcse_91_percent", th.StringType),
# EBacc subject entry/achievement
th.Property("ebacceng_entering_percent", th.StringType),
th.Property("ebaccmat_entering_percent", th.StringType),
th.Property("ebaccsci_entering_percent", th.StringType),
th.Property("ebacchum_entering_percent", th.StringType),
th.Property("ebacclan_entering_percent", th.StringType),
).to_dict()
# ── KS4 Information (wide format: one row per school, context/demographics) ──
# File: 202425_information_about_schools_provisional.csv (38 cols)
class EESKS4InfoStream(EESDatasetStream):
    """KS4 school-information stream — wide format, one row per school per year.

    Pulls the 'information_about_schools' CSV from the key-stage-4-performance
    release: school context/demographics rather than attainment metrics.
    """
    name = "ees_ks4_info"
    primary_keys = ["school_urn", "time_period"]
    _publication_slug = "key-stage-4-performance"
    # Keyword matched as a substring of the CSV path inside the release ZIP.
    _target_filename = "information_about_schools"
    # All properties are declared as strings: EES uses 'z' for suppressed
    # values, so numeric casting is deferred to the dbt staging layer.
    schema = th.PropertiesList(
        th.Property("time_period", th.StringType, required=True),
        th.Property("school_urn", th.StringType, required=True),
        th.Property("school_laestab", th.StringType),
        th.Property("school_name", th.StringType),
        th.Property("establishment_type_group", th.StringType),
        th.Property("reldenom", th.StringType),  # presumably religious denomination — confirm vs EES metadata
        th.Property("admpol_pt", th.StringType),  # presumably admissions policy — confirm vs EES metadata
        th.Property("egender", th.StringType),  # presumably intake gender — confirm vs EES metadata
        th.Property("agerange", th.StringType),
        th.Property("allks_pupil_count", th.StringType),
        th.Property("allks_boys_count", th.StringType),
        th.Property("allks_girls_count", th.StringType),
        th.Property("endks4_pupil_count", th.StringType),
        th.Property("ks2_scaledscore_average", th.StringType),
        th.Property("sen_with_ehcp_pupil_percent", th.StringType),
        th.Property("sen_pupil_percent", th.StringType),
        th.Property("sen_no_ehcp_pupil_percent", th.StringType),
        th.Property("attainment8_diffn", th.StringType),
        th.Property("progress8_diffn", th.StringType),
        th.Property("progress8_banding", th.StringType),
    ).to_dict()
# ── Census (school-level pupil characteristics) ─────────────────────────────
# File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/)
# Uses 'urn' not 'school_urn'. Filename has yearly suffix that changes.
class EESCensusStream(EESDatasetStream):
name = "ees_census"
primary_keys = ["urn", "time_period"]
primary_keys = ["school_urn", "time_period"]
_publication_slug = "school-pupils-and-their-characteristics"
_target_filename = "spc_school_level_underlying_data_2025.csv"
_target_filename = "spc_school_level_underlying_data"
_urn_column = "urn"
_encoding = "latin-1"
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("urn", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
th.Property("school_name", th.StringType),
th.Property("laestab", th.StringType),
th.Property("phase_type_grouping", th.StringType),
# TODO: Add data columns (ethnicity %, FSM %, SEN %, etc.) once
# actual column names are verified on the container. The CSV has
# 269 columns — only the first 30 (metadata) have been inspected.
).to_dict()
# ── Admissions ───────────────────────────────────────────────────────────────
# File: AppsandOffers_YYYY_SchoolLevelDDMMYYYY.csv (37 cols, in supporting-files/)
# Wide format, no geographic_level column. Uses school_urn.
class EESAdmissionsStream(EESDatasetStream):
    """School-level applications-and-offers data from EES.

    Source file: AppsandOffers_YYYY_SchoolLevelDDMMYYYY.csv (37 cols, in
    supporting-files/ of the release ZIP). Wide format, no
    geographic_level column; the raw CSV already uses ``school_urn``.
    """

    name = "ees_admissions"
    primary_keys = ["school_urn", "time_period"]
    _publication_slug = "primary-and-secondary-school-applications-and-offers"
    # Keyword match — the year and DDMMYYYY parts of the filename change
    # per release, but 'SchoolLevel' is stable.
    _target_filename = "SchoolLevel"
    # DfE supporting-files CSVs are Latin-1 encoded, not UTF-8.
    _encoding = "latin-1"
    schema = th.PropertiesList(
        th.Property("time_period", th.StringType, required=True),
        th.Property("school_urn", th.StringType, required=True),
        th.Property("school_name", th.StringType),
        th.Property("school_laestab_as_used", th.StringType),
        th.Property("school_phase", th.StringType),
        th.Property("entry_year", th.StringType),
        # Places and offers
        th.Property("total_number_places_offered", th.StringType),
        th.Property("number_preferred_offers", th.StringType),
        th.Property("number_1st_preference_offers", th.StringType),
        th.Property("number_2nd_preference_offers", th.StringType),
        th.Property("number_3rd_preference_offers", th.StringType),
        # Applications
        th.Property("times_put_as_any_preferred_school", th.StringType),
        th.Property("times_put_as_1st_preference", th.StringType),
        th.Property("times_put_as_2nd_preference", th.StringType),
        th.Property("times_put_as_3rd_preference", th.StringType),
        # Proportions
        th.Property("proportion_1stprefs_v_1stprefoffers", th.StringType),
        th.Property("proportion_1stprefs_v_totaloffers", th.StringType),
        # Cross-LA
        th.Property("all_applications_from_another_LA", th.StringType),
        th.Property("offers_to_applicants_from_another_LA", th.StringType),
        # Context
        th.Property("establishment_type", th.StringType),
        th.Property("denomination", th.StringType),
        th.Property("FSM_eligible_percent", th.StringType),
        th.Property("admissions_policy", th.StringType),
        th.Property("urban_rural", th.StringType),
    ).to_dict()
# ── Phonics ──────────────────────────────────────────────────────────────────
class EESPhonicsStream(EESDatasetStream):
    # NOTE(review): EES publishes phonics data only at national/LA level —
    # there is no school-level file, so this stream cannot yield useful
    # records and is slated for removal (see note immediately below).
    name = "ees_phonics"
    primary_keys = ["school_urn", "time_period"]
    _publication_slug = "phonics-screening-check-attainment"
    _target_filename = "school"  # Will be refined once we see the actual ZIP contents
    schema = th.PropertiesList(
        th.Property("time_period", th.StringType, required=True),
        th.Property("school_urn", th.StringType, required=True),
    ).to_dict()
# Note: Phonics (phonics-screening-check-attainment) has NO school-level data
# on EES. Only national and LA-level files are published.
class TapUKEES(Tap):
@@ -219,10 +340,10 @@ class TapUKEES(Tap):
return [
EESKS2AttainmentStream(self),
EESKS2InfoStream(self),
EESKS4Stream(self),
EESKS4PerformanceStream(self),
EESKS4InfoStream(self),
EESCensusStream(self),
EESAdmissionsStream(self),
EESPhonicsStream(self),
]

View File

@@ -4,16 +4,14 @@ with current_ks4 as (
select
urn as current_urn,
urn as source_urn,
year,
total_pupils,
progress_8_score,
year, total_pupils, eligible_pupils, prior_attainment_avg,
attainment_8_score,
ebacc_entry_pct,
ebacc_achievement_pct,
english_strong_pass_pct,
maths_strong_pass_pct,
english_maths_strong_pass_pct,
staying_in_education_pct
progress_8_score, progress_8_lower_ci, progress_8_upper_ci,
progress_8_english, progress_8_maths, progress_8_ebacc, progress_8_open,
english_maths_strong_pass_pct, english_maths_standard_pass_pct,
ebacc_entry_pct, ebacc_strong_pass_pct, ebacc_standard_pass_pct, ebacc_avg_score,
gcse_grade_91_pct,
sen_pct, sen_ehcp_pct, sen_support_pct
from {{ ref('stg_ees_ks4') }}
),
@@ -21,16 +19,14 @@ predecessor_ks4 as (
select
lin.current_urn,
ks4.urn as source_urn,
ks4.year,
ks4.total_pupils,
ks4.progress_8_score,
ks4.year, ks4.total_pupils, ks4.eligible_pupils, ks4.prior_attainment_avg,
ks4.attainment_8_score,
ks4.ebacc_entry_pct,
ks4.ebacc_achievement_pct,
ks4.english_strong_pass_pct,
ks4.maths_strong_pass_pct,
ks4.english_maths_strong_pass_pct,
ks4.staying_in_education_pct
ks4.progress_8_score, ks4.progress_8_lower_ci, ks4.progress_8_upper_ci,
ks4.progress_8_english, ks4.progress_8_maths, ks4.progress_8_ebacc, ks4.progress_8_open,
ks4.english_maths_strong_pass_pct, ks4.english_maths_standard_pass_pct,
ks4.ebacc_entry_pct, ks4.ebacc_strong_pass_pct, ks4.ebacc_standard_pass_pct, ks4.ebacc_avg_score,
ks4.gcse_grade_91_pct,
ks4.sen_pct, ks4.sen_ehcp_pct, ks4.sen_support_pct
from {{ ref('stg_ees_ks4') }} ks4
inner join {{ ref('int_school_lineage') }} lin
on ks4.urn = lin.predecessor_urn

View File

-- Intermediate model: Merged pupil characteristics from census data
-- Passes through the only verified census column for now.
-- TODO: Expand once census data columns are verified and added to stg_ees_census
select
    urn,
    year,
    phase_type_grouping
from {{ ref('stg_ees_census') }}

View File

@@ -88,14 +88,6 @@ models:
- name: year
tests: [not_null]
- name: fact_phonics
description: Phonics screening results — one row per URN per year
columns:
- name: urn
tests: [not_null]
- name: year
tests: [not_null]
- name: fact_parent_view
description: Parent View survey responses
columns:

View File

@@ -3,8 +3,12 @@
select
urn,
year,
school_phase,
published_admission_number,
total_applications,
first_preference_offers_pct,
oversubscribed
first_preference_applications,
first_preference_offers,
first_preference_offer_pct,
oversubscribed,
admissions_policy
from {{ ref('stg_ees_admissions') }}

View File

-- Mart: KS4 performance fact table — one row per URN per year
-- Includes predecessor data via lineage resolution
select
    current_urn as urn,
    source_urn,
    year,
    total_pupils,
    eligible_pupils,
    prior_attainment_avg,
    -- Attainment 8
    attainment_8_score,
    -- Progress 8
    progress_8_score,
    progress_8_lower_ci,
    progress_8_upper_ci,
    progress_8_english,
    progress_8_maths,
    progress_8_ebacc,
    progress_8_open,
    -- English & Maths
    english_maths_strong_pass_pct,
    english_maths_standard_pass_pct,
    -- EBacc
    ebacc_entry_pct,
    ebacc_strong_pass_pct,
    ebacc_standard_pass_pct,
    ebacc_avg_score,
    -- GCSE
    gcse_grade_91_pct,
    -- Context
    sen_pct,
    sen_ehcp_pct,
    sen_support_pct
from {{ ref('int_ks4_with_lineage') }}

View File

@@ -1,8 +0,0 @@
-- Mart: Phonics screening results — one row per URN per year
-- NOTE(review): EES publishes no school-level phonics data (national/LA
-- only), so this mart and its stg_ees_phonics upstream are slated for
-- removal — confirm before relying on it.
select
    urn,
    year,
    year1_phonics_pct,
    year2_phonics_pct
from {{ ref('stg_ees_phonics') }}

View File

-- Mart: Pupil characteristics — one row per URN per year
-- Passes through the only verified census column for now.
-- TODO: Expand once census data columns are verified and added to staging
select
    urn,
    year,
    phase_type_grouping
from {{ ref('int_pupil_chars_merged') }}

View File

@@ -30,8 +30,11 @@ sources:
- name: ees_ks2_info
description: KS2 school information (wide format — context/demographics per school)
- name: ees_ks4
description: KS4 attainment data from Explore Education Statistics
- name: ees_ks4_performance
description: KS4 performance tables (long format — one row per school × breakdown × sex)
- name: ees_ks4_info
description: KS4 school information (wide format — context/demographics per school)
- name: ees_census
description: School census pupil characteristics
@@ -39,8 +42,7 @@ sources:
- name: ees_admissions
description: Primary and secondary school admissions data
- name: ees_phonics
description: Phonics screening check results
# Phonics: no school-level data on EES (only national/LA level)
- name: parent_view
description: Ofsted Parent View survey responses

View File

-- Staging model: Primary and secondary school admissions from EES
-- Wide format, one row per school per year. No geographic_level column.
-- File is in supporting-files/ subdirectory of the release ZIP.
-- EES uses 'z' for suppressed values — cast to null via nullif.
with source as (
    -- Raw rows already carry school_urn; filter out non-school rows here.
    select * from {{ source('raw', 'ees_admissions') }}
    where school_urn is not null
),
renamed as (
    select
        cast(school_urn as integer) as urn,
        cast(time_period as integer) as year,
        school_phase,
        entry_year,
        -- Places and offers
        cast(nullif(total_number_places_offered, 'z') as integer) as published_admission_number,
        cast(nullif(number_preferred_offers, 'z') as integer) as total_offers,
        cast(nullif(number_1st_preference_offers, 'z') as integer) as first_preference_offers,
        cast(nullif(number_2nd_preference_offers, 'z') as integer) as second_preference_offers,
        cast(nullif(number_3rd_preference_offers, 'z') as integer) as third_preference_offers,
        -- Applications
        cast(nullif(times_put_as_any_preferred_school, 'z') as integer) as total_applications,
        cast(nullif(times_put_as_1st_preference, 'z') as integer) as first_preference_applications,
        -- Proportions
        cast(nullif(proportion_1stprefs_v_totaloffers, 'z') as numeric) as first_preference_offer_pct,
        -- Derived: oversubscribed if 1st-preference applications > places
        case
            when nullif(times_put_as_1st_preference, 'z') is not null
                and nullif(total_number_places_offered, 'z') is not null
                and cast(times_put_as_1st_preference as integer)
                    > cast(total_number_places_offered as integer)
            then true
            else false
        end as oversubscribed,
        -- Context
        admissions_policy,
        nullif(FSM_eligible_percent, 'z') as fsm_eligible_pct
    from source
    -- No 'urn' filter here: the raw table exposes school_urn (already
    -- filtered in the source CTE), not urn.
)
select * from renamed

View File

-- Staging model: School census pupil characteristics from EES
-- File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/)
-- Uses 'urn' column (not school_urn). Tap normalises to school_urn.
with source as (
    -- Filter on the tap-normalised URN column.
    select * from {{ source('raw', 'ees_census') }}
    where school_urn is not null
),
renamed as (
    select
        cast(school_urn as integer) as urn,
        cast(time_period as integer) as year,
        school_name,
        phase_type_grouping
        -- TODO: Add census data columns once verified:
        --   fsm_pct, sen_support_pct, sen_ehcp_pct, eal_pct,
        --   disadvantaged_pct, ethnicity_white_pct, ethnicity_asian_pct,
        --   ethnicity_black_pct, ethnicity_mixed_pct, ethnicity_other_pct,
        --   class_size_avg, stability_pct
    from source
    -- No 'urn' filter here: the raw table's URN column is school_urn
    -- (already filtered in the source CTE).
)
select * from renamed

View File

-- Staging model: KS4 attainment data from EES
-- KS4 performance data is long-format with breakdown dimensions (breakdown_topic,
-- breakdown, sex). Unlike KS2 which has a subject dimension, KS4 metrics are
-- already in separate columns — we just filter to the 'All pupils' breakdown.
-- EES uses 'z' for suppressed values — cast to null via nullif.
with performance as (
    select * from {{ source('raw', 'ees_ks4_performance') }}
    where school_urn is not null
),
-- Filter to all-pupils totals (one row per school per year)
all_pupils as (
    select
        cast(school_urn as integer) as urn,
        cast(time_period as integer) as year,
        cast(nullif(pupil_count, 'z') as integer) as total_pupils,
        -- Attainment 8
        cast(nullif(attainment8_average, 'z') as numeric) as attainment_8_score,
        -- Progress 8
        cast(nullif(progress8_average, 'z') as numeric) as progress_8_score,
        cast(nullif(progress8_lower_95_ci, 'z') as numeric) as progress_8_lower_ci,
        cast(nullif(progress8_upper_95_ci, 'z') as numeric) as progress_8_upper_ci,
        cast(nullif(progress8eng_average, 'z') as numeric) as progress_8_english,
        cast(nullif(progress8mat_average, 'z') as numeric) as progress_8_maths,
        cast(nullif(progress8ebacc_average, 'z') as numeric) as progress_8_ebacc,
        cast(nullif(progress8open_average, 'z') as numeric) as progress_8_open,
        -- English & Maths pass rates
        cast(nullif(engmath_95_percent, 'z') as numeric) as english_maths_strong_pass_pct,
        cast(nullif(engmath_94_percent, 'z') as numeric) as english_maths_standard_pass_pct,
        -- EBacc
        cast(nullif(ebacc_entering_percent, 'z') as numeric) as ebacc_entry_pct,
        cast(nullif(ebacc_95_percent, 'z') as numeric) as ebacc_strong_pass_pct,
        cast(nullif(ebacc_94_percent, 'z') as numeric) as ebacc_standard_pass_pct,
        cast(nullif(ebacc_aps_average, 'z') as numeric) as ebacc_avg_score,
        -- GCSE grade 9-1
        cast(nullif(gcse_91_percent, 'z') as numeric) as gcse_grade_91_pct
    from performance
    where breakdown_topic = 'All pupils'
      and breakdown = 'Total'
      and sex = 'Total'
),
-- KS4 info table for context/demographics
info as (
    select
        cast(school_urn as integer) as urn,
        cast(time_period as integer) as year,
        cast(nullif(endks4_pupil_count, 'z') as integer) as eligible_pupils,
        cast(nullif(ks2_scaledscore_average, 'z') as numeric) as prior_attainment_avg,
        cast(nullif(sen_pupil_percent, 'z') as numeric) as sen_pct,
        cast(nullif(sen_with_ehcp_pupil_percent, 'z') as numeric) as sen_ehcp_pct,
        cast(nullif(sen_no_ehcp_pupil_percent, 'z') as numeric) as sen_support_pct
    from {{ source('raw', 'ees_ks4_info') }}
    where school_urn is not null
)
-- Left join: keep every school with performance data even if the info
-- file has no matching row for that year.
select
    p.urn,
    p.year,
    p.total_pupils,
    i.eligible_pupils,
    i.prior_attainment_avg,
    -- Attainment 8
    p.attainment_8_score,
    -- Progress 8
    p.progress_8_score,
    p.progress_8_lower_ci,
    p.progress_8_upper_ci,
    p.progress_8_english,
    p.progress_8_maths,
    p.progress_8_ebacc,
    p.progress_8_open,
    -- English & Maths
    p.english_maths_strong_pass_pct,
    p.english_maths_standard_pass_pct,
    -- EBacc
    p.ebacc_entry_pct,
    p.ebacc_strong_pass_pct,
    p.ebacc_standard_pass_pct,
    p.ebacc_avg_score,
    -- GCSE
    p.gcse_grade_91_pct,
    -- Context
    i.sen_pct,
    i.sen_ehcp_pct,
    i.sen_support_pct
from all_pupils p
left join info i on p.urn = i.urn and p.year = i.year

View File

@@ -1,17 +0,0 @@
-- Staging model: Phonics screening check results from EES
-- NOTE(review): no school-level phonics data exists on EES (national/LA
-- only), so this model is slated for removal along with fact_phonics.
with source as (
    select * from {{ source('raw', 'ees_phonics') }}
),
renamed as (
    select
        cast(urn as integer) as urn,
        cast(time_period as integer) as year,
        cast(year1_phonics_pct as numeric) as year1_phonics_pct,
        cast(year2_phonics_pct as numeric) as year2_phonics_pct
    from source
    where urn is not null
)
select * from renamed