Files
school_compare/backend/data_loader.py
Tudor 1a9341eaf4
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 1m1s
Simplify school types and persist selected schools
- Add runtime normalization of cryptic school type codes to user-friendly names
  (e.g., AC/ACC/ACCS -> "Academy", CY/CYS -> "Community")
- Update SCHOOL_TYPE_MAP in schemas.py with consolidated mappings
- Add normalize_school_type() and get_school_type_codes_for_filter() helpers
- Persist selected schools in localStorage across page refreshes
- Move "Add to Compare" button from modal footer to header

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 15:55:23 +00:00

514 lines
16 KiB
Python

"""
Data loading module that queries from PostgreSQL database.
Provides efficient queries with caching and lazy loading.
Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py).
Only user search postcodes are geocoded on-demand via geocode_single_postcode().
"""
import pandas as pd
import numpy as np
from functools import lru_cache
from typing import Optional, Dict, Tuple, List
import requests
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload, Session
from .config import settings
from .database import SessionLocal, get_db_session
from .models import School, SchoolResult
from .schemas import SCHOOL_TYPE_MAP
# Cache for user search postcode geocoding (not for school data)
_postcode_cache: Dict[str, Tuple[float, float]] = {}
def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
"""Convert cryptic school type codes to user-friendly names."""
if not school_type:
return None
# Check if it's a code that needs mapping
code = school_type.strip().upper()
if code in SCHOOL_TYPE_MAP:
return SCHOOL_TYPE_MAP[code]
# Return original if already a friendly name or unknown code
return school_type
def get_school_type_codes_for_filter(school_type: str) -> List[str]:
"""Get all database codes that map to a given friendly name."""
if not school_type:
return []
school_type_lower = school_type.lower()
# Collect all codes that map to this friendly name
codes = []
for code, friendly_name in SCHOOL_TYPE_MAP.items():
if friendly_name.lower() == school_type_lower:
codes.append(code.lower())
# Also include the school_type itself (case-insensitive) in case it's stored as-is
codes.append(school_type_lower)
return codes
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
"""Geocode a single postcode using postcodes.io API."""
if not postcode:
return None
postcode = postcode.strip().upper()
# Check cache first
if postcode in _postcode_cache:
return _postcode_cache[postcode]
try:
response = requests.get(
f'https://api.postcodes.io/postcodes/{postcode}',
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get('result'):
lat = data['result'].get('latitude')
lon = data['result'].get('longitude')
if lat and lon:
_postcode_cache[postcode] = (lat, lon)
return (lat, lon)
except Exception:
pass
return None
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""
Calculate the great circle distance between two points on Earth (in miles).
"""
from math import radians, cos, sin, asin, sqrt
# Convert to radians
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
# Earth's radius in miles
r = 3956
return c * r
# =============================================================================
# DATABASE QUERY FUNCTIONS
# =============================================================================
def get_db():
"""Get a database session."""
return SessionLocal()
def get_available_years(db: Session = None) -> List[int]:
"""Get list of available years in the database."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(SchoolResult.year).distinct().order_by(SchoolResult.year).all()
return [r[0] for r in result]
finally:
if close_db:
db.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
"""Get list of available local authorities."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(School.local_authority)\
.filter(School.local_authority.isnot(None))\
.distinct()\
.order_by(School.local_authority)\
.all()
return [r[0] for r in result if r[0]]
finally:
if close_db:
db.close()
def get_available_school_types(db: Session = None) -> List[str]:
"""Get list of available school types (normalized to user-friendly names)."""
close_db = db is None
if db is None:
db = get_db()
try:
result = db.query(School.school_type)\
.filter(School.school_type.isnot(None))\
.distinct()\
.all()
# Normalize codes to friendly names and deduplicate
normalized = set()
for r in result:
if r[0]:
friendly_name = normalize_school_type(r[0])
if friendly_name:
normalized.add(friendly_name)
return sorted(normalized)
finally:
if close_db:
db.close()
def get_schools_count(db: Session = None) -> int:
"""Get total number of schools."""
close_db = db is None
if db is None:
db = get_db()
try:
return db.query(School).count()
finally:
if close_db:
db.close()
def get_schools(
db: Session,
search: Optional[str] = None,
local_authority: Optional[str] = None,
school_type: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[School], int]:
"""
Get paginated list of schools with optional filters.
Returns (schools, total_count).
"""
query = db.query(School)
# Apply filters
if search:
search_lower = f"%{search.lower()}%"
query = query.filter(
or_(
func.lower(School.school_name).like(search_lower),
func.lower(School.postcode).like(search_lower),
func.lower(School.town).like(search_lower),
)
)
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
if school_type:
# Filter by all codes that map to this friendly name
type_codes = get_school_type_codes_for_filter(school_type)
if type_codes:
query = query.filter(func.lower(School.school_type).in_(type_codes))
# Get total count
total = query.count()
# Apply pagination
offset = (page - 1) * page_size
schools = query.order_by(School.school_name).offset(offset).limit(page_size).all()
return schools, total
def get_schools_near_location(
db: Session,
latitude: float,
longitude: float,
radius_miles: float = 5.0,
search: Optional[str] = None,
local_authority: Optional[str] = None,
school_type: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
"""
Get schools near a location, sorted by distance.
Returns list of (school, distance) tuples and total count.
"""
# Get all schools with coordinates
query = db.query(School).filter(
School.latitude.isnot(None),
School.longitude.isnot(None)
)
# Apply text filters
if search:
search_lower = f"%{search.lower()}%"
query = query.filter(
or_(
func.lower(School.school_name).like(search_lower),
func.lower(School.postcode).like(search_lower),
func.lower(School.town).like(search_lower),
)
)
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
if school_type:
# Filter by all codes that map to this friendly name
type_codes = get_school_type_codes_for_filter(school_type)
if type_codes:
query = query.filter(func.lower(School.school_type).in_(type_codes))
# Get all matching schools and calculate distances
all_schools = query.all()
schools_with_distance = []
for school in all_schools:
if school.latitude and school.longitude:
dist = haversine_distance(latitude, longitude, school.latitude, school.longitude)
if dist <= radius_miles:
schools_with_distance.append((school, dist))
# Sort by distance
schools_with_distance.sort(key=lambda x: x[1])
total = len(schools_with_distance)
# Paginate
offset = (page - 1) * page_size
paginated = schools_with_distance[offset:offset + page_size]
return paginated, total
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
"""Get a single school by URN."""
return db.query(School).filter(School.urn == urn).first()
def get_school_results(
db: Session,
urn: int,
years: Optional[List[int]] = None
) -> List[SchoolResult]:
"""Get all results for a school, optionally filtered by years."""
query = db.query(SchoolResult)\
.join(School)\
.filter(School.urn == urn)\
.order_by(SchoolResult.year)
if years:
query = query.filter(SchoolResult.year.in_(years))
return query.all()
def get_rankings(
db: Session,
metric: str,
year: int,
local_authority: Optional[str] = None,
limit: int = 20,
ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
"""
Get school rankings for a specific metric and year.
Returns list of (school, result) tuples.
"""
# Build the query
query = db.query(School, SchoolResult)\
.join(SchoolResult)\
.filter(SchoolResult.year == year)
# Filter by local authority
if local_authority:
query = query.filter(func.lower(School.local_authority) == local_authority.lower())
# Get the metric column
metric_column = getattr(SchoolResult, metric, None)
if metric_column is None:
return []
# Filter out nulls and order
query = query.filter(metric_column.isnot(None))
if ascending:
query = query.order_by(metric_column.asc())
else:
query = query.order_by(metric_column.desc())
return query.limit(limit).all()
def get_data_info(db: Session = None) -> dict:
"""Get information about the data in the database."""
close_db = db is None
if db is None:
db = get_db()
try:
school_count = db.query(School).count()
result_count = db.query(SchoolResult).count()
years = get_available_years(db)
local_authorities = get_available_local_authorities(db)
return {
"total_schools": school_count,
"total_results": result_count,
"years_available": years,
"local_authorities_count": len(local_authorities),
"data_source": "PostgreSQL",
}
finally:
if close_db:
db.close()
def school_to_dict(school: School, include_results: bool = False) -> dict:
"""Convert a School model to dictionary."""
data = {
"urn": school.urn,
"school_name": school.school_name,
"local_authority": school.local_authority,
"school_type": normalize_school_type(school.school_type),
"address": school.address,
"town": school.town,
"postcode": school.postcode,
"latitude": school.latitude,
"longitude": school.longitude,
}
if include_results and school.results:
data["results"] = [result_to_dict(r) for r in school.results]
return data
def result_to_dict(result: SchoolResult) -> dict:
"""Convert a SchoolResult model to dictionary."""
return {
"year": result.year,
"total_pupils": result.total_pupils,
"eligible_pupils": result.eligible_pupils,
# Expected Standard
"rwm_expected_pct": result.rwm_expected_pct,
"reading_expected_pct": result.reading_expected_pct,
"writing_expected_pct": result.writing_expected_pct,
"maths_expected_pct": result.maths_expected_pct,
"gps_expected_pct": result.gps_expected_pct,
"science_expected_pct": result.science_expected_pct,
# Higher Standard
"rwm_high_pct": result.rwm_high_pct,
"reading_high_pct": result.reading_high_pct,
"writing_high_pct": result.writing_high_pct,
"maths_high_pct": result.maths_high_pct,
"gps_high_pct": result.gps_high_pct,
# Progress
"reading_progress": result.reading_progress,
"writing_progress": result.writing_progress,
"maths_progress": result.maths_progress,
# Averages
"reading_avg_score": result.reading_avg_score,
"maths_avg_score": result.maths_avg_score,
"gps_avg_score": result.gps_avg_score,
# Context
"disadvantaged_pct": result.disadvantaged_pct,
"eal_pct": result.eal_pct,
"sen_support_pct": result.sen_support_pct,
"sen_ehcp_pct": result.sen_ehcp_pct,
"stability_pct": result.stability_pct,
# Gender
"rwm_expected_boys_pct": result.rwm_expected_boys_pct,
"rwm_expected_girls_pct": result.rwm_expected_girls_pct,
"rwm_high_boys_pct": result.rwm_high_boys_pct,
"rwm_high_girls_pct": result.rwm_high_girls_pct,
# Disadvantaged
"rwm_expected_disadvantaged_pct": result.rwm_expected_disadvantaged_pct,
"rwm_expected_non_disadvantaged_pct": result.rwm_expected_non_disadvantaged_pct,
"disadvantaged_gap": result.disadvantaged_gap,
# 3-Year
"rwm_expected_3yr_pct": result.rwm_expected_3yr_pct,
"reading_avg_3yr": result.reading_avg_3yr,
"maths_avg_3yr": result.maths_avg_3yr,
}
# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
"""
Load all school data as a pandas DataFrame.
For compatibility with existing code that expects DataFrames.
"""
close_db = db is None
if db is None:
db = get_db()
try:
# Query all schools with their results
schools = db.query(School).options(joinedload(School.results)).all()
rows = []
for school in schools:
for result in school.results:
row = {
"urn": school.urn,
"school_name": school.school_name,
"local_authority": school.local_authority,
"school_type": normalize_school_type(school.school_type),
"address": school.address,
"town": school.town,
"postcode": school.postcode,
"latitude": school.latitude,
"longitude": school.longitude,
**result_to_dict(result)
}
rows.append(row)
if rows:
return pd.DataFrame(rows)
return pd.DataFrame()
finally:
if close_db:
db.close()
# Cache for DataFrame (legacy compatibility)
_df_cache: Optional[pd.DataFrame] = None
def load_school_data() -> pd.DataFrame:
"""
Legacy function to load school data as DataFrame.
Uses caching for performance.
"""
global _df_cache
if _df_cache is not None:
return _df_cache
print("Loading school data from database...")
_df_cache = load_school_data_as_dataframe()
if not _df_cache.empty:
print(f"Total records loaded: {len(_df_cache)}")
print(f"Unique schools: {_df_cache['urn'].nunique()}")
print(f"Years: {sorted(_df_cache['year'].unique())}")
else:
print("No data found in database")
return _df_cache
def clear_cache():
"""Clear all caches."""
global _df_cache
_df_cache = None