"""
Data loading module that queries from PostgreSQL database.
Provides efficient queries with caching and lazy loading.

Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py).
Only user search postcodes are geocoded on-demand via geocode_single_postcode().
"""

import logging
import math
from contextlib import contextmanager
from functools import lru_cache
from typing import Dict, Iterator, List, Optional, Tuple
from urllib.parse import quote

import numpy as np
import pandas as pd
import requests
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import joinedload, Session
from sqlalchemy.sql.operators import ColumnOperators

from .config import settings
from .database import SessionLocal, get_db_session
from .models import School, SchoolResult
from .schemas import SCHOOL_TYPE_MAP

logger = logging.getLogger(__name__)

# Cache for user search postcode geocoding (not for school data)
_postcode_cache: Dict[str, Tuple[float, float]] = {}

# Base URL of the postcodes.io single-postcode lookup endpoint.
_POSTCODES_IO_URL = "https://api.postcodes.io/postcodes/"

# Seconds to wait for postcodes.io before giving up on a lookup.
_GEOCODE_TIMEOUT_SECONDS = 10

# Earth's radius in miles, as used by haversine_distance().
EARTH_RADIUS_MILES = 3956

# SchoolResult attributes exported by result_to_dict(), in output order.
_RESULT_FIELDS: Tuple[str, ...] = (
    "year",
    "total_pupils",
    "eligible_pupils",
    # Expected Standard
    "rwm_expected_pct",
    "reading_expected_pct",
    "writing_expected_pct",
    "maths_expected_pct",
    "gps_expected_pct",
    "science_expected_pct",
    # Higher Standard
    "rwm_high_pct",
    "reading_high_pct",
    "writing_high_pct",
    "maths_high_pct",
    "gps_high_pct",
    # Progress
    "reading_progress",
    "writing_progress",
    "maths_progress",
    # Averages
    "reading_avg_score",
    "maths_avg_score",
    "gps_avg_score",
    # Context
    "disadvantaged_pct",
    "eal_pct",
    "sen_support_pct",
    "sen_ehcp_pct",
    "stability_pct",
    # Gender
    "rwm_expected_boys_pct",
    "rwm_expected_girls_pct",
    "rwm_high_boys_pct",
    "rwm_high_girls_pct",
    # Disadvantaged
    "rwm_expected_disadvantaged_pct",
    "rwm_expected_non_disadvantaged_pct",
    "disadvantaged_gap",
    # 3-Year
    "rwm_expected_3yr_pct",
    "reading_avg_3yr",
    "maths_avg_3yr",
)


def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
    """Convert cryptic school type codes to user-friendly names.

    Args:
        school_type: Raw value as stored in the database (a code or a name).

    Returns:
        The mapped friendly name when the stripped, upper-cased value is a key
        of SCHOOL_TYPE_MAP; the original value unchanged when it is not; and
        None for empty/None input.
    """
    if not school_type:
        return None

    code = school_type.strip().upper()
    # Fall back to the original: already a friendly name, or an unknown code.
    return SCHOOL_TYPE_MAP.get(code, school_type)


def get_school_type_codes_for_filter(school_type: str) -> List[str]:
    """Get all database codes that map to a given friendly name.

    Args:
        school_type: Friendly school type name (compared case-insensitively).

    Returns:
        Lower-cased codes whose SCHOOL_TYPE_MAP entry equals the given name,
        followed by the lower-cased name itself (in case the database stores
        it as-is). Duplicates are removed. Empty list for empty input.
    """
    if not school_type:
        return []

    wanted = school_type.lower()
    codes = [
        code.lower()
        for code, friendly_name in SCHOOL_TYPE_MAP.items()
        if friendly_name.lower() == wanted
    ]
    codes.append(wanted)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(codes))


def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
    """Geocode a single postcode using postcodes.io API.

    Successful lookups are memoised in the module-level ``_postcode_cache``.
    Lookup failures are logged and reported as None rather than raised.

    Args:
        postcode: Postcode to look up; it is stripped and upper-cased first.

    Returns:
        (latitude, longitude) on success, otherwise None (empty input,
        non-200 response, missing coordinates, network or decoding error).
    """
    if not postcode:
        return None

    postcode = postcode.strip().upper()
    if not postcode:
        # Whitespace-only input: nothing to look up.
        return None

    cached = _postcode_cache.get(postcode)
    if cached is not None:
        return cached

    # Percent-encode so characters such as "/" or "?" in user input cannot
    # alter the request path.
    url = _POSTCODES_IO_URL + quote(postcode, safe="")
    try:
        response = requests.get(url, timeout=_GEOCODE_TIMEOUT_SECONDS)
        if response.status_code != 200:
            return None
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Best-effort lookup: network and JSON errors must not reach callers.
        logger.warning("Postcode geocoding failed for %r: %s", postcode, exc)
        return None

    result = payload.get("result") if isinstance(payload, dict) else None
    if not isinstance(result, dict):
        return None

    lat = result.get("latitude")
    lon = result.get("longitude")
    # Compare against None: 0.0 is a valid coordinate (e.g. the prime meridian).
    if lat is None or lon is None:
        return None

    coords = (lat, lon)
    _postcode_cache[postcode] = coords
    return coords


def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Calculate the great circle distance between two points
    on Earth (in miles).

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.

    Returns:
        Distance in miles, using EARTH_RADIUS_MILES as the sphere radius.
    """
    phi1, lam1, phi2, lam2 = map(math.radians, (lat1, lon1, lat2, lon2))

    # Haversine formula
    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2
    a = math.sin(half_dphi) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlam) ** 2

    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal
    # points), which would make sqrt/asin raise a math domain error.
    a = min(1.0, max(0.0, a))
    central_angle = 2 * math.asin(math.sqrt(a))

    return central_angle * EARTH_RADIUS_MILES


# =============================================================================
# DATABASE QUERY FUNCTIONS
# =============================================================================

def get_db():
    """Get a database session.

    Returns:
        A new session created by SessionLocal(); the caller must close it.
    """
    return SessionLocal()


@contextmanager
def _session_scope(db: Optional[Session] = None) -> Iterator[Session]:
    """Yield ``db`` if given; otherwise open a session and close it on exit."""
    if db is not None:
        # Caller-owned session: never close it here.
        yield db
        return

    session = get_db()
    try:
        yield session
    finally:
        session.close()


def _page_bounds(page: int, page_size: int) -> Tuple[int, int]:
    """Return (offset, limit) for 1-based ``page``, clamped to be non-negative."""
    limit = max(page_size, 0)
    offset = (max(page, 1) - 1) * limit
    return offset, limit


def _apply_school_filters(
    query,
    search: Optional[str],
    local_authority: Optional[str],
    school_type: Optional[str],
):
    """Apply the optional text / local authority / school type filters to a School query."""
    if search:
        needle = search.lower()
        # autoescape=True makes "%" and "_" typed by the user match literally
        # instead of acting as LIKE wildcards.
        query = query.filter(
            or_(
                func.lower(School.school_name).contains(needle, autoescape=True),
                func.lower(School.postcode).contains(needle, autoescape=True),
                func.lower(School.town).contains(needle, autoescape=True),
            )
        )

    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())

    if school_type:
        # Filter by all codes that map to this friendly name
        type_codes = get_school_type_codes_for_filter(school_type)
        if type_codes:
            query = query.filter(func.lower(School.school_type).in_(type_codes))

    return query


def get_available_years(db: Optional[Session] = None) -> List[int]:
    """Get list of available years in the database.

    Args:
        db: Session to use; when None a temporary session is opened and closed.

    Returns:
        Distinct SchoolResult.year values in ascending order.
    """
    with _session_scope(db) as session:
        rows = (
            session.query(SchoolResult.year)
            .distinct()
            .order_by(SchoolResult.year)
            .all()
        )
        return [row[0] for row in rows]


def get_available_local_authorities(db: Optional[Session] = None) -> List[str]:
    """Get list of available local authorities.

    Args:
        db: Session to use; when None a temporary session is opened and closed.

    Returns:
        Distinct non-empty School.local_authority values, ordered by name.
    """
    with _session_scope(db) as session:
        rows = (
            session.query(School.local_authority)
            .filter(School.local_authority.isnot(None))
            .distinct()
            .order_by(School.local_authority)
            .all()
        )
        # The truthiness check also drops empty strings, which the SQL
        # NOT NULL filter lets through.
        return [row[0] for row in rows if row[0]]


def get_available_school_types(db: Optional[Session] = None) -> List[str]:
    """Get list of available school types (normalized to user-friendly names).

    Args:
        db: Session to use; when None a temporary session is opened and closed.

    Returns:
        Sorted, de-duplicated friendly names of the school types in use.
    """
    with _session_scope(db) as session:
        rows = (
            session.query(School.school_type)
            .filter(School.school_type.isnot(None))
            .distinct()
            .all()
        )

    # Several codes can map to one friendly name, hence the set.
    names = {normalize_school_type(row[0]) for row in rows if row[0]}
    return sorted(name for name in names if name)


def get_schools_count(db: Optional[Session] = None) -> int:
    """Get total number of schools.

    Args:
        db: Session to use; when None a temporary session is opened and closed.
    """
    with _session_scope(db) as session:
        return session.query(School).count()


def get_schools(
    db: Session,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[School], int]:
    """
    Get paginated list of schools with optional filters.

    Args:
        db: Open database session.
        search: Case-insensitive substring matched against name, postcode or town.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Friendly school type name (or raw code) to filter by.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum rows per page; negative values are treated as 0.

    Returns (schools, total_count), where total_count ignores pagination.
    """
    query = _apply_school_filters(db.query(School), search, local_authority, school_type)

    total = query.count()

    offset, limit = _page_bounds(page, page_size)
    # School names are not unique; the URN tiebreaker keeps page boundaries
    # stable between requests.
    schools = (
        query.order_by(School.school_name, School.urn)
        .offset(offset)
        .limit(limit)
        .all()
    )

    return schools, total


def get_schools_near_location(
    db: Session,
    latitude: float,
    longitude: float,
    radius_miles: float = 5.0,
    search: Optional[str] = None,
    local_authority: Optional[str] = None,
    school_type: Optional[str] = None,
    page: int = 1,
    page_size: int = 50,
) -> Tuple[List[Tuple[School, float]], int]:
    """
    Get schools near a location, sorted by distance.

    Every school matching the filters is loaded and its distance is computed
    in Python; pagination is applied to the in-memory result.

    Args:
        db: Open database session.
        latitude: Latitude of the search centre, in degrees.
        longitude: Longitude of the search centre, in degrees.
        radius_miles: Maximum distance (inclusive) from the centre.
        search: Case-insensitive substring matched against name, postcode or town.
        local_authority: Case-insensitive exact match on local authority.
        school_type: Friendly school type name (or raw code) to filter by.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum rows per page; negative values are treated as 0.

    Returns list of (school, distance) tuples and total count.
    """
    # Only schools that have been geocoded can be ranked by distance.
    query = db.query(School).filter(
        School.latitude.isnot(None),
        School.longitude.isnot(None),
    )
    query = _apply_school_filters(query, search, local_authority, school_type)

    schools_with_distance = []
    # Ordering by URN first means the stable sort below breaks distance ties
    # deterministically.
    for school in query.order_by(School.urn).all():
        dist = haversine_distance(latitude, longitude, school.latitude, school.longitude)
        if dist <= radius_miles:
            schools_with_distance.append((school, dist))

    schools_with_distance.sort(key=lambda pair: pair[1])

    total = len(schools_with_distance)

    offset, limit = _page_bounds(page, page_size)
    return schools_with_distance[offset:offset + limit], total


def get_school_by_urn(db: Session, urn: int) -> Optional[School]:
    """Get a single school by URN.

    Returns:
        The matching School, or None when no school has that URN.
    """
    return db.query(School).filter(School.urn == urn).first()


def get_school_results(
    db: Session,
    urn: int,
    years: Optional[List[int]] = None
) -> List[SchoolResult]:
    """Get all results for a school, optionally filtered by years.

    Args:
        db: Open database session.
        urn: URN of the school whose results are wanted.
        years: When non-empty, only results for these years are returned.

    Returns:
        SchoolResult rows ordered by year ascending.
    """
    query = db.query(SchoolResult).join(School).filter(School.urn == urn)

    if years:
        query = query.filter(SchoolResult.year.in_(years))

    return query.order_by(SchoolResult.year).all()


def get_rankings(
    db: Session,
    metric: str,
    year: int,
    local_authority: Optional[str] = None,
    limit: int = 20,
    ascending: bool = False,
) -> List[Tuple[School, SchoolResult]]:
    """
    Get school rankings for a specific metric and year.

    Args:
        db: Open database session.
        metric: Name of the SchoolResult attribute to rank by.
        year: Results year to rank.
        local_authority: Optional case-insensitive local authority filter.
        limit: Maximum number of rows to return.
        ascending: Rank lowest-first when True, highest-first otherwise.

    Returns list of (school, result) tuples; empty when ``metric`` is not a
    usable SchoolResult attribute.
    """
    # `metric` may come from a request, so only accept public attributes that
    # are SQL expressions. Plain getattr() would also hand back things like
    # `metadata` or methods, which then fail on `.isnot`.
    metric_column = None if metric.startswith("_") else getattr(SchoolResult, metric, None)
    if not isinstance(metric_column, ColumnOperators):
        return []

    query = (
        db.query(School, SchoolResult)
        .join(SchoolResult)
        .filter(SchoolResult.year == year)
        # Schools without a value for the metric cannot be ranked.
        .filter(metric_column.isnot(None))
    )

    if local_authority:
        query = query.filter(func.lower(School.local_authority) == local_authority.lower())

    direction = metric_column.asc() if ascending else metric_column.desc()
    # URN tiebreaker keeps which tied rows survive the LIMIT deterministic.
    return query.order_by(direction, School.urn).limit(limit).all()


def get_data_info(db: Optional[Session] = None) -> dict:
    """Get information about the data in the database.

    Args:
        db: Session to use; when None a temporary session is opened and closed.

    Returns:
        Dict with keys total_schools, total_results, years_available,
        local_authorities_count and data_source.
    """
    with _session_scope(db) as session:
        school_count = session.query(School).count()
        result_count = session.query(SchoolResult).count()
        years = get_available_years(session)
        local_authorities = get_available_local_authorities(session)

        return {
            "total_schools": school_count,
            "total_results": result_count,
            "years_available": years,
            "local_authorities_count": len(local_authorities),
            "data_source": "PostgreSQL",
        }


def school_to_dict(school: School, include_results: bool = False) -> dict:
    """Convert a School model to dictionary.

    Args:
        school: School instance to serialise.
        include_results: When True and the school has results, add a
            "results" list built with result_to_dict().

    Returns:
        Dict of the school's fields, with school_type normalised to its
        friendly name.
    """
    data = {
        "urn": school.urn,
        "school_name": school.school_name,
        "local_authority": school.local_authority,
        "school_type": normalize_school_type(school.school_type),
        "address": school.address,
        "town": school.town,
        "postcode": school.postcode,
        "latitude": school.latitude,
        "longitude": school.longitude,
    }

    if include_results and school.results:
        data["results"] = [result_to_dict(r) for r in school.results]

    return data


def result_to_dict(result: SchoolResult) -> dict:
    """Convert a SchoolResult model to dictionary.

    Returns:
        Dict with one entry per name in _RESULT_FIELDS, in that order.
    """
    return {field: getattr(result, field) for field in _RESULT_FIELDS}


# =============================================================================
# LEGACY COMPATIBILITY - DataFrame-based functions
# =============================================================================

def load_school_data_as_dataframe(db: Optional[Session] = None) -> pd.DataFrame:
    """
    Load all school data as a pandas DataFrame.
    For compatibility with existing code that expects DataFrames.

    Args:
        db: Session to use; when None a temporary session is opened and closed.

    Returns:
        One row per (school, result) pair combining the school_to_dict() and
        result_to_dict() fields. Schools without results contribute no rows;
        the frame is empty when there are none at all.
    """
    rows: List[dict] = []
    with _session_scope(db) as session:
        # Eager-load results so the loop below does not query once per school.
        schools = session.query(School).options(joinedload(School.results)).all()

        for school in schools:
            school_fields = school_to_dict(school)
            rows.extend(
                {**school_fields, **result_to_dict(result)}
                for result in school.results
            )

    return pd.DataFrame(rows)


# Cache for DataFrame (legacy compatibility)
_df_cache: Optional[pd.DataFrame] = None


def load_school_data() -> pd.DataFrame:
    """
    Legacy function to load school data as DataFrame.
    Uses caching for performance.

    Returns:
        The cached DataFrame itself (not a copy), loading it on first use.
        Mutating the returned frame therefore alters the cache.
    """
    global _df_cache

    if _df_cache is not None:
        return _df_cache

    print("Loading school data from database...")
    _df_cache = load_school_data_as_dataframe()

    if _df_cache.empty:
        print("No data found in database")
    else:
        print(f"Total records loaded: {len(_df_cache)}")
        print(f"Unique schools: {_df_cache['urn'].nunique()}")
        print(f"Years: {sorted(_df_cache['year'].unique())}")

    return _df_cache


def clear_cache():
    """Clear all caches.

    Resets the legacy DataFrame cache and empties the postcode geocoding cache.
    """
    global _df_cache
    _df_cache = None
    # Clear in place so other references to the dict see the reset.
    _postcode_cache.clear()