Files
school_compare/backend/data_loader.py
Tudor Sitaru 52fbade30c
Some checks failed
Build and Push Docker Image / build-and-push (push) Failing after 32s
Introducing PostgreSQL for persistence
2026-01-06 17:15:43 +00:00

519 lines
16 KiB
Python

"""
Data loading module that queries from PostgreSQL database.
Provides efficient queries with caching and lazy loading.
"""
import pandas as pd
import numpy as np
from functools import lru_cache
from typing import Optional, Dict, Tuple, List
import requests
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload, Session
from .config import settings
from .database import SessionLocal, get_db_session
from .models import School, SchoolResult
# In-memory cache of postcode geocoding results, shared by
# geocode_postcodes_bulk() and geocode_single_postcode().
# Keys are upper-cased postcodes; values are (latitude, longitude) tuples.
_postcode_cache: Dict[str, Tuple[float, float]] = {}
def geocode_postcodes_bulk(postcodes: list) -> Dict[str, Tuple[float, float]]:
    """
    Geocode postcodes in bulk using postcodes.io API.

    Each postcode is stripped and upper-cased before use. Entries that are
    empty or not strings are ignored, and normalised postcodes shorter than
    five characters are never sent to the API. Cached coordinates are reused;
    newly resolved coordinates are added to the module cache.

    Lookups are best-effort: a failed batch is reported on stdout and
    skipped, so the postcodes it contained are simply absent from the result.

    Args:
        postcodes: Iterable of postcode strings (``None`` is treated as empty).

    Returns:
        Dict of normalised postcode -> (latitude, longitude).
    """
    results = {}

    # Serve what we can from the cache and collect the rest (de-duplicated).
    pending = set()
    for pc in postcodes or ():
        if not pc or not isinstance(pc, str):
            continue
        pc_upper = pc.strip().upper()
        if pc_upper in _postcode_cache:
            results[pc_upper] = _postcode_cache[pc_upper]
        elif len(pc_upper) >= 5:
            pending.add(pc_upper)

    if not pending:
        return results

    # Sorted so that batch contents are deterministic between runs.
    uncached = sorted(pending)

    # postcodes.io allows max 100 postcodes per request
    batch_size = 100
    for start in range(0, len(uncached), batch_size):
        batch = uncached[start:start + batch_size]
        try:
            response = requests.post(
                'https://api.postcodes.io/postcodes',
                json={'postcodes': batch},
                timeout=30
            )
            if response.status_code != 200:
                print(f" Warning: Geocoding batch failed: HTTP {response.status_code}")
                continue

            data = response.json()
            # 'result' may be missing or null; treat both as "no items".
            for item in data.get('result') or []:
                if not item or not item.get('result'):
                    continue
                query = item.get('query')
                if not isinstance(query, str):
                    continue
                lat = item['result'].get('latitude')
                lon = item['result'].get('longitude')
                # Compare against None: 0.0 is a valid coordinate
                # (e.g. longitude on the Greenwich meridian).
                if lat is None or lon is None:
                    continue
                pc = query.upper()
                results[pc] = (lat, lon)
                _postcode_cache[pc] = (lat, lon)
        except Exception as e:
            # Deliberately broad: geocoding must never break the caller.
            print(f" Warning: Geocoding batch failed: {e}")

    return results
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
    """
    Geocode a single postcode using postcodes.io API.

    The postcode is stripped and upper-cased, then looked up in the module
    cache before any network request is made. Successful lookups are cached.
    The lookup is best-effort: any failure is reported on stdout and results
    in ``None`` rather than an exception.

    Args:
        postcode: Postcode to resolve.

    Returns:
        (latitude, longitude), or ``None`` when the postcode is empty,
        unknown to the API, or the request fails.
    """
    from urllib.parse import quote

    if not postcode:
        return None
    postcode = postcode.strip().upper()
    if not postcode:
        # Whitespace-only input would otherwise hit the collection endpoint.
        return None

    # Check cache first
    if postcode in _postcode_cache:
        return _postcode_cache[postcode]

    # Percent-encode so characters such as '/', '?' or '#' in user input
    # cannot change the request path.
    encoded = quote(postcode, safe='')
    try:
        response = requests.get(
            f'https://api.postcodes.io/postcodes/{encoded}',
            timeout=10
        )
        if response.status_code == 200:
            result = response.json().get('result')
            if result:
                lat = result.get('latitude')
                lon = result.get('longitude')
                # Compare against None: 0.0 is a valid coordinate.
                if lat is not None and lon is not None:
                    _postcode_cache[postcode] = (lat, lon)
                    return (lat, lon)
    except Exception as e:
        # Deliberately broad: geocoding must never break the caller.
        print(f" Warning: Geocoding failed for {postcode}: {e}")
    return None
def haversine_distance(
    lat1: float,
    lon1: float,
    lat2: float,
    lon2: float,
    *,
    earth_radius: float = 3956,
) -> float:
    """
    Calculate the great circle distance between two points on Earth.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.
        earth_radius: Sphere radius; the result is in the same unit.
            Defaults to 3956, i.e. the result is in miles.

    Returns:
        Distance along the surface of the sphere.
    """
    from math import radians, cos, sin, asin, sqrt

    # Convert to radians
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2

    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make asin() raise ValueError.
    a = min(1.0, max(0.0, a))

    c = 2 * asin(sqrt(a))
    return c * earth_radius
# =============================================================================
# DATABASE QUERY FUNCTIONS
# =============================================================================
def get_db():
    """Create and return a new database session from ``SessionLocal``.

    The session is not closed here; the caller is responsible for closing it.
    """
    session = SessionLocal()
    return session
def get_available_years(db: Session = None) -> List[int]:
    """Return the distinct result years stored in the database, ascending.

    Args:
        db: Session to query with. When omitted, a session is opened via
            ``get_db()`` and closed again before returning.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        year_rows = (
            session.query(SchoolResult.year)
            .distinct()
            .order_by(SchoolResult.year)
            .all()
        )
        return [row[0] for row in year_rows]
    finally:
        # Only close sessions this function opened itself.
        if owns_session:
            session.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
    """Return the distinct local authority names, sorted by name.

    NULL values are excluded by the query and empty strings are dropped
    from the result.

    Args:
        db: Session to query with. When omitted, a session is opened via
            ``get_db()`` and closed again before returning.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        authority_rows = (
            session.query(School.local_authority)
            .filter(School.local_authority.isnot(None))
            .distinct()
            .order_by(School.local_authority)
            .all()
        )
        names = []
        for row in authority_rows:
            if row[0]:
                names.append(row[0])
        return names
    finally:
        # Only close sessions this function opened itself.
        if owns_session:
            session.close()
def get_available_school_types(db: Session = None) -> List[str]:
    """Return the distinct school types, sorted by name.

    NULL values are excluded by the query and empty strings are dropped
    from the result.

    Args:
        db: Session to query with. When omitted, a session is opened via
            ``get_db()`` and closed again before returning.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        type_rows = (
            session.query(School.school_type)
            .filter(School.school_type.isnot(None))
            .distinct()
            .order_by(School.school_type)
            .all()
        )
        school_types = []
        for row in type_rows:
            if row[0]:
                school_types.append(row[0])
        return school_types
    finally:
        # Only close sessions this function opened itself.
        if owns_session:
            session.close()
def get_schools_count(db: Session = None) -> int:
    """Return the total number of ``School`` rows.

    Args:
        db: Session to query with. When omitted, a session is opened via
            ``get_db()`` and closed again before returning.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        total = session.query(School).count()
    finally:
        # Only close sessions this function opened itself.
        if owns_session:
            session.close()
    return total
def get_schools(
    db: Session,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[School], int]:
    """
    Get paginated list of schools with optional filters.

    Args:
        db: Session to query with.
        search: Case-insensitive substring matched against school name,
            postcode and town. ``%`` and ``_`` in the text act as SQL LIKE
            wildcards because the value is embedded in a LIKE pattern.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Case-insensitive exact match on school type.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns:
        (schools, total_count), where ``total_count`` is the number of rows
        matching the filters before pagination.
    """
    query = db.query(School)

    # Apply filters
    if search:
        pattern = f"%{search.lower()}%"
        query = query.filter(
            or_(
                func.lower(School.school_name).like(pattern),
                func.lower(School.postcode).like(pattern),
                func.lower(School.town).like(pattern),
            )
        )
    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())
    if school_type:
        query = query.filter(func.lower(School.school_type) == school_type.lower())

    # Get total count
    total = query.count()

    # Clamp paging inputs so OFFSET/LIMIT are never negative, which the
    # database would reject.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size

    # School names are not unique; the URN tie-breaker keeps page boundaries
    # stable so rows are neither repeated nor skipped between pages.
    schools = (
        query.order_by(School.school_name, School.urn)
        .offset(offset)
        .limit(page_size)
        .all()
    )
    return schools, total
def get_schools_near_location(
    db: Session,
    latitude: float,
    longitude: float,
    radius_miles: float = 5.0,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
    """
    Get schools near a location, sorted by distance.

    Every school that has coordinates and matches the text filters is loaded,
    and the distance is computed in Python with ``haversine_distance``.

    Args:
        db: Session to query with.
        latitude: Latitude of the search centre, in degrees.
        longitude: Longitude of the search centre, in degrees.
        radius_miles: Maximum distance (inclusive) from the centre.
        search: Case-insensitive substring matched against school name,
            postcode and town. ``%`` and ``_`` act as SQL LIKE wildcards.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Case-insensitive exact match on school type.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns:
        (page of (school, distance) tuples ordered nearest first, total
        number of schools within the radius).
    """
    # Get all schools with coordinates
    query = db.query(School).filter(
        School.latitude.isnot(None),
        School.longitude.isnot(None)
    )

    # Apply text filters
    if search:
        pattern = f"%{search.lower()}%"
        query = query.filter(
            or_(
                func.lower(School.school_name).like(pattern),
                func.lower(School.postcode).like(pattern),
                func.lower(School.town).like(pattern),
            )
        )
    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())
    if school_type:
        query = query.filter(func.lower(School.school_type) == school_type.lower())

    # Fetch in URN order: the distance sort below is stable, so schools at
    # exactly the same distance keep a deterministic order across requests.
    candidates = query.order_by(School.urn).all()

    schools_with_distance = []
    for school in candidates:
        # Explicit None check: a coordinate of 0.0 is valid and must not be
        # discarded by a truthiness test.
        if school.latitude is None or school.longitude is None:
            continue
        dist = haversine_distance(latitude, longitude, school.latitude, school.longitude)
        if dist <= radius_miles:
            schools_with_distance.append((school, dist))

    # Sort by distance
    schools_with_distance.sort(key=lambda pair: pair[1])
    total = len(schools_with_distance)

    # Paginate; clamp so a page below 1 cannot yield a negative slice start.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size
    paginated = schools_with_distance[offset:offset + page_size]
    return paginated, total
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
    """Look up one school by its URN.

    Returns the first ``School`` whose ``urn`` equals the given value, or
    ``None`` when there is no match.
    """
    matching = db.query(School).filter(School.urn == urn)
    return matching.first()
def get_school_results(
    db: Session,
    urn: int,
    years: Optional[List[int]] = None
) -> List[SchoolResult]:
    """Return the results of the school with the given URN, ordered by year.

    Args:
        db: Session to query with.
        urn: URN of the school whose results are wanted.
        years: When given and non-empty, only results for these years are
            returned; ``None`` or an empty list applies no year filter.
    """
    results_query = (
        db.query(SchoolResult)
        .join(School)
        .filter(School.urn == urn)
        .order_by(SchoolResult.year)
    )
    if not years:
        return results_query.all()
    return results_query.filter(SchoolResult.year.in_(years)).all()
def get_rankings(
    db: Session,
    metric: str,
    year: int,
    local_authority: Optional[str] = None,
    limit: int = 20,
    ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
    """
    Get school rankings for a specific metric and year.

    Args:
        db: Session to query with.
        metric: Name of the ``SchoolResult`` column attribute to rank by.
        year: Results year to rank.
        local_authority: Case-insensitive exact match on local authority.
        limit: Maximum number of rows; negative values are treated as 0.
        ascending: Rank lowest first when True, highest first otherwise.

    Returns:
        List of (school, result) tuples ordered by the metric. Rows whose
        metric is NULL are excluded. An unknown metric name yields ``[]``.
    """
    from sqlalchemy import inspect as sa_inspect

    # Validate the metric before building any query. Only mapped column
    # attributes are accepted, so names such as "metadata", dunder
    # attributes or relationships return [] instead of failing later on
    # `.isnot()` / `.desc()`.
    if not isinstance(metric, str) or metric not in sa_inspect(SchoolResult).column_attrs:
        return []
    metric_column = getattr(SchoolResult, metric)

    # Build the query
    query = (
        db.query(School, SchoolResult)
        .join(SchoolResult)
        .filter(SchoolResult.year == year)
    )

    # Filter by local authority
    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())

    # Filter out nulls and order
    query = query.filter(metric_column.isnot(None))
    ordering = metric_column.asc() if ascending else metric_column.desc()

    # URN tie-breaker: with LIMIT applied, schools sharing the same metric
    # value would otherwise be cut off in arbitrary order.
    return query.order_by(ordering, School.urn).limit(max(limit, 0)).all()
def get_data_info(db: Session = None) -> dict:
    """Summarise what the database currently holds.

    Args:
        db: Session to query with. When omitted, a session is opened via
            ``get_db()`` and closed again before returning.

    Returns:
        Dict with the school count, result count, available years, number
        of distinct local authorities and the data source label.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        info = {}
        info["total_schools"] = session.query(School).count()
        info["total_results"] = session.query(SchoolResult).count()
        # The helpers reuse this session, so they will not close it.
        info["years_available"] = get_available_years(session)
        info["local_authorities_count"] = len(get_available_local_authorities(session))
        info["data_source"] = "PostgreSQL"
        return info
    finally:
        # Only close sessions this function opened itself.
        if owns_session:
            session.close()
def school_to_dict(school: School, include_results: bool = False) -> dict:
    """Serialise a ``School`` into a plain dictionary.

    Args:
        school: The school to convert.
        include_results: When True and the school has at least one result,
            a ``"results"`` list of result dictionaries is added. The key
            is omitted when the school has no results.
    """
    field_names = (
        "urn",
        "school_name",
        "local_authority",
        "school_type",
        "address",
        "town",
        "postcode",
        "latitude",
        "longitude",
    )
    payload = {name: getattr(school, name) for name in field_names}

    if not include_results:
        return payload

    results = school.results
    if results:
        payload["results"] = list(map(result_to_dict, results))
    return payload
def result_to_dict(result: SchoolResult) -> dict:
    """Serialise a ``SchoolResult`` into a plain dictionary.

    Every key has the same name as the attribute it is read from, and the
    keys appear in the order listed below.
    """
    field_names = (
        "year",
        "total_pupils",
        "eligible_pupils",
        # Expected Standard
        "rwm_expected_pct",
        "reading_expected_pct",
        "writing_expected_pct",
        "maths_expected_pct",
        "gps_expected_pct",
        "science_expected_pct",
        # Higher Standard
        "rwm_high_pct",
        "reading_high_pct",
        "writing_high_pct",
        "maths_high_pct",
        "gps_high_pct",
        # Progress
        "reading_progress",
        "writing_progress",
        "maths_progress",
        # Averages
        "reading_avg_score",
        "maths_avg_score",
        "gps_avg_score",
        # Context
        "disadvantaged_pct",
        "eal_pct",
        "sen_support_pct",
        "sen_ehcp_pct",
        "stability_pct",
        # Gender
        "rwm_expected_boys_pct",
        "rwm_expected_girls_pct",
        "rwm_high_boys_pct",
        "rwm_high_girls_pct",
        # Disadvantaged
        "rwm_expected_disadvantaged_pct",
        "rwm_expected_non_disadvantaged_pct",
        "disadvantaged_gap",
        # 3-Year
        "rwm_expected_3yr_pct",
        "reading_avg_3yr",
        "maths_avg_3yr",
    )
    return {name: getattr(result, name) for name in field_names}
# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
    """
    Build a DataFrame with one row per (school, result) pair.

    Each row holds the school's descriptive fields followed by the fields
    produced by ``result_to_dict``. Schools without results contribute no
    rows. Kept for code that still expects DataFrames.

    Args:
        db: Session to query with. When omitted, a session is opened via
            ``get_db()`` and closed again before returning.

    Returns:
        The populated DataFrame, or an empty DataFrame when there are no rows.
    """
    owns_session = db is None
    session = get_db() if owns_session else db
    try:
        # Load every school together with its results in one query.
        schools = session.query(School).options(joinedload(School.results)).all()

        records = [
            {
                "urn": school.urn,
                "school_name": school.school_name,
                "local_authority": school.local_authority,
                "school_type": school.school_type,
                "address": school.address,
                "town": school.town,
                "postcode": school.postcode,
                "latitude": school.latitude,
                "longitude": school.longitude,
                **result_to_dict(result),
            }
            for school in schools
            for result in school.results
        ]
        return pd.DataFrame(records) if records else pd.DataFrame()
    finally:
        # Only close sessions this function opened itself.
        if owns_session:
            session.close()
# Cached result of load_school_data() (legacy DataFrame compatibility).
# None means "not loaded yet"; clear_cache() resets it to None.
_df_cache: Optional[pd.DataFrame] = None
def load_school_data() -> pd.DataFrame:
    """
    Return all school data as a DataFrame (legacy entry point).

    The first call reads from the database via
    ``load_school_data_as_dataframe`` and stores the result in the module
    cache; later calls return that same cached object. A short summary of
    what was loaded is printed on the first call.
    """
    global _df_cache

    if _df_cache is None:
        print("Loading school data from database...")
        _df_cache = load_school_data_as_dataframe()

        if _df_cache.empty:
            print("No data found in database")
        else:
            print(f"Total records loaded: {len(_df_cache)}")
            print(f"Unique schools: {_df_cache['urn'].nunique()}")
            print(f"Years: {sorted(_df_cache['year'].unique())}")

    return _df_cache
def clear_cache():
    """Clear all caches.

    Drops the cached DataFrame so the next ``load_school_data()`` call reads
    from the database again, and empties the postcode geocoding cache.
    """
    global _df_cache
    _df_cache = None
    # Emptied in place rather than rebound, so the module keeps using the
    # same dict object.
    _postcode_cache.clear()