Compare commits

...

2 Commits

Author SHA1 Message Date
f4f0257447 fix(ees-tap): add latin-1 encoding for census/admissions, default utf-8 for others
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 52s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m8s
Build and Push Docker Images / Build Integrator (push) Successful in 55s
Build and Push Docker Images / Build Kestra Init (push) Successful in 31s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m40s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
DfE supporting-files CSVs (spc_school_level_underlying_data, AppsandOffers
SchoolLevel) are Latin-1 encoded. Add _encoding class attribute to base
stream class and override to 'latin-1' for census and admissions streams.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 09:41:40 +00:00
ca351e9d73 feat: migrate backend to marts schema, update EES tap for verified datasets
Pipeline:
- EES tap: split KS4 into performance + info streams, fix admissions filename
  (SchoolLevel keyword match), fix census filename (yearly suffix), remove
  phonics (no school-level data on EES), change endswith → in for matching
- stg_ees_ks4: rewrite to filter long-format data and extract Attainment 8,
  Progress 8, EBacc, English/Maths metrics; join KS4 info for context
- stg_ees_admissions: map real CSV columns (total_number_places_offered, etc.)
- stg_ees_census: update source reference, stub with TODO for data columns
- Remove stg_ees_phonics, fact_phonics (no school-level EES data)
- Add ees_ks4_performance + ees_ks4_info sources, remove ees_ks4 + ees_phonics
- Update int_ks4_with_lineage + fact_ks4_performance with new KS4 columns
- Annual EES DAG: remove stg_ees_phonics+ from selector

Backend:
- models.py: replace all models to point at marts.* tables with schema='marts'
  (DimSchool, DimLocation, KS2Performance, FactOfstedInspection, etc.)
- data_loader.py: rewrite load_school_data_as_dataframe() using raw SQL joining
  dim_school + dim_location + fact_ks2_performance; update get_supplementary_data()
- database.py: remove migration machinery, keep only connection setup
- app.py: remove check_and_migrate_if_needed, remove /api/admin/reimport-ks2
  endpoints (pipeline handles all imports)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 09:29:27 +00:00
18 changed files with 809 additions and 1246 deletions

View File

@@ -28,8 +28,6 @@ from .data_loader import (
get_supplementary_data, get_supplementary_data,
) )
from .data_loader import get_data_info as get_db_info from .data_loader import get_data_info as get_db_info
from .database import check_and_migrate_if_needed
from .migration import run_full_migration
from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS
from .utils import clean_for_json from .utils import clean_for_json
@@ -138,20 +136,15 @@ def validate_postcode(postcode: Optional[str]) -> Optional[str]:
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""Application lifespan - startup and shutdown events.""" """Application lifespan - startup and shutdown events."""
# Startup: check schema version and migrate if needed print("Loading school data from marts...")
print("Starting up: Checking database schema...")
check_and_migrate_if_needed()
print("Loading school data from database...")
df = load_school_data() df = load_school_data()
if df.empty: if df.empty:
print("Warning: No data in database. Check CSV files in data/ folder.") print("Warning: No data in marts. Run the annual EES pipeline to populate KS2 data.")
else: else:
print(f"Data loaded successfully: {len(df)} records.") print(f"Data loaded successfully: {len(df)} records.")
yield # Application runs here yield
# Shutdown: cleanup if needed
print("Shutting down...") print("Shutting down...")
@@ -585,7 +578,7 @@ async def get_data_info(request: Request):
if db_info["total_schools"] == 0: if db_info["total_schools"] == 0:
return { return {
"status": "no_data", "status": "no_data",
"message": "No data in database. Run the migration script: python scripts/migrate_csv_to_db.py", "message": "No data in marts. Run the annual EES pipeline to load KS2 data.",
"data_source": "PostgreSQL", "data_source": "PostgreSQL",
} }
@@ -635,56 +628,6 @@ async def reload_data(
return {"status": "reloaded"} return {"status": "reloaded"}
_reimport_status: dict = {"running": False, "done": False, "error": None}
@app.post("/api/admin/reimport-ks2")
@limiter.limit("2/minute")
async def reimport_ks2(
request: Request,
geocode: bool = True,
_: bool = Depends(verify_admin_api_key)
):
"""
Start a full KS2 CSV migration in the background and return immediately.
Poll GET /api/admin/reimport-ks2/status to check progress.
Pass ?geocode=false to skip postcode → lat/lng resolution.
Requires X-API-Key header with valid admin API key.
"""
global _reimport_status
if _reimport_status["running"]:
return {"status": "already_running"}
_reimport_status = {"running": True, "done": False, "error": None}
def _run():
global _reimport_status
try:
success = run_full_migration(geocode=geocode)
if not success:
_reimport_status = {"running": False, "done": False, "error": "No CSV data found"}
return
clear_cache()
load_school_data()
_reimport_status = {"running": False, "done": True, "error": None}
except Exception as exc:
_reimport_status = {"running": False, "done": False, "error": str(exc)}
import threading
threading.Thread(target=_run, daemon=True).start()
return {"status": "started"}
@app.get("/api/admin/reimport-ks2/status")
async def reimport_ks2_status(
request: Request,
_: bool = Depends(verify_admin_api_key)
):
"""Poll this endpoint to check reimport progress."""
s = _reimport_status
if s["error"]:
raise HTTPException(status_code=500, detail=s["error"])
return {"running": s["running"], "done": s["done"]}
# ============================================================================= # =============================================================================

View File

@@ -1,29 +1,24 @@
""" """
Data loading module that queries from PostgreSQL database. Data loading module — reads from marts.* tables built by dbt.
Provides efficient queries with caching and lazy loading. Provides efficient queries with caching.
Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py).
Only user search postcodes are geocoded on-demand via geocode_single_postcode().
""" """
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from functools import lru_cache
from typing import Optional, Dict, Tuple, List from typing import Optional, Dict, Tuple, List
import requests import requests
from sqlalchemy import select, func, and_, or_ from sqlalchemy import text
from sqlalchemy.orm import joinedload, Session from sqlalchemy.orm import Session
from .config import settings from .config import settings
from .database import SessionLocal, get_db_session from .database import SessionLocal, engine
from .models import ( from .models import (
School, SchoolResult, DimSchool, DimLocation, KS2Performance,
OfstedInspection, OfstedParentView, SchoolCensus, FactOfstedInspection, FactParentView, FactAdmissions,
SchoolAdmissions, SenDetail, Phonics, SchoolDeprivation, SchoolFinance, FactDeprivation, FactFinance,
) )
from .schemas import SCHOOL_TYPE_MAP from .schemas import SCHOOL_TYPE_MAP
# Cache for user search postcode geocoding (not for school data)
_postcode_cache: Dict[str, Tuple[float, float]] = {} _postcode_cache: Dict[str, Tuple[float, float]] = {}
@@ -31,515 +26,165 @@ def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
"""Convert cryptic school type codes to user-friendly names.""" """Convert cryptic school type codes to user-friendly names."""
if not school_type: if not school_type:
return None return None
# Check if it's a code that needs mapping
code = school_type.strip().upper() code = school_type.strip().upper()
if code in SCHOOL_TYPE_MAP: if code in SCHOOL_TYPE_MAP:
return SCHOOL_TYPE_MAP[code] return SCHOOL_TYPE_MAP[code]
# Return original if already a friendly name or unknown code
return school_type return school_type
def get_school_type_codes_for_filter(school_type: str) -> List[str]:
"""Get all database codes that map to a given friendly name."""
if not school_type:
return []
school_type_lower = school_type.lower()
# Collect all codes that map to this friendly name
codes = []
for code, friendly_name in SCHOOL_TYPE_MAP.items():
if friendly_name.lower() == school_type_lower:
codes.append(code.lower())
# Also include the school_type itself (case-insensitive) in case it's stored as-is
codes.append(school_type_lower)
return codes
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]: def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
"""Geocode a single postcode using postcodes.io API.""" """Geocode a single postcode using postcodes.io API."""
if not postcode: if not postcode:
return None return None
postcode = postcode.strip().upper() postcode = postcode.strip().upper()
# Check cache first
if postcode in _postcode_cache: if postcode in _postcode_cache:
return _postcode_cache[postcode] return _postcode_cache[postcode]
try: try:
response = requests.get( response = requests.get(
f'https://api.postcodes.io/postcodes/{postcode}', f"https://api.postcodes.io/postcodes/{postcode}",
timeout=10 timeout=10,
) )
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
if data.get('result'): if data.get("result"):
lat = data['result'].get('latitude') lat = data["result"].get("latitude")
lon = data['result'].get('longitude') lon = data["result"].get("longitude")
if lat and lon: if lat and lon:
_postcode_cache[postcode] = (lat, lon) _postcode_cache[postcode] = (lat, lon)
return (lat, lon) return (lat, lon)
except Exception: except Exception:
pass pass
return None return None
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
""" """Calculate great-circle distance between two points (miles)."""
Calculate the great circle distance between two points on Earth (in miles).
"""
from math import radians, cos, sin, asin, sqrt from math import radians, cos, sin, asin, sqrt
# Convert to radians
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1 dlat = lat2 - lat1
dlon = lon2 - lon1 dlon = lon2 - lon1
a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2 a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
c = 2 * asin(sqrt(a)) return 2 * asin(sqrt(a)) * 3956
# Earth's radius in miles
r = 3956
return c * r
# ============================================================================= # =============================================================================
# DATABASE QUERY FUNCTIONS # MAIN DATA LOAD — joins dim_school + dim_location + fact_ks2_performance
# ============================================================================= # =============================================================================
def get_db(): _MAIN_QUERY = text("""
"""Get a database session.""" SELECT
return SessionLocal() s.urn,
s.school_name,
s.phase,
s.school_type,
s.academy_trust_name AS trust_name,
s.academy_trust_uid AS trust_uid,
s.religious_character AS religious_denomination,
s.gender,
s.age_range,
s.capacity,
s.headteacher_name,
s.website,
s.ofsted_grade,
s.ofsted_date,
s.ofsted_framework,
l.local_authority_name AS local_authority,
l.local_authority_code,
l.address_line1 AS address1,
l.address_line2 AS address2,
l.town,
l.postcode,
l.latitude,
l.longitude,
-- KS2 performance
k.year,
k.source_urn,
k.total_pupils,
k.eligible_pupils,
k.rwm_expected_pct,
k.rwm_high_pct,
k.reading_expected_pct,
k.reading_high_pct,
k.reading_avg_score,
k.reading_progress,
k.writing_expected_pct,
k.writing_high_pct,
k.writing_progress,
k.maths_expected_pct,
k.maths_high_pct,
k.maths_avg_score,
k.maths_progress,
k.gps_expected_pct,
k.gps_high_pct,
k.gps_avg_score,
k.science_expected_pct,
k.reading_absence_pct,
k.writing_absence_pct,
k.maths_absence_pct,
k.gps_absence_pct,
k.science_absence_pct,
k.rwm_expected_boys_pct,
k.rwm_high_boys_pct,
k.rwm_expected_girls_pct,
k.rwm_high_girls_pct,
k.rwm_expected_disadvantaged_pct,
k.rwm_expected_non_disadvantaged_pct,
k.disadvantaged_gap,
k.disadvantaged_pct,
k.eal_pct,
k.sen_support_pct,
k.sen_ehcp_pct,
k.stability_pct
FROM marts.dim_school s
JOIN marts.dim_location l ON s.urn = l.urn
JOIN marts.fact_ks2_performance k ON s.urn = k.urn
ORDER BY s.school_name, k.year
""")
def get_available_years(db: Session = None) -> List[int]: def load_school_data_as_dataframe() -> pd.DataFrame:
"""Get list of available years in the database.""" """Load all school + KS2 data as a pandas DataFrame."""
close_db = db is None
if db is None:
db = get_db()
try: try:
result = db.query(SchoolResult.year).distinct().order_by(SchoolResult.year).all() df = pd.read_sql(_MAIN_QUERY, engine)
return [r[0] for r in result] except Exception as exc:
finally: print(f"Warning: Could not load school data from marts: {exc}")
if close_db:
db.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
"""Get list of available local authorities."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(School.local_authority)\
.filter(School.local_authority.isnot(None))\
.distinct()\
.order_by(School.local_authority)\
.all()
return [r[0] for r in result if r[0]]
finally:
if close_db:
db.close()
def get_available_school_types(db: Session = None) -> List[str]:
"""Get list of available school types (normalized to user-friendly names)."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(School.school_type)\
.filter(School.school_type.isnot(None))\
.distinct()\
.all()
# Normalize codes to friendly names and deduplicate
normalized = set()
for r in result:
if r[0]:
friendly_name = normalize_school_type(r[0])
if friendly_name:
normalized.add(friendly_name)
return sorted(normalized)
finally:
if close_db:
db.close()
def get_schools_count(db: Session = None) -> int:
"""Get total number of schools."""
close_db = db is None
if db is None:
db = get_db()
try:
return db.query(School).count()
finally:
if close_db:
db.close()
def get_schools(
db: Session,
search: Optional[str] = None,
local_authority: Optional[str] = None,
school_type: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[School], int]:
"""
Get paginated list of schools with optional filters.
Returns (schools, total_count).
"""
query = db.query(School)
# Apply filters
if search:
search_lower = f"%{search.lower()}%"
query = query.filter(
or_(
func.lower(School.school_name).like(search_lower),
func.lower(School.postcode).like(search_lower),
func.lower(School.town).like(search_lower),
)
)
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
if school_type:
# Filter by all codes that map to this friendly name
type_codes = get_school_type_codes_for_filter(school_type)
if type_codes:
query = query.filter(func.lower(School.school_type).in_(type_codes))
# Get total count
total = query.count()
# Apply pagination
offset = (page - 1) * page_size
schools = query.order_by(School.school_name).offset(offset).limit(page_size).all()
return schools, total
def get_schools_near_location(
db: Session,
latitude: float,
longitude: float,
radius_miles: float = 5.0,
search: Optional[str] = None,
local_authority: Optional[str] = None,
school_type: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
"""
Get schools near a location, sorted by distance.
Returns list of (school, distance) tuples and total count.
"""
# Get all schools with coordinates
query = db.query(School).filter(
School.latitude.isnot(None),
School.longitude.isnot(None)
)
# Apply text filters
if search:
search_lower = f"%{search.lower()}%"
query = query.filter(
or_(
func.lower(School.school_name).like(search_lower),
func.lower(School.postcode).like(search_lower),
func.lower(School.town).like(search_lower),
)
)
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
if school_type:
# Filter by all codes that map to this friendly name
type_codes = get_school_type_codes_for_filter(school_type)
if type_codes:
query = query.filter(func.lower(School.school_type).in_(type_codes))
# Get all matching schools and calculate distances
all_schools = query.all()
schools_with_distance = []
for school in all_schools:
if school.latitude and school.longitude:
dist = haversine_distance(latitude, longitude, school.latitude, school.longitude)
if dist <= radius_miles:
schools_with_distance.append((school, dist))
# Sort by distance
schools_with_distance.sort(key=lambda x: x[1])
total = len(schools_with_distance)
# Paginate
offset = (page - 1) * page_size
paginated = schools_with_distance[offset:offset + page_size]
return paginated, total
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
"""Get a single school by URN."""
return db.query(School).filter(School.urn == urn).first()
def get_school_results(
db: Session,
urn: int,
years: Optional[List[int]] = None
) -> List[SchoolResult]:
"""Get all results for a school, optionally filtered by years."""
query = db.query(SchoolResult)\
.join(School)\
.filter(School.urn == urn)\
.order_by(SchoolResult.year)
if years:
query = query.filter(SchoolResult.year.in_(years))
return query.all()
def get_rankings(
db: Session,
metric: str,
year: int,
local_authority: Optional[str] = None,
limit: int = 20,
ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
"""
Get school rankings for a specific metric and year.
Returns list of (school, result) tuples.
"""
# Build the query
query = db.query(School, SchoolResult)\
.join(SchoolResult)\
.filter(SchoolResult.year == year)
# Filter by local authority
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
# Get the metric column
metric_column = getattr(SchoolResult, metric, None)
if metric_column is None:
return []
# Filter out nulls and order
query = query.filter(metric_column.isnot(None))
if ascending:
query = query.order_by(metric_column.asc())
else:
query = query.order_by(metric_column.desc())
return query.limit(limit).all()
def get_data_info(db: Session = None) -> dict:
"""Get information about the data in the database."""
close_db = db is None
if db is None:
db = get_db()
try:
school_count = db.query(School).count()
result_count = db.query(SchoolResult).count()
years = get_available_years(db)
local_authorities = get_available_local_authorities(db)
return {
"total_schools": school_count,
"total_results": result_count,
"years_available": years,
"local_authorities_count": len(local_authorities),
"data_source": "PostgreSQL",
}
finally:
if close_db:
db.close()
def school_to_dict(school: School, include_results: bool = False) -> dict:
"""Convert a School model to dictionary."""
data = {
"urn": school.urn,
"school_name": school.school_name,
"local_authority": school.local_authority,
"school_type": normalize_school_type(school.school_type),
"address": school.address,
"town": school.town,
"postcode": school.postcode,
"latitude": school.latitude,
"longitude": school.longitude,
# GIAS fields
"website": school.website,
"headteacher_name": school.headteacher_name,
"capacity": school.capacity,
"trust_name": school.trust_name,
"gender": school.gender,
}
if include_results and school.results:
data["results"] = [result_to_dict(r) for r in school.results]
return data
def result_to_dict(result: SchoolResult) -> dict:
"""Convert a SchoolResult model to dictionary."""
return {
"year": result.year,
"total_pupils": result.total_pupils,
"eligible_pupils": result.eligible_pupils,
# Expected Standard
"rwm_expected_pct": result.rwm_expected_pct,
"reading_expected_pct": result.reading_expected_pct,
"writing_expected_pct": result.writing_expected_pct,
"maths_expected_pct": result.maths_expected_pct,
"gps_expected_pct": result.gps_expected_pct,
"science_expected_pct": result.science_expected_pct,
# Higher Standard
"rwm_high_pct": result.rwm_high_pct,
"reading_high_pct": result.reading_high_pct,
"writing_high_pct": result.writing_high_pct,
"maths_high_pct": result.maths_high_pct,
"gps_high_pct": result.gps_high_pct,
# Progress
"reading_progress": result.reading_progress,
"writing_progress": result.writing_progress,
"maths_progress": result.maths_progress,
# Averages
"reading_avg_score": result.reading_avg_score,
"maths_avg_score": result.maths_avg_score,
"gps_avg_score": result.gps_avg_score,
# Context
"disadvantaged_pct": result.disadvantaged_pct,
"eal_pct": result.eal_pct,
"sen_support_pct": result.sen_support_pct,
"sen_ehcp_pct": result.sen_ehcp_pct,
"stability_pct": result.stability_pct,
# Gender
"rwm_expected_boys_pct": result.rwm_expected_boys_pct,
"rwm_expected_girls_pct": result.rwm_expected_girls_pct,
"rwm_high_boys_pct": result.rwm_high_boys_pct,
"rwm_high_girls_pct": result.rwm_high_girls_pct,
# Disadvantaged
"rwm_expected_disadvantaged_pct": result.rwm_expected_disadvantaged_pct,
"rwm_expected_non_disadvantaged_pct": result.rwm_expected_non_disadvantaged_pct,
"disadvantaged_gap": result.disadvantaged_gap,
# 3-Year
"rwm_expected_3yr_pct": result.rwm_expected_3yr_pct,
"reading_avg_3yr": result.reading_avg_3yr,
"maths_avg_3yr": result.maths_avg_3yr,
}
# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
"""
Load all school data as a pandas DataFrame.
For compatibility with existing code that expects DataFrames.
"""
close_db = db is None
if db is None:
db = get_db()
try:
# Query all schools with their results
schools = db.query(School).options(joinedload(School.results)).all()
# Load Ofsted data into a lookup dict (urn → grade, date)
ofsted_lookup: Dict[int, dict] = {}
try:
ofsted_rows = db.query(
OfstedInspection.urn,
OfstedInspection.overall_effectiveness,
OfstedInspection.inspection_date,
).all()
for o in ofsted_rows:
ofsted_lookup[o.urn] = {
"ofsted_grade": o.overall_effectiveness,
"ofsted_date": o.inspection_date.isoformat() if o.inspection_date else None,
}
except Exception:
pass # Table may not exist yet on first run
rows = []
for school in schools:
ofsted = ofsted_lookup.get(school.urn, {})
for result in school.results:
row = {
"urn": school.urn,
"school_name": school.school_name,
"local_authority": school.local_authority,
"school_type": normalize_school_type(school.school_type),
"address": school.address,
"town": school.town,
"postcode": school.postcode,
"latitude": school.latitude,
"longitude": school.longitude,
# GIAS fields
"website": school.website,
"headteacher_name": school.headteacher_name,
"capacity": school.capacity,
"trust_name": school.trust_name,
"gender": school.gender,
# Ofsted (for list view)
"ofsted_grade": ofsted.get("ofsted_grade"),
"ofsted_date": ofsted.get("ofsted_date"),
**result_to_dict(result)
}
rows.append(row)
if rows:
return pd.DataFrame(rows)
return pd.DataFrame() return pd.DataFrame()
finally:
if close_db: if df.empty:
db.close() return df
# Build address string
df["address"] = df.apply(
lambda r: ", ".join(
p for p in [r.get("address1"), r.get("address2"), r.get("town"), r.get("postcode")]
if p and str(p) != "None"
),
axis=1,
)
# Normalize school type
df["school_type"] = df["school_type"].apply(normalize_school_type)
return df
# Cache for DataFrame (legacy compatibility) # Cache for DataFrame
_df_cache: Optional[pd.DataFrame] = None _df_cache: Optional[pd.DataFrame] = None
def load_school_data() -> pd.DataFrame: def load_school_data() -> pd.DataFrame:
""" """Load school data with caching."""
Legacy function to load school data as DataFrame.
Uses caching for performance.
"""
global _df_cache global _df_cache
if _df_cache is not None: if _df_cache is not None:
return _df_cache return _df_cache
print("Loading school data from marts...")
print("Loading school data from database...")
_df_cache = load_school_data_as_dataframe() _df_cache = load_school_data_as_dataframe()
if not _df_cache.empty: if not _df_cache.empty:
print(f"Total records loaded: {len(_df_cache)}") print(f"Total records loaded: {len(_df_cache)}")
print(f"Unique schools: {_df_cache['urn'].nunique()}") print(f"Unique schools: {_df_cache['urn'].nunique()}")
print(f"Years: {sorted(_df_cache['year'].unique())}") print(f"Years: {sorted(_df_cache['year'].unique())}")
else: else:
print("No data found in database") print("No data found in marts (EES data may not have been loaded yet)")
return _df_cache return _df_cache
@@ -549,44 +194,108 @@ def clear_cache():
_df_cache = None _df_cache = None
# =============================================================================
# METADATA QUERIES
# =============================================================================
def get_available_years(db: Session = None) -> List[int]:
close_db = db is None
if db is None:
db = SessionLocal()
try:
result = db.query(KS2Performance.year).distinct().order_by(KS2Performance.year).all()
return [r[0] for r in result]
except Exception:
return []
finally:
if close_db:
db.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
close_db = db is None
if db is None:
db = SessionLocal()
try:
result = (
db.query(DimLocation.local_authority_name)
.filter(DimLocation.local_authority_name.isnot(None))
.distinct()
.order_by(DimLocation.local_authority_name)
.all()
)
return [r[0] for r in result if r[0]]
except Exception:
return []
finally:
if close_db:
db.close()
def get_schools_count(db: Session = None) -> int:
close_db = db is None
if db is None:
db = SessionLocal()
try:
return db.query(DimSchool).count()
except Exception:
return 0
finally:
if close_db:
db.close()
def get_data_info(db: Session = None) -> dict:
close_db = db is None
if db is None:
db = SessionLocal()
try:
school_count = get_schools_count(db)
years = get_available_years(db)
local_authorities = get_available_local_authorities(db)
return {
"total_schools": school_count,
"years_available": years,
"local_authorities_count": len(local_authorities),
"data_source": "PostgreSQL (marts)",
}
finally:
if close_db:
db.close()
# =============================================================================
# SUPPLEMENTARY DATA — per-school detail page
# =============================================================================
def get_supplementary_data(db: Session, urn: int) -> dict: def get_supplementary_data(db: Session, urn: int) -> dict:
""" """Fetch all supplementary data for a single school URN."""
Fetch all supplementary data for a single school URN.
Returns a dict with keys: ofsted, parent_view, census, admissions, sen_detail,
phonics, deprivation, finance. Values are dicts or None.
"""
result = {} result = {}
def safe_query(model, pk_field, latest_year_field=None): def safe_query(model, pk_field, latest_field=None):
try: try:
if latest_year_field: q = db.query(model).filter(getattr(model, pk_field) == urn)
row = ( if latest_field:
db.query(model) q = q.order_by(getattr(model, latest_field).desc())
.filter(getattr(model, pk_field) == urn) return q.first()
.order_by(getattr(model, latest_year_field).desc())
.first()
)
else:
row = db.query(model).filter(getattr(model, pk_field) == urn).first()
return row
except Exception: except Exception:
return None return None
# Ofsted inspection # Latest Ofsted inspection
o = safe_query(OfstedInspection, "urn") o = safe_query(FactOfstedInspection, "urn", "inspection_date")
result["ofsted"] = { result["ofsted"] = (
{
"framework": o.framework, "framework": o.framework,
"inspection_date": o.inspection_date.isoformat() if o.inspection_date else None, "inspection_date": o.inspection_date.isoformat() if o.inspection_date else None,
"inspection_type": o.inspection_type, "inspection_type": o.inspection_type,
# OEIF fields (old framework)
"overall_effectiveness": o.overall_effectiveness, "overall_effectiveness": o.overall_effectiveness,
"quality_of_education": o.quality_of_education, "quality_of_education": o.quality_of_education,
"behaviour_attitudes": o.behaviour_attitudes, "behaviour_attitudes": o.behaviour_attitudes,
"personal_development": o.personal_development, "personal_development": o.personal_development,
"leadership_management": o.leadership_management, "leadership_management": o.leadership_management,
"early_years_provision": o.early_years_provision, "early_years_provision": o.early_years_provision,
"previous_overall": o.previous_overall, "sixth_form_provision": o.sixth_form_provision,
# Report Card fields (new framework, from Nov 2025) "previous_overall": None, # Not available in new schema
"rc_safeguarding_met": o.rc_safeguarding_met, "rc_safeguarding_met": o.rc_safeguarding_met,
"rc_inclusion": o.rc_inclusion, "rc_inclusion": o.rc_inclusion,
"rc_curriculum_teaching": o.rc_curriculum_teaching, "rc_curriculum_teaching": o.rc_curriculum_teaching,
@@ -596,11 +305,16 @@ def get_supplementary_data(db: Session, urn: int) -> dict:
"rc_leadership_governance": o.rc_leadership_governance, "rc_leadership_governance": o.rc_leadership_governance,
"rc_early_years": o.rc_early_years, "rc_early_years": o.rc_early_years,
"rc_sixth_form": o.rc_sixth_form, "rc_sixth_form": o.rc_sixth_form,
} if o else None "report_url": o.report_url,
}
if o
else None
)
# Parent View # Parent View
pv = safe_query(OfstedParentView, "urn") pv = safe_query(FactParentView, "urn")
result["parent_view"] = { result["parent_view"] = (
{
"survey_date": pv.survey_date.isoformat() if pv.survey_date else None, "survey_date": pv.survey_date.isoformat() if pv.survey_date else None,
"total_responses": pv.total_responses, "total_responses": pv.total_responses,
"q_happy_pct": pv.q_happy_pct, "q_happy_pct": pv.q_happy_pct,
@@ -616,69 +330,62 @@ def get_supplementary_data(db: Session, urn: int) -> dict:
"q_leadership_pct": pv.q_leadership_pct, "q_leadership_pct": pv.q_leadership_pct,
"q_wellbeing_pct": pv.q_wellbeing_pct, "q_wellbeing_pct": pv.q_wellbeing_pct,
"q_recommend_pct": pv.q_recommend_pct, "q_recommend_pct": pv.q_recommend_pct,
"q_sen_pct": pv.q_sen_pct, }
} if pv else None if pv
else None
)
# School Census (latest year) # Census (fact_pupil_characteristics — minimal until census columns are verified)
c = safe_query(SchoolCensus, "urn", "year") result["census"] = None
result["census"] = {
"year": c.year,
"class_size_avg": c.class_size_avg,
"ethnicity_white_pct": c.ethnicity_white_pct,
"ethnicity_asian_pct": c.ethnicity_asian_pct,
"ethnicity_black_pct": c.ethnicity_black_pct,
"ethnicity_mixed_pct": c.ethnicity_mixed_pct,
"ethnicity_other_pct": c.ethnicity_other_pct,
} if c else None
# Admissions (latest year) # Admissions (latest year)
a = safe_query(SchoolAdmissions, "urn", "year") a = safe_query(FactAdmissions, "urn", "year")
result["admissions"] = { result["admissions"] = (
{
"year": a.year, "year": a.year,
"school_phase": a.school_phase,
"published_admission_number": a.published_admission_number, "published_admission_number": a.published_admission_number,
"total_applications": a.total_applications, "total_applications": a.total_applications,
"first_preference_offers_pct": a.first_preference_offers_pct, "first_preference_applications": a.first_preference_applications,
"first_preference_offers": a.first_preference_offers,
"first_preference_offer_pct": a.first_preference_offer_pct,
"oversubscribed": a.oversubscribed, "oversubscribed": a.oversubscribed,
} if a else None }
if a
else None
)
# SEN Detail (latest year) # SEN detail — not available in current marts
s = safe_query(SenDetail, "urn", "year") result["sen_detail"] = None
result["sen_detail"] = {
"year": s.year,
"primary_need_speech_pct": s.primary_need_speech_pct,
"primary_need_autism_pct": s.primary_need_autism_pct,
"primary_need_mld_pct": s.primary_need_mld_pct,
"primary_need_spld_pct": s.primary_need_spld_pct,
"primary_need_semh_pct": s.primary_need_semh_pct,
"primary_need_physical_pct": s.primary_need_physical_pct,
"primary_need_other_pct": s.primary_need_other_pct,
} if s else None
# Phonics (latest year) # Phonics — no school-level data on EES
ph = safe_query(Phonics, "urn", "year") result["phonics"] = None
result["phonics"] = {
"year": ph.year,
"year1_phonics_pct": ph.year1_phonics_pct,
"year2_phonics_pct": ph.year2_phonics_pct,
} if ph else None
# Deprivation # Deprivation
d = safe_query(SchoolDeprivation, "urn") d = safe_query(FactDeprivation, "urn")
result["deprivation"] = { result["deprivation"] = (
{
"lsoa_code": d.lsoa_code, "lsoa_code": d.lsoa_code,
"idaci_score": d.idaci_score, "idaci_score": d.idaci_score,
"idaci_decile": d.idaci_decile, "idaci_decile": d.idaci_decile,
} if d else None }
if d
else None
)
# Finance (latest year) # Finance (latest year)
f = safe_query(SchoolFinance, "urn", "year") f = safe_query(FactFinance, "urn", "year")
result["finance"] = { result["finance"] = (
{
"year": f.year, "year": f.year,
"per_pupil_spend": f.per_pupil_spend, "per_pupil_spend": f.per_pupil_spend,
"staff_cost_pct": f.staff_cost_pct, "staff_cost_pct": f.staff_cost_pct,
"teacher_cost_pct": f.teacher_cost_pct, "teacher_cost_pct": f.teacher_cost_pct,
"support_staff_cost_pct": f.support_staff_cost_pct, "support_staff_cost_pct": f.support_staff_cost_pct,
"premises_cost_pct": f.premises_cost_pct, "premises_cost_pct": f.premises_cost_pct,
} if f else None }
if f
else None
)
return result return result

View File

@@ -1,36 +1,30 @@
""" """
Database connection setup using SQLAlchemy. Database connection setup using SQLAlchemy.
The schema is managed by dbt — the backend only reads from marts.* tables.
""" """
from datetime import datetime
from typing import Optional
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker, declarative_base
from contextlib import contextmanager from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from .config import settings from .config import settings
# Create engine
engine = create_engine( engine = create_engine(
settings.database_url, settings.database_url,
pool_size=10, pool_size=10,
max_overflow=20, max_overflow=20,
pool_pre_ping=True, # Verify connections before use pool_pre_ping=True,
echo=False, # Set to True for SQL debugging echo=False,
) )
# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class for models
Base = declarative_base() Base = declarative_base()
def get_db(): def get_db():
""" """Dependency for FastAPI routes."""
Dependency for FastAPI routes to get a database session.
"""
db = SessionLocal() db = SessionLocal()
try: try:
yield db yield db
@@ -40,10 +34,7 @@ def get_db():
@contextmanager @contextmanager
def get_db_session(): def get_db_session():
""" """Context manager for non-FastAPI contexts."""
Context manager for database sessions.
Use in non-FastAPI contexts (scripts, etc).
"""
db = SessionLocal() db = SessionLocal()
try: try:
yield db yield db
@@ -53,95 +44,3 @@ def get_db_session():
raise raise
finally: finally:
db.close() db.close()
def init_db():
"""
Initialize database - create all tables.
"""
Base.metadata.create_all(bind=engine)
def drop_db():
"""
Drop all tables - use with caution!
"""
Base.metadata.drop_all(bind=engine)
def get_db_schema_version() -> Optional[int]:
"""
Get the current schema version from the database.
Returns None if table doesn't exist or no version is set.
"""
from .models import SchemaVersion # Import here to avoid circular imports
# Check if schema_version table exists
inspector = inspect(engine)
if "schema_version" not in inspector.get_table_names():
return None
try:
with get_db_session() as db:
row = db.query(SchemaVersion).first()
return row.version if row else None
except Exception:
return None
def set_db_schema_version(version: int):
"""
Set/update the schema version in the database.
Creates the row if it doesn't exist.
"""
from .models import SchemaVersion
with get_db_session() as db:
row = db.query(SchemaVersion).first()
if row:
row.version = version
row.migrated_at = datetime.utcnow()
else:
db.add(SchemaVersion(id=1, version=version, migrated_at=datetime.utcnow()))
def check_and_migrate_if_needed():
"""
Check schema version and run migration if needed.
Called during application startup.
"""
from .version import SCHEMA_VERSION
from .migration import run_full_migration
db_version = get_db_schema_version()
if db_version == SCHEMA_VERSION:
print(f"Schema version {SCHEMA_VERSION} matches. Fast startup.")
# Still ensure tables exist (they should if version matches)
init_db()
return
if db_version is None:
print(f"No schema version found. Running initial migration (v{SCHEMA_VERSION})...")
else:
print(f"Schema mismatch: DB has v{db_version}, code expects v{SCHEMA_VERSION}")
print("Running full migration...")
try:
# Set schema version BEFORE migration so a crash mid-migration
# doesn't cause an infinite re-migration loop on every restart.
init_db()
set_db_schema_version(SCHEMA_VERSION)
success = run_full_migration(geocode=False)
if success:
print(f"Migration complete. Schema version {SCHEMA_VERSION}.")
else:
print("Warning: Migration completed but no data was imported.")
except Exception as e:
print(f"FATAL: Migration failed: {e}")
print("Application cannot start. Please check database and CSV files.")
raise

View File

@@ -1,408 +1,216 @@
""" """
SQLAlchemy database models for school data. SQLAlchemy models — all tables live in the marts schema, built by dbt.
Normalized schema with separate tables for schools and yearly results. Read-only: the pipeline writes to these tables; the backend only reads.
""" """
from datetime import datetime from sqlalchemy import Column, Integer, String, Float, Boolean, Date, Text, Index
from sqlalchemy import (
Column, Integer, String, Float, ForeignKey, Index, UniqueConstraint,
Text, Boolean, DateTime, Date
)
from sqlalchemy.orm import relationship
from .database import Base from .database import Base
MARTS = {"schema": "marts"}
class School(Base):
"""
Core school information - relatively static data.
"""
__tablename__ = "schools"
id = Column(Integer, primary_key=True, autoincrement=True) class DimSchool(Base):
urn = Column(Integer, unique=True, nullable=False, index=True) """Canonical school dimension — one row per active URN."""
__tablename__ = "dim_school"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
school_name = Column(String(255), nullable=False) school_name = Column(String(255), nullable=False)
local_authority = Column(String(100)) phase = Column(String(100))
local_authority_code = Column(Integer)
school_type = Column(String(100)) school_type = Column(String(100))
school_type_code = Column(String(10)) academy_trust_name = Column(String(255))
religious_denomination = Column(String(100)) academy_trust_uid = Column(String(20))
religious_character = Column(String(100))
gender = Column(String(20))
age_range = Column(String(20)) age_range = Column(String(20))
capacity = Column(Integer)
total_pupils = Column(Integer)
headteacher_name = Column(String(200))
website = Column(String(255))
telephone = Column(String(30))
status = Column(String(50))
nursery_provision = Column(Boolean)
admissions_policy = Column(String(50))
# Denormalised Ofsted summary (updated by monthly pipeline)
ofsted_grade = Column(Integer)
ofsted_date = Column(Date)
ofsted_framework = Column(String(20))
# Address
address1 = Column(String(255)) class DimLocation(Base):
address2 = Column(String(255)) """School location — address, lat/lng from easting/northing (BNG→WGS84)."""
__tablename__ = "dim_location"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
address_line1 = Column(String(255))
address_line2 = Column(String(255))
town = Column(String(100)) town = Column(String(100))
postcode = Column(String(20), index=True) county = Column(String(100))
postcode = Column(String(20))
# Geocoding (cached) local_authority_code = Column(Integer)
local_authority_name = Column(String(100))
parliamentary_constituency = Column(String(100))
urban_rural = Column(String(50))
easting = Column(Integer)
northing = Column(Integer)
latitude = Column(Float) latitude = Column(Float)
longitude = Column(Float) longitude = Column(Float)
# geom is a PostGIS geometry — not mapped to SQLAlchemy (accessed via raw SQL)
# GIAS enrichment fields
website = Column(String(255))
headteacher_name = Column(String(200))
capacity = Column(Integer)
trust_name = Column(String(255))
trust_uid = Column(String(20))
gender = Column(String(20)) # Mixed / Girls / Boys
nursery_provision = Column(Boolean)
# Relationships
results = relationship("SchoolResult", back_populates="school", cascade="all, delete-orphan")
def __repr__(self):
return f"<School(urn={self.urn}, name='{self.school_name}')>"
@property
def address(self):
"""Combine address fields into single string."""
parts = [self.address1, self.address2, self.town, self.postcode]
return ", ".join(p for p in parts if p)
class SchoolResult(Base): class KS2Performance(Base):
""" """KS2 attainment — one row per URN per year (includes predecessor data)."""
Yearly KS2 results for a school. __tablename__ = "fact_ks2_performance"
Each school can have multiple years of results. __table_args__ = (
""" Index("ix_ks2_urn_year", "urn", "year"),
__tablename__ = "school_results" MARTS,
)
id = Column(Integer, primary_key=True, autoincrement=True) urn = Column(Integer, primary_key=True)
school_id = Column(Integer, ForeignKey("schools.id", ondelete="CASCADE"), nullable=False) year = Column(Integer, primary_key=True)
year = Column(Integer, nullable=False, index=True) source_urn = Column(Integer)
# Pupil numbers
total_pupils = Column(Integer) total_pupils = Column(Integer)
eligible_pupils = Column(Integer) eligible_pupils = Column(Integer)
# Core attainment
# Core KS2 metrics - Expected Standard
rwm_expected_pct = Column(Float) rwm_expected_pct = Column(Float)
reading_expected_pct = Column(Float)
writing_expected_pct = Column(Float)
maths_expected_pct = Column(Float)
gps_expected_pct = Column(Float)
science_expected_pct = Column(Float)
# Higher Standard
rwm_high_pct = Column(Float) rwm_high_pct = Column(Float)
reading_expected_pct = Column(Float)
reading_high_pct = Column(Float) reading_high_pct = Column(Float)
writing_high_pct = Column(Float)
maths_high_pct = Column(Float)
gps_high_pct = Column(Float)
# Progress Scores
reading_progress = Column(Float)
writing_progress = Column(Float)
maths_progress = Column(Float)
# Average Scores
reading_avg_score = Column(Float) reading_avg_score = Column(Float)
reading_progress = Column(Float)
writing_expected_pct = Column(Float)
writing_high_pct = Column(Float)
writing_progress = Column(Float)
maths_expected_pct = Column(Float)
maths_high_pct = Column(Float)
maths_avg_score = Column(Float) maths_avg_score = Column(Float)
maths_progress = Column(Float)
gps_expected_pct = Column(Float)
gps_high_pct = Column(Float)
gps_avg_score = Column(Float) gps_avg_score = Column(Float)
science_expected_pct = Column(Float)
# School Context # Absence
reading_absence_pct = Column(Float)
writing_absence_pct = Column(Float)
maths_absence_pct = Column(Float)
gps_absence_pct = Column(Float)
science_absence_pct = Column(Float)
# Gender
rwm_expected_boys_pct = Column(Float)
rwm_high_boys_pct = Column(Float)
rwm_expected_girls_pct = Column(Float)
rwm_high_girls_pct = Column(Float)
# Disadvantaged
rwm_expected_disadvantaged_pct = Column(Float)
rwm_expected_non_disadvantaged_pct = Column(Float)
disadvantaged_gap = Column(Float)
# Context
disadvantaged_pct = Column(Float) disadvantaged_pct = Column(Float)
eal_pct = Column(Float) eal_pct = Column(Float)
sen_support_pct = Column(Float) sen_support_pct = Column(Float)
sen_ehcp_pct = Column(Float) sen_ehcp_pct = Column(Float)
stability_pct = Column(Float) stability_pct = Column(Float)
# Pupil Absence from Tests
reading_absence_pct = Column(Float)
gps_absence_pct = Column(Float)
maths_absence_pct = Column(Float)
writing_absence_pct = Column(Float)
science_absence_pct = Column(Float)
# Gender Breakdown class FactOfstedInspection(Base):
rwm_expected_boys_pct = Column(Float) """Full Ofsted inspection history — one row per inspection."""
rwm_expected_girls_pct = Column(Float) __tablename__ = "fact_ofsted_inspection"
rwm_high_boys_pct = Column(Float)
rwm_high_girls_pct = Column(Float)
# Disadvantaged Performance
rwm_expected_disadvantaged_pct = Column(Float)
rwm_expected_non_disadvantaged_pct = Column(Float)
disadvantaged_gap = Column(Float)
# 3-Year Averages
rwm_expected_3yr_pct = Column(Float)
reading_avg_3yr = Column(Float)
maths_avg_3yr = Column(Float)
# Relationship
school = relationship("School", back_populates="results")
# Constraints
__table_args__ = ( __table_args__ = (
UniqueConstraint('school_id', 'year', name='uq_school_year'), Index("ix_ofsted_urn_date", "urn", "inspection_date"),
Index('ix_school_results_school_year', 'school_id', 'year'), MARTS,
) )
def __repr__(self):
return f"<SchoolResult(school_id={self.school_id}, year={self.year})>"
class SchemaVersion(Base):
"""
Tracks database schema version for automatic migrations.
Single-row table that stores the current schema version.
"""
__tablename__ = "schema_version"
id = Column(Integer, primary_key=True, default=1)
version = Column(Integer, nullable=False)
migrated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<SchemaVersion(version={self.version}, migrated_at={self.migrated_at})>"
# ---------------------------------------------------------------------------
# Supplementary data tables (populated by the Kestra data integrator)
# ---------------------------------------------------------------------------
class OfstedInspection(Base):
"""Latest Ofsted inspection judgement per school."""
__tablename__ = "ofsted_inspections"
urn = Column(Integer, primary_key=True) urn = Column(Integer, primary_key=True)
inspection_date = Column(Date) inspection_date = Column(Date, primary_key=True)
publication_date = Column(Date) inspection_type = Column(String(100))
inspection_type = Column(String(100)) # Section 5 / Section 8 etc.
# Which inspection framework was used: 'OEIF' or 'ReportCard'
framework = Column(String(20)) framework = Column(String(20))
# --- OEIF grades (old framework, pre-Nov 2025) ---
# 1=Outstanding 2=Good 3=Requires improvement 4=Inadequate
overall_effectiveness = Column(Integer) overall_effectiveness = Column(Integer)
quality_of_education = Column(Integer) quality_of_education = Column(Integer)
behaviour_attitudes = Column(Integer) behaviour_attitudes = Column(Integer)
personal_development = Column(Integer) personal_development = Column(Integer)
leadership_management = Column(Integer) leadership_management = Column(Integer)
early_years_provision = Column(Integer) # nullable — not all schools early_years_provision = Column(Integer)
previous_overall = Column(Integer) # for trend display sixth_form_provision = Column(Integer)
rc_safeguarding_met = Column(Boolean)
# --- Report Card grades (new framework, from Nov 2025) ---
# 1=Exceptional 2=Strong 3=Expected standard 4=Needs attention 5=Urgent improvement
rc_safeguarding_met = Column(Boolean) # True=Met, False=Not met
rc_inclusion = Column(Integer) rc_inclusion = Column(Integer)
rc_curriculum_teaching = Column(Integer) rc_curriculum_teaching = Column(Integer)
rc_achievement = Column(Integer) rc_achievement = Column(Integer)
rc_attendance_behaviour = Column(Integer) rc_attendance_behaviour = Column(Integer)
rc_personal_development = Column(Integer) rc_personal_development = Column(Integer)
rc_leadership_governance = Column(Integer) rc_leadership_governance = Column(Integer)
rc_early_years = Column(Integer) # nullable — not all schools rc_early_years = Column(Integer)
rc_sixth_form = Column(Integer) # nullable — secondary only rc_sixth_form = Column(Integer)
report_url = Column(Text)
def __repr__(self):
return f"<OfstedInspection(urn={self.urn}, framework={self.framework}, overall={self.overall_effectiveness})>"
class OfstedParentView(Base): class FactParentView(Base):
"""Ofsted Parent View survey — latest per school. 14 questions, % saying Yes.""" """Ofsted Parent View survey — latest per school."""
__tablename__ = "ofsted_parent_view" __tablename__ = "fact_parent_view"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True) urn = Column(Integer, primary_key=True)
survey_date = Column(Date) survey_date = Column(Date)
total_responses = Column(Integer) total_responses = Column(Integer)
q_happy_pct = Column(Float) # My child is happy at this school q_happy_pct = Column(Float)
q_safe_pct = Column(Float) # My child feels safe at this school q_safe_pct = Column(Float)
q_bullying_pct = Column(Float) # School deals with bullying well q_behaviour_pct = Column(Float)
q_communication_pct = Column(Float) # School keeps me informed q_bullying_pct = Column(Float)
q_progress_pct = Column(Float) # My child does well / good progress q_communication_pct = Column(Float)
q_teaching_pct = Column(Float) # Teaching is good q_progress_pct = Column(Float)
q_information_pct = Column(Float) # I receive valuable info about progress q_teaching_pct = Column(Float)
q_curriculum_pct = Column(Float) # Broad range of subjects taught q_information_pct = Column(Float)
q_future_pct = Column(Float) # Prepares child well for the future q_curriculum_pct = Column(Float)
q_leadership_pct = Column(Float) # Led and managed effectively q_future_pct = Column(Float)
q_wellbeing_pct = Column(Float) # Supports wider personal development q_leadership_pct = Column(Float)
q_behaviour_pct = Column(Float) # Pupils are well behaved q_wellbeing_pct = Column(Float)
q_recommend_pct = Column(Float) # I would recommend this school q_recommend_pct = Column(Float)
q_sen_pct = Column(Float) # Good information about child's SEN (where applicable)
def __repr__(self):
return f"<OfstedParentView(urn={self.urn}, responses={self.total_responses})>"
class SchoolCensus(Base): class FactAdmissions(Base):
"""Annual school census snapshot — class sizes and ethnicity breakdown.""" """School admissions — one row per URN per year."""
__tablename__ = "school_census" __tablename__ = "fact_admissions"
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
class_size_avg = Column(Float)
ethnicity_white_pct = Column(Float)
ethnicity_asian_pct = Column(Float)
ethnicity_black_pct = Column(Float)
ethnicity_mixed_pct = Column(Float)
ethnicity_other_pct = Column(Float)
__table_args__ = ( __table_args__ = (
Index('ix_school_census_urn_year', 'urn', 'year'), Index("ix_admissions_urn_year", "urn", "year"),
MARTS,
) )
def __repr__(self):
return f"<SchoolCensus(urn={self.urn}, year={self.year})>"
class SchoolAdmissions(Base):
"""Annual admissions statistics per school."""
__tablename__ = "school_admissions"
urn = Column(Integer, primary_key=True) urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True) year = Column(Integer, primary_key=True)
published_admission_number = Column(Integer) # PAN school_phase = Column(String(50))
published_admission_number = Column(Integer)
total_applications = Column(Integer) total_applications = Column(Integer)
first_preference_offers_pct = Column(Float) # % receiving 1st choice first_preference_applications = Column(Integer)
first_preference_offers = Column(Integer)
first_preference_offer_pct = Column(Float)
oversubscribed = Column(Boolean) oversubscribed = Column(Boolean)
admissions_policy = Column(String(100))
__table_args__ = (
Index('ix_school_admissions_urn_year', 'urn', 'year'),
)
def __repr__(self):
return f"<SchoolAdmissions(urn={self.urn}, year={self.year})>"
class SenDetail(Base): class FactDeprivation(Base):
"""SEN primary need type breakdown — more granular than school_results context fields.""" """IDACI deprivation index — one row per URN."""
__tablename__ = "sen_detail" __tablename__ = "fact_deprivation"
__table_args__ = MARTS
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
primary_need_speech_pct = Column(Float) # SLCN
primary_need_autism_pct = Column(Float) # ASD
primary_need_mld_pct = Column(Float) # Moderate learning difficulty
primary_need_spld_pct = Column(Float) # Specific learning difficulty (dyslexia etc.)
primary_need_semh_pct = Column(Float) # Social, emotional, mental health
primary_need_physical_pct = Column(Float) # Physical/sensory
primary_need_other_pct = Column(Float)
__table_args__ = (
Index('ix_sen_detail_urn_year', 'urn', 'year'),
)
def __repr__(self):
return f"<SenDetail(urn={self.urn}, year={self.year})>"
class Phonics(Base):
"""Phonics Screening Check pass rates."""
__tablename__ = "phonics"
urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
year1_phonics_pct = Column(Float) # % reaching expected standard in Year 1
year2_phonics_pct = Column(Float) # % reaching standard in Year 2 (re-takers)
__table_args__ = (
Index('ix_phonics_urn_year', 'urn', 'year'),
)
def __repr__(self):
return f"<Phonics(urn={self.urn}, year={self.year})>"
class SchoolDeprivation(Base):
"""IDACI deprivation index — derived via postcode → LSOA lookup."""
__tablename__ = "school_deprivation"
urn = Column(Integer, primary_key=True) urn = Column(Integer, primary_key=True)
lsoa_code = Column(String(20)) lsoa_code = Column(String(20))
idaci_score = Column(Float) # 01, higher = more deprived idaci_score = Column(Float)
idaci_decile = Column(Integer) # 1 = most deprived, 10 = least deprived idaci_decile = Column(Integer)
def __repr__(self):
return f"<SchoolDeprivation(urn={self.urn}, decile={self.idaci_decile})>"
class SchoolFinance(Base): class FactFinance(Base):
"""FBIT financial benchmarking data.""" """FBIT financial benchmarking — one row per URN per year."""
__tablename__ = "school_finance" __tablename__ = "fact_finance"
__table_args__ = (
Index("ix_finance_urn_year", "urn", "year"),
MARTS,
)
urn = Column(Integer, primary_key=True) urn = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True) year = Column(Integer, primary_key=True)
per_pupil_spend = Column(Float) # £ total expenditure per pupil per_pupil_spend = Column(Float)
staff_cost_pct = Column(Float) # % of budget on all staff staff_cost_pct = Column(Float)
teacher_cost_pct = Column(Float) # % on teachers specifically teacher_cost_pct = Column(Float)
support_staff_cost_pct = Column(Float) support_staff_cost_pct = Column(Float)
premises_cost_pct = Column(Float) premises_cost_pct = Column(Float)
__table_args__ = (
Index('ix_school_finance_urn_year', 'urn', 'year'),
)
def __repr__(self):
return f"<SchoolFinance(urn={self.urn}, year={self.year})>"
# Mapping from CSV columns to model fields
SCHOOL_FIELD_MAPPING = {
'urn': 'urn',
'school_name': 'school_name',
'local_authority': 'local_authority',
'local_authority_code': 'local_authority_code',
'school_type': 'school_type',
'school_type_code': 'school_type_code',
'religious_denomination': 'religious_denomination',
'age_range': 'age_range',
'address1': 'address1',
'address2': 'address2',
'town': 'town',
'postcode': 'postcode',
}
RESULT_FIELD_MAPPING = {
'year': 'year',
'total_pupils': 'total_pupils',
'eligible_pupils': 'eligible_pupils',
# Expected Standard
'rwm_expected_pct': 'rwm_expected_pct',
'reading_expected_pct': 'reading_expected_pct',
'writing_expected_pct': 'writing_expected_pct',
'maths_expected_pct': 'maths_expected_pct',
'gps_expected_pct': 'gps_expected_pct',
'science_expected_pct': 'science_expected_pct',
# Higher Standard
'rwm_high_pct': 'rwm_high_pct',
'reading_high_pct': 'reading_high_pct',
'writing_high_pct': 'writing_high_pct',
'maths_high_pct': 'maths_high_pct',
'gps_high_pct': 'gps_high_pct',
# Progress
'reading_progress': 'reading_progress',
'writing_progress': 'writing_progress',
'maths_progress': 'maths_progress',
# Averages
'reading_avg_score': 'reading_avg_score',
'maths_avg_score': 'maths_avg_score',
'gps_avg_score': 'gps_avg_score',
# Context
'disadvantaged_pct': 'disadvantaged_pct',
'eal_pct': 'eal_pct',
'sen_support_pct': 'sen_support_pct',
'sen_ehcp_pct': 'sen_ehcp_pct',
'stability_pct': 'stability_pct',
# Absence
'reading_absence_pct': 'reading_absence_pct',
'gps_absence_pct': 'gps_absence_pct',
'maths_absence_pct': 'maths_absence_pct',
'writing_absence_pct': 'writing_absence_pct',
'science_absence_pct': 'science_absence_pct',
# Gender
'rwm_expected_boys_pct': 'rwm_expected_boys_pct',
'rwm_expected_girls_pct': 'rwm_expected_girls_pct',
'rwm_high_boys_pct': 'rwm_high_boys_pct',
'rwm_high_girls_pct': 'rwm_high_girls_pct',
# Disadvantaged
'rwm_expected_disadvantaged_pct': 'rwm_expected_disadvantaged_pct',
'rwm_expected_non_disadvantaged_pct': 'rwm_expected_non_disadvantaged_pct',
'disadvantaged_gap': 'disadvantaged_gap',
# 3-Year
'rwm_expected_3yr_pct': 'rwm_expected_3yr_pct',
'reading_avg_3yr': 'reading_avg_3yr',
'maths_avg_3yr': 'maths_avg_3yr',
}

View File

@@ -120,12 +120,12 @@ with DAG(
extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted
# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ─────────── # ── Annual DAG (EES: KS2, KS4, Census, Admissions) ───────────────────
with DAG( with DAG(
dag_id="school_data_annual_ees", dag_id="school_data_annual_ees",
default_args=default_args, default_args=default_args,
description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)", description="Annual EES data extraction (KS2, KS4, Census, Admissions)",
schedule=None, # Triggered manually when new releases are published schedule=None, # Triggered manually when new releases are published
start_date=datetime(2025, 1, 1), start_date=datetime(2025, 1, 1),
catchup=False, catchup=False,
@@ -140,7 +140,7 @@ with DAG(
dbt_build_ees = BashOperator( dbt_build_ees = BashOperator(
task_id="dbt_build", task_id="dbt_build",
bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+ stg_ees_phonics+", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+",
) )
sync_typesense_ees = BashOperator( sync_typesense_ees = BashOperator(

View File

@@ -1,9 +1,11 @@
"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data. """EES Singer tap — extracts KS2, KS4, Census, Admissions data.
Each stream targets a specific CSV file within an EES release ZIP. Each stream targets a specific CSV file within an EES release ZIP.
The EES data uses 'school_urn' for school-level records and 'z' for The EES data uses 'school_urn' for school-level records and 'z' for
suppressed values. Column names vary by file — schemas declare all suppressed values. Column names vary by file — schemas declare all
columns needed by downstream dbt staging models. columns needed by downstream dbt staging models.
Phonics has no school-level data on EES and is not included.
""" """
from __future__ import annotations from __future__ import annotations
@@ -38,12 +40,17 @@ def download_release_zip(release_id: str) -> zipfile.ZipFile:
class EESDatasetStream(Stream): class EESDatasetStream(Stream):
"""Base stream for an EES dataset extracted from a release ZIP.""" """Base stream for an EES dataset extracted from a release ZIP.
Subclasses set _target_filename to a keyword that appears in the
target CSV path inside the ZIP (substring match, not exact).
"""
replication_key = None replication_key = None
_publication_slug: str = "" _publication_slug: str = ""
_target_filename: str = "" # exact filename within the ZIP _target_filename: str = "" # keyword that appears in the CSV path
_urn_column: str = "school_urn" # column name for URN in the CSV _urn_column: str = "school_urn" # column name for URN in the CSV
_encoding: str = "utf-8" # CSV file encoding (some DfE files use latin-1)
def get_records(self, context): def get_records(self, context):
import pandas as pd import pandas as pd
@@ -56,17 +63,17 @@ class EESDatasetStream(Stream):
) )
zf = download_release_zip(release_id) zf = download_release_zip(release_id)
# Find the target file # Find the target file (substring match)
all_files = zf.namelist() all_files = zf.namelist()
target = None target = None
for name in all_files: for name in all_files:
if name.endswith(self._target_filename): if self._target_filename in name and name.endswith(".csv"):
target = name target = name
break break
if not target: if not target:
self.logger.error( self.logger.error(
"File '%s' not found in ZIP. Available: %s", "File matching '%s' not found in ZIP. Available: %s",
self._target_filename, self._target_filename,
[n for n in all_files if n.endswith(".csv")], [n for n in all_files if n.endswith(".csv")],
) )
@@ -74,7 +81,7 @@ class EESDatasetStream(Stream):
self.logger.info("Reading %s from ZIP", target) self.logger.info("Reading %s from ZIP", target)
with zf.open(target) as f: with zf.open(target) as f:
df = pd.read_csv(f, dtype=str, keep_default_na=False) df = pd.read_csv(f, dtype=str, keep_default_na=False, encoding=self._encoding)
# Filter to school-level data if the column exists # Filter to school-level data if the column exists
if "geographic_level" in df.columns: if "geographic_level" in df.columns:
@@ -96,7 +103,7 @@ class EESKS2AttainmentStream(EESDatasetStream):
name = "ees_ks2_attainment" name = "ees_ks2_attainment"
primary_keys = ["school_urn", "time_period", "subject", "breakdown_topic", "breakdown"] primary_keys = ["school_urn", "time_period", "subject", "breakdown_topic", "breakdown"]
_publication_slug = "key-stage-2-attainment" _publication_slug = "key-stage-2-attainment"
_target_filename = "ks2_school_attainment_data.csv" _target_filename = "ks2_school_attainment_data"
schema = th.PropertiesList( schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True), th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True),
@@ -126,7 +133,7 @@ class EESKS2InfoStream(EESDatasetStream):
name = "ees_ks2_info" name = "ees_ks2_info"
primary_keys = ["school_urn", "time_period"] primary_keys = ["school_urn", "time_period"]
_publication_slug = "key-stage-2-attainment" _publication_slug = "key-stage-2-attainment"
_target_filename = "ks2_school_information_data.csv" _target_filename = "ks2_school_information_data"
schema = th.PropertiesList( schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True), th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True),
@@ -150,60 +157,174 @@ class EESKS2InfoStream(EESDatasetStream):
).to_dict() ).to_dict()
# ── KS4 Attainment ────────────────────────────────────────────────────────── # ── KS4 Performance (long format: one row per school × breakdown × sex) ─────
# File: 202425_performance_tables_schools_revised.csv (156 cols)
# Dimensions: breakdown_topic, breakdown, sex, disadvantage_status, etc.
# Metrics are already in separate columns (attainment8_average, progress8_average, etc.)
class EESKS4Stream(EESDatasetStream): class EESKS4PerformanceStream(EESDatasetStream):
name = "ees_ks4" name = "ees_ks4_performance"
primary_keys = ["school_urn", "time_period"] primary_keys = ["school_urn", "time_period", "breakdown_topic", "breakdown", "sex"]
_publication_slug = "key-stage-4-performance" _publication_slug = "key-stage-4-performance"
_target_filename = "school" # Will be refined once we see the actual ZIP contents _target_filename = "performance_tables_schools"
schema = th.PropertiesList( schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True), th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True),
th.Property("school_laestab", th.StringType),
th.Property("school_name", th.StringType),
th.Property("establishment_type_group", th.StringType),
th.Property("breakdown_topic", th.StringType, required=True),
th.Property("breakdown", th.StringType, required=True),
th.Property("sex", th.StringType, required=True),
th.Property("disadvantage_status", th.StringType),
th.Property("first_language", th.StringType),
th.Property("prior_attainment", th.StringType),
th.Property("mobility", th.StringType),
# Pupil counts
th.Property("pupil_count", th.StringType),
th.Property("pupil_percent", th.StringType),
# Attainment 8
th.Property("attainment8_sum", th.StringType),
th.Property("attainment8_average", th.StringType),
# English & Maths
th.Property("engmath_entering_total", th.StringType),
th.Property("engmath_entering_percent", th.StringType),
th.Property("engmath_95_total", th.StringType),
th.Property("engmath_95_percent", th.StringType),
th.Property("engmath_94_total", th.StringType),
th.Property("engmath_94_percent", th.StringType),
# EBacc
th.Property("ebacc_entering_total", th.StringType),
th.Property("ebacc_entering_percent", th.StringType),
th.Property("ebacc_95_total", th.StringType),
th.Property("ebacc_95_percent", th.StringType),
th.Property("ebacc_94_total", th.StringType),
th.Property("ebacc_94_percent", th.StringType),
th.Property("ebacc_aps_sum", th.StringType),
th.Property("ebacc_aps_average", th.StringType),
# Progress 8
th.Property("progress8_pupil_count", th.StringType),
th.Property("progress8_sum", th.StringType),
th.Property("progress8_average", th.StringType),
th.Property("progress8_lower_95_ci", th.StringType),
th.Property("progress8_upper_95_ci", th.StringType),
# Progress 8 elements
th.Property("progress8eng_average", th.StringType),
th.Property("progress8mat_average", th.StringType),
th.Property("progress8ebacc_average", th.StringType),
th.Property("progress8open_average", th.StringType),
# GCSE grades
th.Property("gcse_91_total", th.StringType),
th.Property("gcse_91_percent", th.StringType),
# EBacc subject entry/achievement
th.Property("ebacceng_entering_percent", th.StringType),
th.Property("ebaccmat_entering_percent", th.StringType),
th.Property("ebaccsci_entering_percent", th.StringType),
th.Property("ebacchum_entering_percent", th.StringType),
th.Property("ebacclan_entering_percent", th.StringType),
).to_dict()
# ── KS4 Information (wide format: one row per school, context/demographics) ──
# File: 202425_information_about_schools_provisional.csv (38 cols)
class EESKS4InfoStream(EESDatasetStream):
name = "ees_ks4_info"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "key-stage-4-performance"
_target_filename = "information_about_schools"
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
th.Property("school_laestab", th.StringType),
th.Property("school_name", th.StringType),
th.Property("establishment_type_group", th.StringType),
th.Property("reldenom", th.StringType),
th.Property("admpol_pt", th.StringType),
th.Property("egender", th.StringType),
th.Property("agerange", th.StringType),
th.Property("allks_pupil_count", th.StringType),
th.Property("allks_boys_count", th.StringType),
th.Property("allks_girls_count", th.StringType),
th.Property("endks4_pupil_count", th.StringType),
th.Property("ks2_scaledscore_average", th.StringType),
th.Property("sen_with_ehcp_pupil_percent", th.StringType),
th.Property("sen_pupil_percent", th.StringType),
th.Property("sen_no_ehcp_pupil_percent", th.StringType),
th.Property("attainment8_diffn", th.StringType),
th.Property("progress8_diffn", th.StringType),
th.Property("progress8_banding", th.StringType),
).to_dict() ).to_dict()
# ── Census (school-level pupil characteristics) ───────────────────────────── # ── Census (school-level pupil characteristics) ─────────────────────────────
# File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/)
# Uses 'urn' not 'school_urn'. Filename has yearly suffix that changes.
class EESCensusStream(EESDatasetStream): class EESCensusStream(EESDatasetStream):
name = "ees_census" name = "ees_census"
primary_keys = ["urn", "time_period"] primary_keys = ["school_urn", "time_period"]
_publication_slug = "school-pupils-and-their-characteristics" _publication_slug = "school-pupils-and-their-characteristics"
_target_filename = "spc_school_level_underlying_data_2025.csv" _target_filename = "spc_school_level_underlying_data"
_urn_column = "urn" _urn_column = "urn"
_encoding = "latin-1"
schema = th.PropertiesList( schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True), th.Property("time_period", th.StringType, required=True),
th.Property("urn", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True),
th.Property("school_name", th.StringType), th.Property("school_name", th.StringType),
th.Property("laestab", th.StringType), th.Property("laestab", th.StringType),
th.Property("phase_type_grouping", th.StringType), th.Property("phase_type_grouping", th.StringType),
# TODO: Add data columns (ethnicity %, FSM %, SEN %, etc.) once
# actual column names are verified on the container. The CSV has
# 269 columns — only the first 30 (metadata) have been inspected.
).to_dict() ).to_dict()
# ── Admissions ─────────────────────────────────────────────────────────────── # ── Admissions ───────────────────────────────────────────────────────────────
# File: AppsandOffers_YYYY_SchoolLevelDDMMYYYY.csv (37 cols, in supporting-files/)
# Wide format, no geographic_level column. Uses school_urn.
class EESAdmissionsStream(EESDatasetStream): class EESAdmissionsStream(EESDatasetStream):
name = "ees_admissions" name = "ees_admissions"
primary_keys = ["school_urn", "time_period"] primary_keys = ["school_urn", "time_period"]
_publication_slug = "primary-and-secondary-school-applications-and-offers" _publication_slug = "primary-and-secondary-school-applications-and-offers"
_target_filename = "school" # Will be refined once we see the actual ZIP contents _target_filename = "SchoolLevel"
_encoding = "latin-1"
schema = th.PropertiesList( schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True), th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True),
th.Property("school_name", th.StringType),
th.Property("school_laestab_as_used", th.StringType),
th.Property("school_phase", th.StringType),
th.Property("entry_year", th.StringType),
# Places and offers
th.Property("total_number_places_offered", th.StringType),
th.Property("number_preferred_offers", th.StringType),
th.Property("number_1st_preference_offers", th.StringType),
th.Property("number_2nd_preference_offers", th.StringType),
th.Property("number_3rd_preference_offers", th.StringType),
# Applications
th.Property("times_put_as_any_preferred_school", th.StringType),
th.Property("times_put_as_1st_preference", th.StringType),
th.Property("times_put_as_2nd_preference", th.StringType),
th.Property("times_put_as_3rd_preference", th.StringType),
# Proportions
th.Property("proportion_1stprefs_v_1stprefoffers", th.StringType),
th.Property("proportion_1stprefs_v_totaloffers", th.StringType),
# Cross-LA
th.Property("all_applications_from_another_LA", th.StringType),
th.Property("offers_to_applicants_from_another_LA", th.StringType),
# Context
th.Property("establishment_type", th.StringType),
th.Property("denomination", th.StringType),
th.Property("FSM_eligible_percent", th.StringType),
th.Property("admissions_policy", th.StringType),
th.Property("urban_rural", th.StringType),
).to_dict() ).to_dict()
# ── Phonics ────────────────────────────────────────────────────────────────── # Note: Phonics (phonics-screening-check-attainment) has NO school-level data
# on EES. Only national and LA-level files are published.
class EESPhonicsStream(EESDatasetStream):
name = "ees_phonics"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "phonics-screening-check-attainment"
_target_filename = "school" # Will be refined once we see the actual ZIP contents
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
).to_dict()
class TapUKEES(Tap): class TapUKEES(Tap):
@@ -219,10 +340,10 @@ class TapUKEES(Tap):
return [ return [
EESKS2AttainmentStream(self), EESKS2AttainmentStream(self),
EESKS2InfoStream(self), EESKS2InfoStream(self),
EESKS4Stream(self), EESKS4PerformanceStream(self),
EESKS4InfoStream(self),
EESCensusStream(self), EESCensusStream(self),
EESAdmissionsStream(self), EESAdmissionsStream(self),
EESPhonicsStream(self),
] ]

View File

@@ -4,16 +4,14 @@ with current_ks4 as (
select select
urn as current_urn, urn as current_urn,
urn as source_urn, urn as source_urn,
year, year, total_pupils, eligible_pupils, prior_attainment_avg,
total_pupils,
progress_8_score,
attainment_8_score, attainment_8_score,
ebacc_entry_pct, progress_8_score, progress_8_lower_ci, progress_8_upper_ci,
ebacc_achievement_pct, progress_8_english, progress_8_maths, progress_8_ebacc, progress_8_open,
english_strong_pass_pct, english_maths_strong_pass_pct, english_maths_standard_pass_pct,
maths_strong_pass_pct, ebacc_entry_pct, ebacc_strong_pass_pct, ebacc_standard_pass_pct, ebacc_avg_score,
english_maths_strong_pass_pct, gcse_grade_91_pct,
staying_in_education_pct sen_pct, sen_ehcp_pct, sen_support_pct
from {{ ref('stg_ees_ks4') }} from {{ ref('stg_ees_ks4') }}
), ),
@@ -21,16 +19,14 @@ predecessor_ks4 as (
select select
lin.current_urn, lin.current_urn,
ks4.urn as source_urn, ks4.urn as source_urn,
ks4.year, ks4.year, ks4.total_pupils, ks4.eligible_pupils, ks4.prior_attainment_avg,
ks4.total_pupils,
ks4.progress_8_score,
ks4.attainment_8_score, ks4.attainment_8_score,
ks4.ebacc_entry_pct, ks4.progress_8_score, ks4.progress_8_lower_ci, ks4.progress_8_upper_ci,
ks4.ebacc_achievement_pct, ks4.progress_8_english, ks4.progress_8_maths, ks4.progress_8_ebacc, ks4.progress_8_open,
ks4.english_strong_pass_pct, ks4.english_maths_strong_pass_pct, ks4.english_maths_standard_pass_pct,
ks4.maths_strong_pass_pct, ks4.ebacc_entry_pct, ks4.ebacc_strong_pass_pct, ks4.ebacc_standard_pass_pct, ks4.ebacc_avg_score,
ks4.english_maths_strong_pass_pct, ks4.gcse_grade_91_pct,
ks4.staying_in_education_pct ks4.sen_pct, ks4.sen_ehcp_pct, ks4.sen_support_pct
from {{ ref('stg_ees_ks4') }} ks4 from {{ ref('stg_ees_ks4') }} ks4
inner join {{ ref('int_school_lineage') }} lin inner join {{ ref('int_school_lineage') }} lin
on ks4.urn = lin.predecessor_urn on ks4.urn = lin.predecessor_urn

View File

@@ -1,18 +1,8 @@
-- Intermediate model: Merged pupil characteristics from census data -- Intermediate model: Merged pupil characteristics from census data
-- TODO: Expand once census data columns are verified and added to stg_ees_census
select select
urn, urn,
year, year,
fsm_pct, phase_type_grouping
sen_support_pct,
sen_ehcp_pct,
eal_pct,
disadvantaged_pct,
ethnicity_white_pct,
ethnicity_asian_pct,
ethnicity_black_pct,
ethnicity_mixed_pct,
ethnicity_other_pct,
class_size_avg,
stability_pct
from {{ ref('stg_ees_census') }} from {{ ref('stg_ees_census') }}

View File

@@ -88,14 +88,6 @@ models:
- name: year - name: year
tests: [not_null] tests: [not_null]
- name: fact_phonics
description: Phonics screening results — one row per URN per year
columns:
- name: urn
tests: [not_null]
- name: year
tests: [not_null]
- name: fact_parent_view - name: fact_parent_view
description: Parent View survey responses description: Parent View survey responses
columns: columns:

View File

@@ -3,8 +3,12 @@
select select
urn, urn,
year, year,
school_phase,
published_admission_number, published_admission_number,
total_applications, total_applications,
first_preference_offers_pct, first_preference_applications,
oversubscribed first_preference_offers,
first_preference_offer_pct,
oversubscribed,
admissions_policy
from {{ ref('stg_ees_admissions') }} from {{ ref('stg_ees_admissions') }}

View File

@@ -1,16 +1,42 @@
-- Mart: KS4 performance fact table — one row per URN per year -- Mart: KS4 performance fact table — one row per URN per year
-- Includes predecessor data via lineage resolution
select select
current_urn as urn, current_urn as urn,
source_urn, source_urn,
year, year,
total_pupils, total_pupils,
progress_8_score, eligible_pupils,
prior_attainment_avg,
-- Attainment 8
attainment_8_score, attainment_8_score,
ebacc_entry_pct,
ebacc_achievement_pct, -- Progress 8
english_strong_pass_pct, progress_8_score,
maths_strong_pass_pct, progress_8_lower_ci,
progress_8_upper_ci,
progress_8_english,
progress_8_maths,
progress_8_ebacc,
progress_8_open,
-- English & Maths
english_maths_strong_pass_pct, english_maths_strong_pass_pct,
staying_in_education_pct english_maths_standard_pass_pct,
-- EBacc
ebacc_entry_pct,
ebacc_strong_pass_pct,
ebacc_standard_pass_pct,
ebacc_avg_score,
-- GCSE
gcse_grade_91_pct,
-- Context
sen_pct,
sen_ehcp_pct,
sen_support_pct
from {{ ref('int_ks4_with_lineage') }} from {{ ref('int_ks4_with_lineage') }}

View File

@@ -1,8 +0,0 @@
-- Mart: Phonics screening results — one row per URN per year
select
urn,
year,
year1_phonics_pct,
year2_phonics_pct
from {{ ref('stg_ees_phonics') }}

View File

@@ -1,18 +1,8 @@
-- Mart: Pupil characteristics — one row per URN per year -- Mart: Pupil characteristics — one row per URN per year
-- TODO: Expand once census data columns are verified and added to staging
select select
urn, urn,
year, year,
fsm_pct, phase_type_grouping
sen_support_pct,
sen_ehcp_pct,
eal_pct,
disadvantaged_pct,
ethnicity_white_pct,
ethnicity_asian_pct,
ethnicity_black_pct,
ethnicity_mixed_pct,
ethnicity_other_pct,
class_size_avg,
stability_pct
from {{ ref('int_pupil_chars_merged') }} from {{ ref('int_pupil_chars_merged') }}

View File

@@ -30,8 +30,11 @@ sources:
- name: ees_ks2_info - name: ees_ks2_info
description: KS2 school information (wide format — context/demographics per school) description: KS2 school information (wide format — context/demographics per school)
- name: ees_ks4 - name: ees_ks4_performance
description: KS4 attainment data from Explore Education Statistics description: KS4 performance tables (long format — one row per school × breakdown × sex)
- name: ees_ks4_info
description: KS4 school information (wide format — context/demographics per school)
- name: ees_census - name: ees_census
description: School census pupil characteristics description: School census pupil characteristics
@@ -39,8 +42,7 @@ sources:
- name: ees_admissions - name: ees_admissions
description: Primary and secondary school admissions data description: Primary and secondary school admissions data
- name: ees_phonics # Phonics: no school-level data on EES (only national/LA level)
description: Phonics screening check results
- name: parent_view - name: parent_view
description: Ofsted Parent View survey responses description: Ofsted Parent View survey responses

View File

@@ -1,19 +1,48 @@
-- Staging model: Primary and secondary school admissions from EES -- Staging model: Primary and secondary school admissions from EES
-- Wide format, one row per school per year. No geographic_level column.
-- File is in supporting-files/ subdirectory of the release ZIP.
with source as ( with source as (
select * from {{ source('raw', 'ees_admissions') }} select * from {{ source('raw', 'ees_admissions') }}
where school_urn is not null
), ),
renamed as ( renamed as (
select select
cast(urn as integer) as urn, cast(school_urn as integer) as urn,
cast(time_period as integer) as year, cast(time_period as integer) as year,
cast(published_admission_number as integer) as published_admission_number, school_phase,
cast(total_applications as integer) as total_applications, entry_year,
cast(first_preference_offers_pct as numeric) as first_preference_offers_pct,
cast(oversubscribed as boolean) as oversubscribed -- Places and offers
cast(nullif(total_number_places_offered, 'z') as integer) as published_admission_number,
cast(nullif(number_preferred_offers, 'z') as integer) as total_offers,
cast(nullif(number_1st_preference_offers, 'z') as integer) as first_preference_offers,
cast(nullif(number_2nd_preference_offers, 'z') as integer) as second_preference_offers,
cast(nullif(number_3rd_preference_offers, 'z') as integer) as third_preference_offers,
-- Applications
cast(nullif(times_put_as_any_preferred_school, 'z') as integer) as total_applications,
cast(nullif(times_put_as_1st_preference, 'z') as integer) as first_preference_applications,
-- Proportions
cast(nullif(proportion_1stprefs_v_totaloffers, 'z') as numeric) as first_preference_offer_pct,
-- Derived: oversubscribed if applications > places
case
when nullif(times_put_as_1st_preference, 'z') is not null
and nullif(total_number_places_offered, 'z') is not null
and cast(times_put_as_1st_preference as integer)
> cast(total_number_places_offered as integer)
then true
else false
end as oversubscribed,
-- Context
admissions_policy,
nullif(FSM_eligible_percent, 'z') as fsm_eligible_pct
from source from source
where urn is not null
) )
select * from renamed select * from renamed

View File

@@ -1,27 +1,30 @@
-- Staging model: School census pupil characteristics from EES -- Staging model: School census pupil characteristics from EES
-- File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/)
-- Uses 'urn' column (not school_urn). Tap normalises to school_urn.
--
-- TODO: The CSV has 269 columns but only metadata columns have been verified.
-- Data columns (ethnicity %, FSM %, SEN %, class sizes) need to be discovered
-- by inspecting the CSV on the Airflow container. The column references below
-- are placeholders and will fail until the tap schema and this model are updated
-- with the actual column names.
with source as ( with source as (
select * from {{ source('raw', 'ees_census') }} select * from {{ source('raw', 'ees_census') }}
where school_urn is not null
), ),
renamed as ( renamed as (
select select
cast(urn as integer) as urn, cast(school_urn as integer) as urn,
cast(time_period as integer) as year, cast(time_period as integer) as year,
cast(fsm_pct as numeric) as fsm_pct, school_name,
cast(sen_support_pct as numeric) as sen_support_pct, phase_type_grouping
cast(sen_ehcp_pct as numeric) as sen_ehcp_pct, -- TODO: Add census data columns once verified:
cast(eal_pct as numeric) as eal_pct, -- fsm_pct, sen_support_pct, sen_ehcp_pct, eal_pct,
cast(disadvantaged_pct as numeric) as disadvantaged_pct, -- disadvantaged_pct, ethnicity_white_pct, ethnicity_asian_pct,
cast(ethnicity_white_pct as numeric) as ethnicity_white_pct, -- ethnicity_black_pct, ethnicity_mixed_pct, ethnicity_other_pct,
cast(ethnicity_asian_pct as numeric) as ethnicity_asian_pct, -- class_size_avg, stability_pct
cast(ethnicity_black_pct as numeric) as ethnicity_black_pct,
cast(ethnicity_mixed_pct as numeric) as ethnicity_mixed_pct,
cast(ethnicity_other_pct as numeric) as ethnicity_other_pct,
cast(class_size_avg as numeric) as class_size_avg,
cast(stability_pct as numeric) as stability_pct
from source from source
where urn is not null
) )
select * from renamed select * from renamed

View File

@@ -1,24 +1,102 @@
-- Staging model: KS4 attainment data from EES (secondary schools — NEW) -- Staging model: KS4 attainment data from EES
-- KS4 performance data is long-format with breakdown dimensions (breakdown_topic,
-- breakdown, sex). Unlike KS2 which has a subject dimension, KS4 metrics are
-- already in separate columns — we just filter to the 'All pupils' breakdown.
-- EES uses 'z' for suppressed values — cast to null via nullif.
with source as ( with performance as (
select * from {{ source('raw', 'ees_ks4') }} select * from {{ source('raw', 'ees_ks4_performance') }}
where school_urn is not null
), ),
renamed as ( -- Filter to all-pupils totals (one row per school per year)
all_pupils as (
select select
cast(urn as integer) as urn, cast(school_urn as integer) as urn,
cast(time_period as integer) as year, cast(time_period as integer) as year,
cast(t_pupils as integer) as total_pupils, cast(nullif(pupil_count, 'z') as integer) as total_pupils,
cast(progress_8_score as numeric) as progress_8_score,
cast(attainment_8_score as numeric) as attainment_8_score, -- Attainment 8
cast(ebacc_entry_pct as numeric) as ebacc_entry_pct, cast(nullif(attainment8_average, 'z') as numeric) as attainment_8_score,
cast(ebacc_achievement_pct as numeric) as ebacc_achievement_pct,
cast(english_strong_pass_pct as numeric) as english_strong_pass_pct, -- Progress 8
cast(maths_strong_pass_pct as numeric) as maths_strong_pass_pct, cast(nullif(progress8_average, 'z') as numeric) as progress_8_score,
cast(english_maths_strong_pass_pct as numeric) as english_maths_strong_pass_pct, cast(nullif(progress8_lower_95_ci, 'z') as numeric) as progress_8_lower_ci,
cast(staying_in_education_pct as numeric) as staying_in_education_pct cast(nullif(progress8_upper_95_ci, 'z') as numeric) as progress_8_upper_ci,
from source cast(nullif(progress8eng_average, 'z') as numeric) as progress_8_english,
where urn is not null cast(nullif(progress8mat_average, 'z') as numeric) as progress_8_maths,
cast(nullif(progress8ebacc_average, 'z') as numeric) as progress_8_ebacc,
cast(nullif(progress8open_average, 'z') as numeric) as progress_8_open,
-- English & Maths pass rates
cast(nullif(engmath_95_percent, 'z') as numeric) as english_maths_strong_pass_pct,
cast(nullif(engmath_94_percent, 'z') as numeric) as english_maths_standard_pass_pct,
-- EBacc
cast(nullif(ebacc_entering_percent, 'z') as numeric) as ebacc_entry_pct,
cast(nullif(ebacc_95_percent, 'z') as numeric) as ebacc_strong_pass_pct,
cast(nullif(ebacc_94_percent, 'z') as numeric) as ebacc_standard_pass_pct,
cast(nullif(ebacc_aps_average, 'z') as numeric) as ebacc_avg_score,
-- GCSE grade 9-1
cast(nullif(gcse_91_percent, 'z') as numeric) as gcse_grade_91_pct
from performance
where breakdown_topic = 'All pupils'
and breakdown = 'Total'
and sex = 'Total'
),
-- KS4 info table for context/demographics
info as (
select
cast(school_urn as integer) as urn,
cast(time_period as integer) as year,
cast(nullif(endks4_pupil_count, 'z') as integer) as eligible_pupils,
cast(nullif(ks2_scaledscore_average, 'z') as numeric) as prior_attainment_avg,
cast(nullif(sen_pupil_percent, 'z') as numeric) as sen_pct,
cast(nullif(sen_with_ehcp_pupil_percent, 'z') as numeric) as sen_ehcp_pct,
cast(nullif(sen_no_ehcp_pupil_percent, 'z') as numeric) as sen_support_pct
from {{ source('raw', 'ees_ks4_info') }}
where school_urn is not null
) )
select * from renamed select
p.urn,
p.year,
p.total_pupils,
i.eligible_pupils,
i.prior_attainment_avg,
-- Attainment 8
p.attainment_8_score,
-- Progress 8
p.progress_8_score,
p.progress_8_lower_ci,
p.progress_8_upper_ci,
p.progress_8_english,
p.progress_8_maths,
p.progress_8_ebacc,
p.progress_8_open,
-- English & Maths
p.english_maths_strong_pass_pct,
p.english_maths_standard_pass_pct,
-- EBacc
p.ebacc_entry_pct,
p.ebacc_strong_pass_pct,
p.ebacc_standard_pass_pct,
p.ebacc_avg_score,
-- GCSE
p.gcse_grade_91_pct,
-- Context
i.sen_pct,
i.sen_ehcp_pct,
i.sen_support_pct
from all_pupils p
left join info i on p.urn = i.urn and p.year = i.year

View File

@@ -1,17 +0,0 @@
-- Staging model: Phonics screening check results from EES
with source as (
select * from {{ source('raw', 'ees_phonics') }}
),
renamed as (
select
cast(urn as integer) as urn,
cast(time_period as integer) as year,
cast(year1_phonics_pct as numeric) as year1_phonics_pct,
cast(year2_phonics_pct as numeric) as year2_phonics_pct
from source
where urn is not null
)
select * from renamed