Files
school_compare/backend/data_loader.py

685 lines
22 KiB
Python
Raw Normal View History

2026-01-06 16:30:32 +00:00
"""
Data loading module that queries from PostgreSQL database.
Provides efficient queries with caching and lazy loading.

Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py).
Only user search postcodes are geocoded on-demand via geocode_single_postcode().
"""
import pandas as pd
import numpy as np
from functools import lru_cache
2026-01-06 17:15:43 +00:00
from typing import Optional, Dict, Tuple, List
2026-01-06 16:59:25 +00:00
import requests
2026-01-06 17:15:43 +00:00
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload, Session
2026-01-06 16:30:32 +00:00
from .config import settings
2026-01-06 17:15:43 +00:00
from .database import SessionLocal, get_db_session
from .models import (
School, SchoolResult,
OfstedInspection, OfstedParentView, SchoolCensus,
SchoolAdmissions, SenDetail, Phonics, SchoolDeprivation, SchoolFinance,
)
from .schemas import SCHOOL_TYPE_MAP
2026-01-06 16:30:32 +00:00
2026-01-08 15:30:33 +00:00
# Cache for user search postcode geocoding (not for school data).
# Filled by geocode_single_postcode(): keys are the stripped, upper-cased
# postcode, values are (latitude, longitude) pairs. Nothing in this module
# evicts entries.
_postcode_cache: Dict[str, Tuple[float, float]] = {}
def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
    """Translate a raw school-type code into its user-friendly name.

    The value is stripped and upper-cased before it is looked up in
    ``SCHOOL_TYPE_MAP``. Values without an entry in the map (already
    friendly names, or unknown codes) are returned exactly as given.
    Falsy input (``None`` or an empty string) yields ``None``.
    """
    if not school_type:
        return None
    lookup_key = school_type.strip().upper()
    # Fall back to the caller's original text when there is no mapping.
    return SCHOOL_TYPE_MAP.get(lookup_key, school_type)
def get_school_type_codes_for_filter(school_type: str) -> List[str]:
    """Return every lower-cased value that should match a friendly type name.

    The result holds each ``SCHOOL_TYPE_MAP`` code whose friendly name equals
    *school_type* (compared case-insensitively), followed by the lower-cased
    *school_type* itself so rows that store the friendly name directly also
    match. Falsy input yields an empty list.
    """
    if not school_type:
        return []
    wanted = school_type.lower()
    matches = [
        code.lower()
        for code, label in SCHOOL_TYPE_MAP.items()
        if label.lower() == wanted
    ]
    # The friendly name itself always goes last.
    matches.append(wanted)
    return matches
2026-01-06 16:59:25 +00:00
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
    """Geocode a single postcode using the postcodes.io API.

    Args:
        postcode: Postcode text; it is stripped and upper-cased before use.

    Returns:
        ``(latitude, longitude)`` on success, or ``None`` when the postcode
        is empty, the request fails, the response is not HTTP 200, or the
        payload carries no coordinates. Successful lookups are stored in
        ``_postcode_cache`` and served from there on later calls.
    """
    from urllib.parse import quote

    if not postcode:
        return None
    postcode = postcode.strip().upper()
    if not postcode:
        # Whitespace-only input: nothing to look up, so skip the HTTP call.
        return None

    cached = _postcode_cache.get(postcode)
    if cached is not None:
        return cached

    try:
        response = requests.get(
            # Percent-encode the postcode so characters such as "/" or "?"
            # cannot change the request path.
            f"https://api.postcodes.io/postcodes/{quote(postcode, safe='')}",
            timeout=10,
        )
        if response.status_code != 200:
            return None
        payload = response.json()
    except (requests.RequestException, ValueError):
        # Network failure or a body that is not valid JSON: best effort only.
        return None

    result = payload.get("result") if isinstance(payload, dict) else None
    if not isinstance(result, dict):
        return None

    lat = result.get("latitude")
    lon = result.get("longitude")
    # Compare against None rather than truthiness: 0.0 is a valid coordinate
    # (longitude 0 is the Greenwich meridian).
    if lat is None or lon is None:
        return None

    _postcode_cache[postcode] = (lat, lon)
    return (lat, lon)
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate the great-circle distance between two points, in miles.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The haversine distance using an Earth radius of 3956 miles.
    """
    from math import asin, cos, radians, sin, sqrt

    earth_radius_miles = 3956

    phi1, lam1, phi2, lam2 = map(radians, (lat1, lon1, lat2, lon2))
    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2

    a = sin(half_dphi) ** 2 + cos(phi1) * cos(phi2) * sin(half_dlam) ** 2
    # Rounding can leave `a` a hair outside [0, 1] (e.g. near-antipodal
    # points), which would make sqrt()/asin() raise ValueError.
    a = min(1.0, max(0.0, a))

    return 2 * asin(sqrt(a)) * earth_radius_miles
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
# =============================================================================
# DATABASE QUERY FUNCTIONS
# =============================================================================
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_db():
    """Create and return a new ``SessionLocal`` database session.

    Nothing here closes the session; the helpers in this module that call
    ``get_db()`` close the session themselves when they are done.
    """
    session = SessionLocal()
    return session
def get_available_years(db: Session = None) -> List[int]:
    """Return the distinct ``SchoolResult.year`` values in ascending order.

    When *db* is omitted a session is opened via ``get_db()`` and closed
    before returning; a session passed in by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        year_rows = (
            session.query(SchoolResult.year)
            .distinct()
            .order_by(SchoolResult.year)
            .all()
        )
        return [row[0] for row in year_rows]
    finally:
        if owns_session:
            session.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
    """Return the distinct, non-empty local authority names, sorted by the database.

    When *db* is omitted a session is opened via ``get_db()`` and closed
    before returning; a session passed in by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        authority_rows = (
            session.query(School.local_authority)
            .filter(School.local_authority.isnot(None))
            .distinct()
            .order_by(School.local_authority)
            .all()
        )
        # NULLs are excluded in SQL; this also drops empty strings.
        return [row[0] for row in authority_rows if row[0]]
    finally:
        if owns_session:
            session.close()
def get_available_school_types(db: Session = None) -> List[str]:
    """Return the sorted, de-duplicated friendly names of all stored school types.

    Each distinct non-empty ``School.school_type`` value is passed through
    ``normalize_school_type``; empty results are discarded. When *db* is
    omitted a session is opened via ``get_db()`` and closed before returning.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        type_rows = (
            session.query(School.school_type)
            .filter(School.school_type.isnot(None))
            .distinct()
            .all()
        )
        friendly_names = (
            normalize_school_type(row[0]) for row in type_rows if row[0]
        )
        # Several codes can share one friendly name, hence the set.
        return sorted({name for name in friendly_names if name})
    finally:
        if owns_session:
            session.close()
def get_schools_count(db: Session = None) -> int:
    """Return the number of ``School`` rows.

    When *db* is omitted a session is opened via ``get_db()`` and closed
    before returning; a session passed in by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        school_total = session.query(School).count()
        return school_total
    finally:
        if owns_session:
            session.close()
def get_schools(
    db: Session,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[School], int]:
    """Get a paginated list of schools with optional filters.

    Args:
        db: Session used for the queries.
        search: Case-insensitive substring matched literally against school
            name, postcode and town.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Friendly type name; expanded to all matching codes via
            ``get_school_type_codes_for_filter``.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns:
        ``(schools, total_count)`` where ``total_count`` is the number of
        rows matching the filters before pagination.
    """
    query = db.query(School)

    if search:
        # Escape LIKE metacharacters so user-typed "%" and "_" are matched
        # literally instead of acting as wildcards.
        escaped = (
            search.lower()
            .replace("/", "//")
            .replace("%", "/%")
            .replace("_", "/_")
        )
        pattern = f"%{escaped}%"
        query = query.filter(
            or_(
                func.lower(School.school_name).like(pattern, escape="/"),
                func.lower(School.postcode).like(pattern, escape="/"),
                func.lower(School.town).like(pattern, escape="/"),
            )
        )

    if local_authority:
        query = query.filter(
            func.lower(School.local_authority) == local_authority.lower()
        )

    if school_type:
        # Filter by all codes that map to this friendly name.
        type_codes = get_school_type_codes_for_filter(school_type)
        if type_codes:
            query = query.filter(func.lower(School.school_type).in_(type_codes))

    total = query.count()

    # A negative OFFSET/LIMIT is rejected by the database, so clamp both.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size

    schools = (
        query
        # URN breaks ties between schools that share a name, so consecutive
        # pages neither repeat nor skip rows.
        .order_by(School.school_name, School.urn)
        .offset(offset)
        .limit(page_size)
        .all()
    )
    return schools, total
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_schools_near_location(
    db: Session,
    latitude: float,
    longitude: float,
    radius_miles: float = 5.0,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
    """Get schools within a radius of a location, nearest first.

    Args:
        db: Session used for the query.
        latitude, longitude: Centre of the search, in degrees.
        radius_miles: Maximum ``haversine_distance`` from the centre.
        search: Case-insensitive substring matched literally against school
            name, postcode and town.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Friendly type name; expanded to all matching codes via
            ``get_school_type_codes_for_filter``.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Entries per page; negative values are treated as 0.

    Returns:
        ``(page_of_results, total_count)`` where each result is a
        ``(school, distance_in_miles)`` tuple and ``total_count`` is the
        number of schools inside the radius before pagination.
    """
    # A great-circle distance is never shorter than the north-south
    # separation, and one degree of latitude is about 69.05 miles with the
    # 3956-mile radius used by haversine_distance(). Dividing by 69.0 gives
    # a slightly generous band that cannot exclude an in-range school but
    # keeps most out-of-range rows from being loaded.
    lat_margin = radius_miles / 69.0

    query = db.query(School).filter(
        School.latitude.isnot(None),
        School.longitude.isnot(None),
        School.latitude.between(latitude - lat_margin, latitude + lat_margin),
    )

    if search:
        # Escape LIKE metacharacters so user-typed "%" and "_" are matched
        # literally instead of acting as wildcards.
        escaped = (
            search.lower()
            .replace("/", "//")
            .replace("%", "/%")
            .replace("_", "/_")
        )
        pattern = f"%{escaped}%"
        query = query.filter(
            or_(
                func.lower(School.school_name).like(pattern, escape="/"),
                func.lower(School.postcode).like(pattern, escape="/"),
                func.lower(School.town).like(pattern, escape="/"),
            )
        )

    if local_authority:
        query = query.filter(
            func.lower(School.local_authority) == local_authority.lower()
        )

    if school_type:
        # Filter by all codes that map to this friendly name.
        type_codes = get_school_type_codes_for_filter(school_type)
        if type_codes:
            query = query.filter(func.lower(School.school_type).in_(type_codes))

    schools_with_distance = []
    for school in query.all():
        # The query already excludes NULL coordinates. No truthiness test
        # here: it would wrongly skip a school at latitude or longitude 0.0.
        dist = haversine_distance(
            latitude, longitude, school.latitude, school.longitude
        )
        if dist <= radius_miles:
            schools_with_distance.append((school, dist))

    schools_with_distance.sort(key=lambda pair: pair[1])
    total = len(schools_with_distance)

    # Clamp so a page below 1 cannot turn into a negative slice offset.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size
    paginated = schools_with_distance[offset:offset + page_size]
    return paginated, total
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
    """Return the first ``School`` whose ``urn`` equals *urn*, or ``None``."""
    matching_schools = db.query(School).filter(School.urn == urn)
    return matching_schools.first()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_school_results(
    db: Session,
    urn: int,
    years: Optional[List[int]] = None
) -> List[SchoolResult]:
    """Return a school's results ordered by year.

    Results are found by joining ``SchoolResult`` to ``School`` and matching
    on ``School.urn``. When *years* is a non-empty list only those years are
    returned.
    """
    results_query = (
        db.query(SchoolResult)
        .join(School)
        .filter(School.urn == urn)
    )
    if years:
        results_query = results_query.filter(SchoolResult.year.in_(years))
    return results_query.order_by(SchoolResult.year).all()
def get_rankings(
    db: Session,
    metric: str,
    year: int,
    local_authority: Optional[str] = None,
    limit: int = 20,
    ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
    """Get school rankings for a specific metric and year.

    Args:
        db: Session used for the query.
        metric: Name of a mapped column attribute on ``SchoolResult``.
        year: Results year to rank.
        local_authority: Optional case-insensitive local authority filter.
        limit: Maximum number of rows to return.
        ascending: Sort lowest-first when true, highest-first otherwise.

    Returns:
        ``(school, result)`` tuples ordered by the metric, excluding rows
        where the metric is NULL. An empty list is returned when *metric*
        is not a mapped column of ``SchoolResult``.
    """
    from sqlalchemy import inspect as sa_inspect

    # Only accept real mapped columns. A bare getattr() would also resolve
    # relationships, methods and dunder attributes, which cannot be filtered
    # or ordered on.
    if metric not in sa_inspect(SchoolResult).column_attrs.keys():
        return []
    metric_column = getattr(SchoolResult, metric)

    query = (
        db.query(School, SchoolResult)
        .join(SchoolResult)
        .filter(SchoolResult.year == year)
        .filter(metric_column.isnot(None))
    )

    if local_authority:
        query = query.filter(
            func.lower(School.local_authority) == local_authority.lower()
        )

    ordering = metric_column.asc() if ascending else metric_column.desc()
    # URN is a tie-breaker so equal metric values come back in a stable order.
    return query.order_by(ordering, School.urn).limit(limit).all()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_data_info(db: Session = None) -> dict:
    """Summarise what the database currently holds.

    Returns a dict with the school count, the result count, the list of
    available years, the number of local authorities and a fixed
    ``data_source`` label. When *db* is omitted a session is opened via
    ``get_db()`` and closed before returning.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        summary = {
            "total_schools": session.query(School).count(),
            "total_results": session.query(SchoolResult).count(),
            "years_available": get_available_years(session),
            "local_authorities_count": len(
                get_available_local_authorities(session)
            ),
            "data_source": "PostgreSQL",
        }
        return summary
    finally:
        if owns_session:
            session.close()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def school_to_dict(school: School, include_results: bool = False) -> dict:
    """Serialise a ``School`` into a plain dictionary.

    ``school_type`` is passed through ``normalize_school_type``; every other
    field is copied as-is. When *include_results* is true and the school has
    results, a ``"results"`` list of ``result_to_dict`` entries is added.
    """
    leading_fields = ("urn", "school_name", "local_authority")
    trailing_fields = (
        "address",
        "town",
        "postcode",
        "latitude",
        "longitude",
        # GIAS fields
        "website",
        "headteacher_name",
        "capacity",
        "trust_name",
        "gender",
    )

    payload = {name: getattr(school, name) for name in leading_fields}
    payload["school_type"] = normalize_school_type(school.school_type)
    for name in trailing_fields:
        payload[name] = getattr(school, name)

    # school.results is only touched when the caller asked for results.
    if not (include_results and school.results):
        return payload

    payload["results"] = [result_to_dict(entry) for entry in school.results]
    return payload
def result_to_dict(result: SchoolResult) -> dict:
    """Serialise a ``SchoolResult`` into a plain dictionary.

    Each listed attribute is copied unchanged under a key of the same name,
    in the order given below.
    """
    field_names = (
        "year",
        "total_pupils",
        "eligible_pupils",
        # Expected Standard
        "rwm_expected_pct",
        "reading_expected_pct",
        "writing_expected_pct",
        "maths_expected_pct",
        "gps_expected_pct",
        "science_expected_pct",
        # Higher Standard
        "rwm_high_pct",
        "reading_high_pct",
        "writing_high_pct",
        "maths_high_pct",
        "gps_high_pct",
        # Progress
        "reading_progress",
        "writing_progress",
        "maths_progress",
        # Averages
        "reading_avg_score",
        "maths_avg_score",
        "gps_avg_score",
        # Context
        "disadvantaged_pct",
        "eal_pct",
        "sen_support_pct",
        "sen_ehcp_pct",
        "stability_pct",
        # Gender
        "rwm_expected_boys_pct",
        "rwm_expected_girls_pct",
        "rwm_high_boys_pct",
        "rwm_high_girls_pct",
        # Disadvantaged
        "rwm_expected_disadvantaged_pct",
        "rwm_expected_non_disadvantaged_pct",
        "disadvantaged_gap",
        # 3-Year
        "rwm_expected_3yr_pct",
        "reading_avg_3yr",
        "maths_avg_3yr",
    )
    return {name: getattr(result, name) for name in field_names}
# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
    """Load all school data as a pandas DataFrame.

    For compatibility with existing code that expects DataFrames. The frame
    has one row per (school, result) pair: the ``school_to_dict`` fields,
    then ``ofsted_grade`` / ``ofsted_date``, then the ``result_to_dict``
    fields. Schools without results contribute no rows.

    Args:
        db: Optional session. When omitted a session is opened via
            ``get_db()`` and closed before returning.

    Returns:
        The populated DataFrame, or an empty DataFrame when there are no rows.
    """
    close_db = db is None
    if db is None:
        db = get_db()

    try:
        # Load Ofsted data into a lookup dict (urn -> grade, date). This runs
        # before the school query so that, if it fails, the rollback below
        # cannot expire already-loaded School objects.
        ofsted_lookup: Dict[int, dict] = {}
        try:
            ofsted_rows = db.query(
                OfstedInspection.urn,
                OfstedInspection.overall_effectiveness,
                OfstedInspection.inspection_date,
            ).all()
            for o in ofsted_rows:
                ofsted_lookup[o.urn] = {
                    "ofsted_grade": o.overall_effectiveness,
                    "ofsted_date": o.inspection_date.isoformat() if o.inspection_date else None,
                }
        except Exception:
            # Table may not exist yet on first run. A failed statement leaves
            # the transaction aborted, so roll back to keep the session
            # usable for the school query and for the caller.
            db.rollback()
            ofsted_lookup = {}

        # Query all schools with their results in one round trip.
        schools = db.query(School).options(joinedload(School.results)).all()

        rows = []
        for school in schools:
            ofsted = ofsted_lookup.get(school.urn, {})
            # Build the school columns once per school, not once per result.
            school_fields = school_to_dict(school)
            for result in school.results:
                rows.append({
                    **school_fields,
                    # Ofsted (for list view)
                    "ofsted_grade": ofsted.get("ofsted_grade"),
                    "ofsted_date": ofsted.get("ofsted_date"),
                    **result_to_dict(result),
                })

        if rows:
            return pd.DataFrame(rows)
        return pd.DataFrame()
    finally:
        if close_db:
            db.close()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
# Cache for DataFrame (legacy compatibility).
# Stays None until load_school_data() fills it; clear_cache() resets it to None.
_df_cache: Optional[pd.DataFrame] = None
2026-01-06 16:30:32 +00:00
2026-01-06 16:59:25 +00:00
2026-01-06 17:15:43 +00:00
def load_school_data() -> pd.DataFrame:
    """Return the school DataFrame, building it on first use.

    Legacy entry point. The frame produced by
    ``load_school_data_as_dataframe()`` is kept in the module-level
    ``_df_cache`` and the same object is returned on every later call
    until ``clear_cache()`` resets it.
    """
    global _df_cache

    if _df_cache is None:
        print("Loading school data from database...")
        _df_cache = load_school_data_as_dataframe()

        if _df_cache.empty:
            print("No data found in database")
        else:
            print(f"Total records loaded: {len(_df_cache)}")
            print(f"Unique schools: {_df_cache['urn'].nunique()}")
            print(f"Years: {sorted(_df_cache['year'].unique())}")

    return _df_cache
2026-01-06 16:59:25 +00:00
2026-01-06 17:15:43 +00:00
def clear_cache():
    """Drop the cached school DataFrame.

    Resets ``_df_cache`` to ``None`` so the next ``load_school_data()`` call
    rebuilds it. No other module-level cache is touched here.
    """
    global _df_cache
    _df_cache = None
def get_supplementary_data(db: Session, urn: int) -> dict:
    """Fetch all supplementary data for a single school URN.

    Args:
        db: Session used for the lookups.
        urn: School URN to look up in each supplementary table.

    Returns:
        A dict with keys ``ofsted``, ``parent_view``, ``census``,
        ``admissions``, ``sen_detail``, ``phonics``, ``deprivation`` and
        ``finance``. Each value is a dict of that table's fields, or ``None``
        when no row exists or the lookup failed.
    """

    def safe_query(model, pk_field, latest_year_field=None):
        """Return the row for this URN (latest year first if given), or None on error."""
        try:
            query = db.query(model).filter(getattr(model, pk_field) == urn)
            if latest_year_field:
                query = query.order_by(getattr(model, latest_year_field).desc())
            return query.first()
        except Exception:
            # Best effort: one failing table must not break the others. A
            # failed statement leaves the transaction aborted, so roll back;
            # otherwise every later lookup on this session would fail too.
            db.rollback()
            return None

    def row_to_dict(row, field_names, date_fields=()):
        """Copy *field_names* from *row*; ISO-format the ones in *date_fields*."""
        if not row:
            return None
        data = {}
        for name in field_names:
            value = getattr(row, name)
            if name in date_fields:
                value = value.isoformat() if value else None
            data[name] = value
        return data

    # (result key, model, latest-year column or None, fields, date fields)
    sections = (
        (
            "ofsted", OfstedInspection, None,
            (
                "framework",
                "inspection_date",
                "inspection_type",
                # OEIF fields (old framework)
                "overall_effectiveness",
                "quality_of_education",
                "behaviour_attitudes",
                "personal_development",
                "leadership_management",
                "early_years_provision",
                "previous_overall",
                # Report Card fields (new framework, from Nov 2025)
                "rc_safeguarding_met",
                "rc_inclusion",
                "rc_curriculum_teaching",
                "rc_achievement",
                "rc_attendance_behaviour",
                "rc_personal_development",
                "rc_leadership_governance",
                "rc_early_years",
                "rc_sixth_form",
            ),
            ("inspection_date",),
        ),
        (
            "parent_view", OfstedParentView, None,
            (
                "survey_date",
                "total_responses",
                "q_happy_pct",
                "q_safe_pct",
                "q_behaviour_pct",
                "q_bullying_pct",
                "q_communication_pct",
                "q_progress_pct",
                "q_teaching_pct",
                "q_information_pct",
                "q_curriculum_pct",
                "q_future_pct",
                "q_leadership_pct",
                "q_wellbeing_pct",
                "q_recommend_pct",
                "q_sen_pct",
            ),
            ("survey_date",),
        ),
        (
            # School Census (latest year)
            "census", SchoolCensus, "year",
            (
                "year",
                "class_size_avg",
                "ethnicity_white_pct",
                "ethnicity_asian_pct",
                "ethnicity_black_pct",
                "ethnicity_mixed_pct",
                "ethnicity_other_pct",
            ),
            (),
        ),
        (
            # Admissions (latest year)
            "admissions", SchoolAdmissions, "year",
            (
                "year",
                "published_admission_number",
                "total_applications",
                "first_preference_offers_pct",
                "oversubscribed",
            ),
            (),
        ),
        (
            # SEN Detail (latest year)
            "sen_detail", SenDetail, "year",
            (
                "year",
                "primary_need_speech_pct",
                "primary_need_autism_pct",
                "primary_need_mld_pct",
                "primary_need_spld_pct",
                "primary_need_semh_pct",
                "primary_need_physical_pct",
                "primary_need_other_pct",
            ),
            (),
        ),
        (
            # Phonics (latest year)
            "phonics", Phonics, "year",
            ("year", "year1_phonics_pct", "year2_phonics_pct"),
            (),
        ),
        (
            "deprivation", SchoolDeprivation, None,
            ("lsoa_code", "idaci_score", "idaci_decile"),
            (),
        ),
        (
            # Finance (latest year)
            "finance", SchoolFinance, "year",
            (
                "year",
                "per_pupil_spend",
                "staff_cost_pct",
                "teacher_cost_pct",
                "support_staff_cost_pct",
                "premises_cost_pct",
            ),
            (),
        ),
    )

    result = {}
    for key, model, year_field, field_names, date_fields in sections:
        row = safe_query(model, "urn", year_field)
        result[key] = row_to_dict(row, field_names, date_fields)
    return result