All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 1m1s
- Add runtime normalization of cryptic school type codes to user-friendly names (e.g., AC/ACC/ACCS -> "Academy", CY/CYS -> "Community") - Update SCHOOL_TYPE_MAP in schemas.py with consolidated mappings - Add normalize_school_type() and get_school_type_codes_for_filter() helpers - Persist selected schools in localStorage across page refreshes - Move "Add to Compare" button from modal footer to header Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
514 lines
16 KiB
Python
514 lines
16 KiB
Python
"""
|
|
Data loading module that queries from PostgreSQL database.
|
|
Provides efficient queries with caching and lazy loading.
|
|
|
|
Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py).
|
|
Only user search postcodes are geocoded on-demand via geocode_single_postcode().
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from functools import lru_cache
|
|
from typing import Optional, Dict, Tuple, List
|
|
import requests
|
|
from sqlalchemy import select, func, and_, or_
|
|
from sqlalchemy.orm import joinedload, Session
|
|
|
|
from .config import settings
|
|
from .database import SessionLocal, get_db_session
|
|
from .models import School, SchoolResult
|
|
from .schemas import SCHOOL_TYPE_MAP
|
|
|
|
# Cache for user search postcode geocoding (not for school data)
|
|
# Maps a normalised postcode (stripped and upper-cased, as produced by
# geocode_single_postcode) to its (latitude, longitude) pair.
# There is no size limit or expiry on entries.
_postcode_cache: Dict[str, Tuple[float, float]] = {}
|
|
|
|
|
|
def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
    """Translate a raw school type code into its display name.

    The value is stripped and upper-cased before being looked up in
    ``SCHOOL_TYPE_MAP``. Values without a mapping (names that are already
    friendly, or unrecognised codes) are returned exactly as they were
    passed in. Empty or ``None`` input yields ``None``.
    """
    if school_type:
        lookup_key = school_type.strip().upper()
        # Unmapped values fall through unchanged instead of being dropped.
        return SCHOOL_TYPE_MAP.get(lookup_key, school_type)
    return None
|
|
|
|
|
|
def get_school_type_codes_for_filter(school_type: str) -> List[str]:
    """Return the lower-cased stored values that belong to a friendly name.

    Every key of ``SCHOOL_TYPE_MAP`` whose friendly name equals
    ``school_type`` (compared case-insensitively) is included, lower-cased.
    The lower-cased ``school_type`` itself is always appended last so that
    rows which store the friendly name directly still match. An empty
    ``school_type`` produces an empty list.
    """
    if not school_type:
        return []

    wanted = school_type.lower()
    matching = [
        raw_code.lower()
        for raw_code, label in SCHOOL_TYPE_MAP.items()
        if label.lower() == wanted
    ]
    # Cover rows where the column already holds the friendly name as-is.
    matching.append(wanted)
    return matching
|
|
|
|
|
|
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
    """Geocode a single postcode using the postcodes.io API.

    The postcode is stripped and upper-cased, then looked up in the
    module-level ``_postcode_cache``; only cache misses hit the network.
    Successful lookups are stored in the cache.

    Args:
        postcode: Postcode text as entered by the user.

    Returns:
        ``(latitude, longitude)`` on success, or ``None`` when the input is
        empty/blank, the request fails, the API does not answer with
        HTTP 200, or the response carries no coordinates. This function
        is best-effort and does not raise for lookup failures.
    """
    import logging
    from urllib.parse import quote

    if not postcode:
        return None

    postcode = postcode.strip().upper()
    if not postcode:
        # Whitespace-only input: there is nothing to look up.
        return None

    cached = _postcode_cache.get(postcode)
    if cached is not None:
        return cached

    # The postcode is user input placed in the URL path, so percent-encode
    # it fully (including "/" and "?") to keep it a single path segment.
    encoded = quote(postcode, safe="")
    url = f'https://api.postcodes.io/postcodes/{encoded}'

    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return None
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network problems and malformed JSON are expected failure modes;
        # record them instead of hiding them, but stay best-effort.
        logging.getLogger(__name__).warning(
            "Postcode lookup failed for %r: %s", postcode, exc
        )
        return None

    result = payload.get('result') if isinstance(payload, dict) else None
    if not isinstance(result, dict):
        return None

    lat = result.get('latitude')
    lon = result.get('longitude')
    # Compare with None explicitly: 0.0 is a legitimate coordinate value.
    if lat is None or lon is None:
        return None

    _postcode_cache[postcode] = (lat, lon)
    return (lat, lon)
|
|
|
|
|
|
def haversine_distance(
    lat1: float,
    lon1: float,
    lat2: float,
    lon2: float,
    radius: float = 3956,
) -> float:
    """Calculate the great circle distance between two points on a sphere.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.
        radius: Sphere radius. The default (3956) is the Earth radius in
            miles used by this module, so the default result is in miles;
            pass a radius in another unit to get the distance in that unit.

    Returns:
        The haversine distance, in the same unit as ``radius``.
    """
    from math import radians, cos, sin, asin, sqrt

    # Convert to radians
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2

    # Floating-point rounding can push ``a`` marginally outside [0, 1] for
    # (near-)antipodal points, which would make asin() raise a domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))

    return c * radius
|
|
|
|
|
|
# =============================================================================
|
|
# DATABASE QUERY FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def get_db():
    """Create and return a new session from ``SessionLocal``.

    The session is not closed here; the helpers in this module that call
    this function close the session themselves once they are done.
    """
    session = SessionLocal()
    return session
|
|
|
|
|
|
def get_available_years(db: Session = None) -> List[int]:
    """Return the distinct result years stored in the database, ascending.

    When ``db`` is omitted a session is opened via ``get_db()`` and closed
    again before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db

    try:
        year_rows = (
            session.query(SchoolResult.year)
            .distinct()
            .order_by(SchoolResult.year)
            .all()
        )
        # Each row is a one-element tuple; unwrap it.
        return [row[0] for row in year_rows]
    finally:
        if owns_session:
            session.close()
|
|
|
|
|
|
def get_available_local_authorities(db: Session = None) -> List[str]:
    """Return the distinct, non-empty local authority names, sorted by the database.

    When ``db`` is omitted a session is opened via ``get_db()`` and closed
    again before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db

    try:
        authority_rows = (
            session.query(School.local_authority)
            .filter(School.local_authority.isnot(None))
            .distinct()
            .order_by(School.local_authority)
            .all()
        )
        # NULLs are excluded in SQL; the truthiness test also drops "".
        return [row[0] for row in authority_rows if row[0]]
    finally:
        if owns_session:
            session.close()
|
|
|
|
|
|
def get_available_school_types(db: Session = None) -> List[str]:
    """Return the sorted, de-duplicated friendly school type names in use.

    Distinct non-null ``School.school_type`` values are read from the
    database and passed through ``normalize_school_type``, so several raw
    codes that share a friendly name collapse into a single entry.

    When ``db`` is omitted a session is opened via ``get_db()`` and closed
    again before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db

    try:
        type_rows = (
            session.query(School.school_type)
            .filter(School.school_type.isnot(None))
            .distinct()
            .all()
        )
        # Skip empty raw values, normalise the rest, then drop empty labels.
        labels = (normalize_school_type(row[0]) for row in type_rows if row[0])
        return sorted({label for label in labels if label})
    finally:
        if owns_session:
            session.close()
|
|
|
|
|
|
def get_schools_count(db: Session = None) -> int:
    """Return the number of ``School`` rows.

    When ``db`` is omitted a session is opened via ``get_db()`` and closed
    again before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db

    try:
        total = session.query(School).count()
        return total
    finally:
        if owns_session:
            session.close()
|
|
|
|
|
|
def get_schools(
    db: Session,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[School], int]:
    """
    Get a paginated list of schools with optional filters.

    Args:
        db: Open database session; it is not closed here.
        search: Case-insensitive substring matched against school name,
            postcode and town. ``%`` and ``_`` are matched literally.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Friendly school type name; expanded to every stored
            code via ``get_school_type_codes_for_filter``.
        page: 1-based page number. Values below 1 are treated as 1.
        page_size: Maximum rows per page. Negative values are treated as 0.

    Returns:
        ``(schools, total_count)`` where ``schools`` is the requested page
        ordered by school name and ``total_count`` is the number of rows
        matching the filters before pagination.
    """
    query = db.query(School)

    # Apply filters
    if search:
        needle = search.lower()
        # autoescape=True makes "%" and "_" typed by the user match
        # literally instead of acting as LIKE wildcards.
        query = query.filter(
            or_(
                func.lower(School.school_name).contains(needle, autoescape=True),
                func.lower(School.postcode).contains(needle, autoescape=True),
                func.lower(School.town).contains(needle, autoescape=True),
            )
        )

    if local_authority:
        query = query.filter(
            func.lower(School.local_authority) == local_authority.lower()
        )

    if school_type:
        # Filter by all codes that map to this friendly name
        type_codes = get_school_type_codes_for_filter(school_type)
        if type_codes:
            query = query.filter(func.lower(School.school_type).in_(type_codes))

    # Get total count
    total = query.count()

    # Apply pagination. Clamp first: a page below 1 or a negative page size
    # would otherwise produce a negative OFFSET/LIMIT.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size
    schools = (
        query.order_by(School.school_name)
        .offset(offset)
        .limit(page_size)
        .all()
    )

    return schools, total
|
|
|
|
|
|
def get_schools_near_location(
    db: Session,
    latitude: float,
    longitude: float,
    radius_miles: float = 5.0,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
    """
    Get schools near a location, sorted by distance.

    All schools that have coordinates and pass the text filters are loaded,
    their distance to the given point is computed in Python with
    ``haversine_distance``, and those within ``radius_miles`` are kept.

    Args:
        db: Open database session; it is not closed here.
        latitude: Latitude of the search centre, in degrees.
        longitude: Longitude of the search centre, in degrees.
        radius_miles: Maximum distance from the centre, inclusive.
        search: Case-insensitive substring matched against school name,
            postcode and town. ``%`` and ``_`` are matched literally.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Friendly school type name; expanded to every stored
            code via ``get_school_type_codes_for_filter``.
        page: 1-based page number. Values below 1 are treated as 1.
        page_size: Maximum rows per page. Negative values are treated as 0.

    Returns:
        ``(page_items, total)`` where ``page_items`` is a list of
        ``(school, distance)`` tuples ordered by ascending distance and
        ``total`` is the number of schools inside the radius.
    """
    # Get all schools with coordinates
    query = db.query(School).filter(
        School.latitude.isnot(None),
        School.longitude.isnot(None),
    )

    # Apply text filters
    if search:
        needle = search.lower()
        # autoescape=True makes "%" and "_" typed by the user match
        # literally instead of acting as LIKE wildcards.
        query = query.filter(
            or_(
                func.lower(School.school_name).contains(needle, autoescape=True),
                func.lower(School.postcode).contains(needle, autoescape=True),
                func.lower(School.town).contains(needle, autoescape=True),
            )
        )

    if local_authority:
        query = query.filter(
            func.lower(School.local_authority) == local_authority.lower()
        )

    if school_type:
        # Filter by all codes that map to this friendly name
        type_codes = get_school_type_codes_for_filter(school_type)
        if type_codes:
            query = query.filter(func.lower(School.school_type).in_(type_codes))

    # Get all matching schools and calculate distances
    schools_with_distance = []
    for school in query.all():
        # Test for None rather than truthiness: a coordinate of exactly
        # 0.0 is valid and must not cause the school to be skipped.
        if school.latitude is None or school.longitude is None:
            continue
        dist = haversine_distance(
            latitude, longitude, school.latitude, school.longitude
        )
        if dist <= radius_miles:
            schools_with_distance.append((school, dist))

    # Sort by distance
    schools_with_distance.sort(key=lambda item: item[1])

    total = len(schools_with_distance)

    # Paginate. Clamp first: with page < 1 the slice bounds go negative and
    # silently select the wrong window.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size
    paginated = schools_with_distance[offset:offset + page_size]

    return paginated, total
|
|
|
|
|
|
def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
    """Look up one school by its URN; returns ``None`` when no row matches."""
    matching = db.query(School).filter(School.urn == urn)
    return matching.first()
|
|
|
|
|
|
def get_school_results(
    db: Session,
    urn: int,
    years: Optional[List[int]] = None
) -> List[SchoolResult]:
    """Return the results of the school with the given URN, ordered by year.

    When ``years`` is a non-empty list only results from those years are
    returned; ``None`` or an empty list applies no year filter.
    """
    results_query = (
        db.query(SchoolResult)
        .join(School)
        .filter(School.urn == urn)
    )

    if years:
        results_query = results_query.filter(SchoolResult.year.in_(years))

    return results_query.order_by(SchoolResult.year).all()
|
|
|
|
|
|
def get_rankings(
    db: Session,
    metric: str,
    year: int,
    local_authority: Optional[str] = None,
    limit: int = 20,
    ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
    """
    Get school rankings for a specific metric and year.

    Args:
        db: Open database session; it is not closed here.
        metric: Name of a mapped column attribute on ``SchoolResult`` to
            rank by. Any other name (unknown, or an attribute that is not a
            mapped column) yields an empty list.
        year: Results year to rank.
        local_authority: Optional case-insensitive exact match on the
            school's local authority.
        limit: Maximum number of rows. Negative values are treated as 0.
        ascending: Sort lowest-first when true, highest-first otherwise.

    Returns:
        List of ``(school, result)`` tuples ordered by the metric. Rows
        whose metric is NULL are excluded.
    """
    from sqlalchemy.orm import ColumnProperty

    # Resolve the metric first and accept mapped columns only. A plain
    # getattr() would also return relationships, methods or class-level
    # objects such as ``metadata``, which cannot be filtered or ordered on.
    metric_column = (
        getattr(SchoolResult, metric, None) if isinstance(metric, str) else None
    )
    if not isinstance(getattr(metric_column, "property", None), ColumnProperty):
        return []

    # Build the query
    query = (
        db.query(School, SchoolResult)
        .join(SchoolResult)
        .filter(SchoolResult.year == year)
    )

    # Filter by local authority
    if local_authority:
        query = query.filter(
            func.lower(School.local_authority) == local_authority.lower()
        )

    # Filter out nulls and order
    query = query.filter(metric_column.isnot(None))
    ordering = metric_column.asc() if ascending else metric_column.desc()
    query = query.order_by(ordering)

    # A negative LIMIT is rejected by the database; clamp it.
    return query.limit(max(limit, 0)).all()
|
|
|
|
|
|
def get_data_info(db: Session = None) -> dict:
    """Summarise what the database currently holds.

    The returned dict contains the school row count, the result row count,
    the list of available years, the number of distinct local authorities
    and a fixed ``"data_source"`` label.

    When ``db`` is omitted a session is opened via ``get_db()`` and closed
    again before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db

    try:
        # One session is shared by the counts and the helper lookups.
        info = {
            "total_schools": session.query(School).count(),
            "total_results": session.query(SchoolResult).count(),
            "years_available": get_available_years(session),
            "local_authorities_count": len(
                get_available_local_authorities(session)
            ),
            "data_source": "PostgreSQL",
        }
        return info
    finally:
        if owns_session:
            session.close()
|
|
|
|
|
|
def school_to_dict(school: School, include_results: bool = False) -> dict:
    """Serialise a ``School`` into a plain dict.

    ``school_type`` is passed through ``normalize_school_type``. A
    ``"results"`` list (one ``result_to_dict`` entry per result) is added
    only when ``include_results`` is true and the school has results.
    """
    payload = dict(
        urn=school.urn,
        school_name=school.school_name,
        local_authority=school.local_authority,
        school_type=normalize_school_type(school.school_type),
        address=school.address,
        town=school.town,
        postcode=school.postcode,
        latitude=school.latitude,
        longitude=school.longitude,
    )

    if not include_results:
        return payload

    results = school.results
    if results:
        payload["results"] = [result_to_dict(item) for item in results]

    return payload
|
|
|
|
|
|
def result_to_dict(result: SchoolResult) -> dict:
    """Serialise a ``SchoolResult`` into a plain dict.

    Every key in the returned dict has the same name as the attribute it
    is read from, and keys appear in the order listed below.
    """
    field_names = (
        "year",
        "total_pupils",
        "eligible_pupils",
        # Expected Standard
        "rwm_expected_pct",
        "reading_expected_pct",
        "writing_expected_pct",
        "maths_expected_pct",
        "gps_expected_pct",
        "science_expected_pct",
        # Higher Standard
        "rwm_high_pct",
        "reading_high_pct",
        "writing_high_pct",
        "maths_high_pct",
        "gps_high_pct",
        # Progress
        "reading_progress",
        "writing_progress",
        "maths_progress",
        # Averages
        "reading_avg_score",
        "maths_avg_score",
        "gps_avg_score",
        # Context
        "disadvantaged_pct",
        "eal_pct",
        "sen_support_pct",
        "sen_ehcp_pct",
        "stability_pct",
        # Gender
        "rwm_expected_boys_pct",
        "rwm_expected_girls_pct",
        "rwm_high_boys_pct",
        "rwm_high_girls_pct",
        # Disadvantaged
        "rwm_expected_disadvantaged_pct",
        "rwm_expected_non_disadvantaged_pct",
        "disadvantaged_gap",
        # 3-Year
        "rwm_expected_3yr_pct",
        "reading_avg_3yr",
        "maths_avg_3yr",
    )
    return {name: getattr(result, name) for name in field_names}
|
|
|
|
|
|
# =============================================================================
|
|
# LEGACY COMPATIBILITY - DataFrame-based functions
|
|
# =============================================================================
|
|
|
|
def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame:
    """
    Load every school/result pair into a pandas DataFrame.

    Each row combines the school's columns (with ``school_type``
    normalised) and the columns produced by ``result_to_dict`` for one of
    its results, so a school with no results contributes no rows. An empty
    DataFrame is returned when there are no rows at all.

    Kept for compatibility with existing code that expects DataFrames.
    When ``db`` is omitted a session is opened via ``get_db()`` and closed
    again before returning; a session supplied by the caller is left open.
    """
    owns_session = db is None
    session = get_db() if owns_session else db

    try:
        # Query all schools with their results
        schools = session.query(School).options(joinedload(School.results)).all()

        records = []
        for school in schools:
            # The school columns are the same for each of its results, so
            # build them once per school and merge the result columns in.
            school_fields = school_to_dict(school)
            records.extend(
                {**school_fields, **result_to_dict(result)}
                for result in school.results
            )

        if not records:
            return pd.DataFrame()
        return pd.DataFrame(records)
    finally:
        if owns_session:
            session.close()
|
|
|
|
|
|
# Module-level memo of the combined DataFrame (legacy compatibility).
# ``None`` means "not loaded yet".
_df_cache: Optional[pd.DataFrame] = None


def load_school_data() -> pd.DataFrame:
    """
    Legacy entry point returning all school data as a DataFrame.

    The first call loads the data through ``load_school_data_as_dataframe``
    and prints a short summary; the result (even an empty one) is kept in
    ``_df_cache`` and returned as-is by later calls.
    """
    global _df_cache

    if _df_cache is None:
        print("Loading school data from database...")
        _df_cache = load_school_data_as_dataframe()

        if _df_cache.empty:
            print("No data found in database")
        else:
            print(f"Total records loaded: {len(_df_cache)}")
            print(f"Unique schools: {_df_cache['urn'].nunique()}")
            print(f"Years: {sorted(_df_cache['year'].unique())}")

    return _df_cache
|
|
|
|
|
|
def clear_cache():
    """Clear all module-level caches.

    Resets the cached DataFrame so the next ``load_school_data()`` call
    reloads from the database, and empties the postcode geocoding cache.
    """
    global _df_cache
    _df_cache = None
    # Empty the dict in place (rather than rebinding it) so every existing
    # reference to the cache observes the reset.
    _postcode_cache.clear()
|