Files
school_compare/backend/data_loader.py

519 lines
16 KiB
Python
Raw Normal View History

2026-01-06 16:30:32 +00:00
"""
Data loading module that queries from PostgreSQL database.
Provides efficient queries with caching and lazy loading.
"""
import pandas as pd
import numpy as np
from functools import lru_cache
2026-01-06 17:15:43 +00:00
from typing import Optional, Dict, Tuple, List
2026-01-06 16:59:25 +00:00
import requests
2026-01-06 17:15:43 +00:00
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload, Session
2026-01-06 16:30:32 +00:00
from .config import settings
2026-01-06 17:15:43 +00:00
from .database import SessionLocal, get_db_session
from .models import School, SchoolResult
2026-01-06 16:30:32 +00:00
2026-01-06 16:59:25 +00:00
# Cache for postcode geocoding.
# Keys are stripped, upper-cased postcodes; values are (latitude, longitude) tuples.
_postcode_cache: Dict[str, Tuple[float, float]] = {}
def geocode_postcodes_bulk(postcodes: list) -> Dict[str, Tuple[float, float]]:
    """
    Geocode postcodes in bulk using postcodes.io API.

    Each postcode is normalised (stripped and upper-cased) before lookup.
    Entries that are empty, not strings, or shorter than 5 characters after
    normalisation are skipped. Coordinates already in the module-level cache
    are reused, and newly resolved coordinates are added to it.

    Lookups are best-effort: a batch that fails prints a warning and is
    skipped, so the result may not contain every requested postcode.

    Returns dict of normalised postcode -> (latitude, longitude).
    """
    results = {}

    # Check cache first
    uncached = []
    for pc in postcodes:
        if pc and isinstance(pc, str):
            pc_upper = pc.strip().upper()
            if pc_upper in _postcode_cache:
                results[pc_upper] = _postcode_cache[pc_upper]
            elif len(pc_upper) >= 5:
                uncached.append(pc_upper)

    if not uncached:
        return results

    # De-duplicate while keeping first-seen order so batches are deterministic
    uncached = list(dict.fromkeys(uncached))

    # postcodes.io allows max 100 postcodes per request
    batch_size = 100

    for i in range(0, len(uncached), batch_size):
        batch = uncached[i:i + batch_size]
        try:
            response = requests.post(
                'https://api.postcodes.io/postcodes',
                json={'postcodes': batch},
                timeout=30
            )
            if response.status_code != 200:
                continue
            data = response.json()
            # `or []` guards against an explicit null "result" in the payload
            for item in data.get('result') or []:
                if not item or not item.get('result'):
                    continue
                pc = item['query'].upper()
                lat = item['result'].get('latitude')
                lon = item['result'].get('longitude')
                # Compare against None rather than truthiness: 0.0 is a valid
                # coordinate (the prime meridian runs through England).
                if lat is not None and lon is not None:
                    results[pc] = (lat, lon)
                    _postcode_cache[pc] = (lat, lon)
        except Exception as e:
            # Deliberately broad: one bad batch must not abort the others
            print(f" Warning: Geocoding batch failed: {e}")
    return results
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
    """
    Geocode a single postcode using postcodes.io API.

    The postcode is stripped and upper-cased before lookup. A cached result
    is returned without a network call; a successful lookup is cached.

    Returns (latitude, longitude), or None when the postcode is empty, is
    not found, has no coordinates, or the request fails (a warning is
    printed in that last case).
    """
    from urllib.parse import quote

    if not postcode:
        return None
    postcode = postcode.strip().upper()
    if not postcode:
        # Whitespace-only input: nothing to look up
        return None
    # Check cache first
    if postcode in _postcode_cache:
        return _postcode_cache[postcode]
    try:
        # Percent-encode the postcode so spaces or characters such as '/' and
        # '?' cannot change the request path.
        response = requests.get(
            f'https://api.postcodes.io/postcodes/{quote(postcode, safe="")}',
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            if data.get('result'):
                lat = data['result'].get('latitude')
                lon = data['result'].get('longitude')
                # 0.0 is a valid coordinate, so test for None explicitly
                if lat is not None and lon is not None:
                    _postcode_cache[postcode] = (lat, lon)
                    return (lat, lon)
    except Exception as e:
        # Best-effort lookup: report the failure instead of hiding it
        print(f" Warning: Geocoding failed for {postcode}: {e}")
    return None
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Calculate the great circle distance between two points on Earth (in miles).

    Coordinates are given in decimal degrees. The haversine formula is used
    with an Earth radius of 3956 miles.
    """
    from math import radians, cos, sin, asin, sqrt

    # Convert to radians
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # near-antipodal points, which would make asin() raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))

    # Earth's radius in miles
    r = 3956
    return c * r
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
# =============================================================================
# DATABASE QUERY FUNCTIONS
# =============================================================================
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_db():
    """Create a new database session from ``SessionLocal`` and return it."""
    session = SessionLocal()
    return session
def get_available_years(db: Session = None) -> List[int]:
    """
    Return the distinct result years in the database, in ascending order.

    When no session is passed, one is opened for the call and closed again
    before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        year_rows = (
            session.query(SchoolResult.year)
            .distinct()
            .order_by(SchoolResult.year)
            .all()
        )
        return [row[0] for row in year_rows]
    finally:
        if owns_session:
            session.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
    """
    Return the distinct, non-empty local authority names, sorted by name.

    When no session is passed, one is opened for the call and closed again
    before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        authority_rows = (
            session.query(School.local_authority)
            .filter(School.local_authority.isnot(None))
            .distinct()
            .order_by(School.local_authority)
            .all()
        )
        # The SQL filter removes NULLs; the truthiness test also drops ''
        return [row[0] for row in authority_rows if row[0]]
    finally:
        if owns_session:
            session.close()
def get_available_school_types(db: Session = None) -> List[str]:
    """
    Return the distinct, non-empty school types, sorted by name.

    When no session is passed, one is opened for the call and closed again
    before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        type_rows = (
            session.query(School.school_type)
            .filter(School.school_type.isnot(None))
            .distinct()
            .order_by(School.school_type)
            .all()
        )
        # The SQL filter removes NULLs; the truthiness test also drops ''
        return [row[0] for row in type_rows if row[0]]
    finally:
        if owns_session:
            session.close()
def get_schools_count(db: Session = None) -> int:
    """
    Return the total number of School rows.

    When no session is passed, one is opened for the call and closed again
    before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        school_total = session.query(School).count()
        return school_total
    finally:
        if owns_session:
            session.close()
def get_schools(
    db: Session,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[School], int]:
    """
    Get paginated list of schools with optional filters.

    Args:
        db: Session used to run the queries.
        search: Case-insensitive substring matched against school name,
            postcode and town. '%' and '_' are matched literally.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Case-insensitive exact match on school type.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns (schools, total_count), where total_count is the number of
    schools matching the filters before pagination.
    """
    query = db.query(School)

    # Apply filters
    if search:
        # contains(..., autoescape=True) renders LIKE '%term%' with any '%'
        # or '_' in the user's text escaped, so they are not wildcards.
        term = search.lower()
        query = query.filter(
            or_(
                func.lower(School.school_name).contains(term, autoescape=True),
                func.lower(School.postcode).contains(term, autoescape=True),
                func.lower(School.town).contains(term, autoescape=True),
            )
        )
    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())

    if school_type:
        query = query.filter(func.lower(School.school_type) == school_type.lower())

    # Get total count
    total = query.count()

    # Apply pagination. Clamp first so a bad page or size can never become a
    # negative OFFSET/LIMIT, which the database rejects.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size
    # URN is a tie-breaker so schools sharing a name keep a stable position
    # from one page request to the next.
    schools = (
        query.order_by(School.school_name, School.urn)
        .offset(offset)
        .limit(page_size)
        .all()
    )
    return schools, total
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_schools_near_location(
    db: Session,
    latitude: float,
    longitude: float,
    radius_miles: float = 5.0,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
    """
    Get schools near a location, sorted by distance.

    Schools without coordinates are excluded. Distances are computed in
    Python with haversine_distance() over every school matching the text
    filters, then filtered to `radius_miles`, sorted and paginated.

    Args:
        db: Session used to run the query.
        latitude: Latitude of the search centre, in decimal degrees.
        longitude: Longitude of the search centre, in decimal degrees.
        radius_miles: Maximum distance (inclusive) from the centre.
        search: Case-insensitive substring matched against school name,
            postcode and town. '%' and '_' are matched literally.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Case-insensitive exact match on school type.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns list of (school, distance) tuples and total count, where the
    total is the number of schools inside the radius before pagination.
    """
    # Get all schools with coordinates
    query = db.query(School).filter(
        School.latitude.isnot(None),
        School.longitude.isnot(None)
    )
    # Apply text filters
    if search:
        # autoescape=True makes '%' and '_' in the user's text literal
        term = search.lower()
        query = query.filter(
            or_(
                func.lower(School.school_name).contains(term, autoescape=True),
                func.lower(School.postcode).contains(term, autoescape=True),
                func.lower(School.town).contains(term, autoescape=True),
            )
        )
    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())
    if school_type:
        query = query.filter(func.lower(School.school_type) == school_type.lower())

    # Get all matching schools and calculate distances.
    # The query already excludes NULL coordinates, so there is no truthiness
    # check here: it would wrongly drop a school at latitude/longitude 0.0.
    schools_with_distance = []
    for school in query.all():
        dist = haversine_distance(latitude, longitude, school.latitude, school.longitude)
        if dist <= radius_miles:
            schools_with_distance.append((school, dist))

    # Sort by distance, with URN as a tie-breaker so equidistant schools keep
    # a stable order across page requests.
    schools_with_distance.sort(key=lambda pair: (pair[1], pair[0].urn))

    total = len(schools_with_distance)

    # Paginate. Clamp so a page below 1 cannot turn into a negative slice
    # index, which would silently select from the end of the list.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size
    paginated = schools_with_distance[offset:offset + page_size]
    return paginated, total
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
    """Look up one school by its URN; returns None when no row matches."""
    matching = db.query(School).filter(School.urn == urn)
    return matching.first()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_school_results(
    db: Session,
    urn: int,
    years: Optional[List[int]] = None
) -> List[SchoolResult]:
    """
    Return the results for the school with the given URN, ordered by year.

    When `years` is a non-empty list only those years are returned; None or
    an empty list returns every year.
    """
    results_query = db.query(SchoolResult).join(School).filter(School.urn == urn)
    if years:
        results_query = results_query.filter(SchoolResult.year.in_(years))
    return results_query.order_by(SchoolResult.year).all()
def get_rankings(
    db: Session,
    metric: str,
    year: int,
    local_authority: Optional[str] = None,
    limit: int = 20,
    ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
    """
    Get school rankings for a specific metric and year.

    Args:
        db: Session used to run the query.
        metric: Name of a mapped column attribute on SchoolResult to rank by.
        year: Result year to rank within.
        local_authority: Optional case-insensitive local authority filter.
        limit: Maximum number of rows to return.
        ascending: Rank lowest-first when True, highest-first otherwise.

    Returns list of (school, result) tuples. Rows where the metric is NULL
    are excluded. An unknown metric name yields an empty list.
    """
    from sqlalchemy import inspect as sa_inspect

    # Accept only names of mapped column attributes. A bare getattr() would
    # also return relationships, methods or class internals, which either
    # fail on .isnot() or rank by something that is not a metric.
    if metric not in sa_inspect(SchoolResult).column_attrs:
        return []
    metric_column = getattr(SchoolResult, metric)

    # Build the query
    query = db.query(School, SchoolResult)\
        .join(SchoolResult)\
        .filter(SchoolResult.year == year)

    # Filter by local authority
    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())

    # Filter out nulls and order
    query = query.filter(metric_column.isnot(None))

    # URN is a tie-breaker so schools with equal scores rank in a stable order
    if ascending:
        query = query.order_by(metric_column.asc(), School.urn)
    else:
        query = query.order_by(metric_column.desc(), School.urn)

    return query.limit(limit).all()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def get_data_info(db: Session = None) -> dict:
    """
    Summarise what the database holds.

    Returns a dict with the school count, the result count, the available
    years, the number of local authorities and a fixed "data_source" label.
    When no session is passed, one is opened for the call and closed again
    before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        # Dict values are evaluated top to bottom, so the queries run in the
        # order listed.
        summary = {
            "total_schools": session.query(School).count(),
            "total_results": session.query(SchoolResult).count(),
            "years_available": get_available_years(session),
            "local_authorities_count": len(get_available_local_authorities(session)),
            "data_source": "PostgreSQL",
        }
        return summary
    finally:
        if owns_session:
            session.close()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
def school_to_dict(school: School, include_results: bool = False) -> dict:
    """
    Serialise a School to a plain dictionary.

    The "results" key is added only when `include_results` is true and the
    school has at least one result; each result is converted with
    result_to_dict().
    """
    field_names = (
        "urn",
        "school_name",
        "local_authority",
        "school_type",
        "address",
        "town",
        "postcode",
        "latitude",
        "longitude",
    )
    data = {name: getattr(school, name) for name in field_names}

    if not include_results:
        return data

    results = school.results
    if results:
        data["results"] = [result_to_dict(entry) for entry in results]
    return data
# Attribute names copied from a SchoolResult into its dictionary form, in
# output order.
_RESULT_FIELDS = (
    "year",
    "total_pupils",
    "eligible_pupils",
    # Expected Standard
    "rwm_expected_pct",
    "reading_expected_pct",
    "writing_expected_pct",
    "maths_expected_pct",
    "gps_expected_pct",
    "science_expected_pct",
    # Higher Standard
    "rwm_high_pct",
    "reading_high_pct",
    "writing_high_pct",
    "maths_high_pct",
    "gps_high_pct",
    # Progress
    "reading_progress",
    "writing_progress",
    "maths_progress",
    # Averages
    "reading_avg_score",
    "maths_avg_score",
    "gps_avg_score",
    # Context
    "disadvantaged_pct",
    "eal_pct",
    "sen_support_pct",
    "sen_ehcp_pct",
    "stability_pct",
    # Gender
    "rwm_expected_boys_pct",
    "rwm_expected_girls_pct",
    "rwm_high_boys_pct",
    "rwm_high_girls_pct",
    # Disadvantaged
    "rwm_expected_disadvantaged_pct",
    "rwm_expected_non_disadvantaged_pct",
    "disadvantaged_gap",
    # 3-Year
    "rwm_expected_3yr_pct",
    "reading_avg_3yr",
    "maths_avg_3yr",
)


def result_to_dict(result: SchoolResult) -> dict:
    """
    Serialise a SchoolResult to a plain dictionary.

    Each key is the name of the attribute it was read from; values are
    copied unchanged.
    """
    return {name: getattr(result, name) for name in _RESULT_FIELDS}
# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
    """
    Load all school data as a pandas DataFrame.

    Produces one row per (school, result) pair: the school's descriptive
    fields followed by every field from result_to_dict(). Schools without
    results contribute no rows. Returns an empty DataFrame when there are
    no rows at all.

    When no session is passed, one is opened for the call and closed again
    before returning; a session supplied by the caller is left open.
    For compatibility with existing code that expects DataFrames.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        # Query all schools with their results
        schools = session.query(School).options(joinedload(School.results)).all()

        rows = []
        for school in schools:
            # Read the school's columns once, then repeat them on each row
            school_fields = {
                "urn": school.urn,
                "school_name": school.school_name,
                "local_authority": school.local_authority,
                "school_type": school.school_type,
                "address": school.address,
                "town": school.town,
                "postcode": school.postcode,
                "latitude": school.latitude,
                "longitude": school.longitude,
            }
            rows.extend(
                {**school_fields, **result_to_dict(result)}
                for result in school.results
            )

        return pd.DataFrame(rows) if rows else pd.DataFrame()
    finally:
        if owns_session:
            session.close()
2026-01-06 16:30:32 +00:00
2026-01-06 17:15:43 +00:00
# Cache for DataFrame (legacy compatibility).
# None until load_school_data() first populates it; clear_cache() resets it to None.
_df_cache: Optional[pd.DataFrame] = None
2026-01-06 16:30:32 +00:00
2026-01-06 16:59:25 +00:00
2026-01-06 17:15:43 +00:00
def load_school_data() -> pd.DataFrame:
    """
    Legacy function to load school data as DataFrame.

    The first call loads the data with load_school_data_as_dataframe(),
    prints a short summary and stores the frame in the module-level cache.
    Later calls return the cached frame, even when it is empty.
    """
    global _df_cache

    if _df_cache is None:
        print("Loading school data from database...")
        _df_cache = load_school_data_as_dataframe()

        if _df_cache.empty:
            print("No data found in database")
        else:
            print(f"Total records loaded: {len(_df_cache)}")
            print(f"Unique schools: {_df_cache['urn'].nunique()}")
            print(f"Years: {sorted(_df_cache['year'].unique())}")

    return _df_cache
2026-01-06 16:59:25 +00:00
2026-01-06 17:15:43 +00:00
def clear_cache():
    """
    Clear all caches.

    Resets the cached DataFrame, so the next load_school_data() call reloads
    from the database, and empties the postcode geocoding cache.
    """
    global _df_cache
    _df_cache = None
    # Empty the dict in place (no rebinding) so code that already holds a
    # reference to it sees the reset as well.
    _postcode_cache.clear()