""" Data loading module that queries from PostgreSQL database. Provides efficient queries with caching and lazy loading. Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py). Only user search postcodes are geocoded on-demand via geocode_single_postcode(). """ import pandas as pd import numpy as np from functools import lru_cache from typing import Optional, Dict, Tuple, List import requests from sqlalchemy import select, func, and_, or_ from sqlalchemy.orm import joinedload, Session from .config import settings from .database import SessionLocal, get_db_session from .models import ( School, SchoolResult, OfstedInspection, OfstedParentView, SchoolCensus, SchoolAdmissions, SenDetail, Phonics, SchoolDeprivation, SchoolFinance, ) from .schemas import SCHOOL_TYPE_MAP # Cache for user search postcode geocoding (not for school data) _postcode_cache: Dict[str, Tuple[float, float]] = {} def normalize_school_type(school_type: Optional[str]) -> Optional[str]: """Convert cryptic school type codes to user-friendly names.""" if not school_type: return None # Check if it's a code that needs mapping code = school_type.strip().upper() if code in SCHOOL_TYPE_MAP: return SCHOOL_TYPE_MAP[code] # Return original if already a friendly name or unknown code return school_type def get_school_type_codes_for_filter(school_type: str) -> List[str]: """Get all database codes that map to a given friendly name.""" if not school_type: return [] school_type_lower = school_type.lower() # Collect all codes that map to this friendly name codes = [] for code, friendly_name in SCHOOL_TYPE_MAP.items(): if friendly_name.lower() == school_type_lower: codes.append(code.lower()) # Also include the school_type itself (case-insensitive) in case it's stored as-is codes.append(school_type_lower) return codes def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]: """Geocode a single postcode using postcodes.io API.""" if not postcode: return None postcode = postcode.strip().upper() # Check cache first if postcode in _postcode_cache: return _postcode_cache[postcode] try: response = requests.get( f'https://api.postcodes.io/postcodes/{postcode}', timeout=10 ) if response.status_code == 200: data = response.json() if data.get('result'): lat = data['result'].get('latitude') lon = data['result'].get('longitude') if lat and lon: _postcode_cache[postcode] = (lat, lon) return (lat, lon) except Exception: pass return None def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """ Calculate the great circle distance between two points on Earth (in miles). """ from math import radians, cos, sin, asin, sqrt # Convert to radians lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) # Haversine formula dlat = lat2 - lat1 dlon = lon2 - lon1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * asin(sqrt(a)) # Earth's radius in miles r = 3956 return c * r # ============================================================================= # DATABASE QUERY FUNCTIONS # ============================================================================= def get_db(): """Get a database session.""" return SessionLocal() def get_available_years(db: Session = None) -> List[int]: """Get list of available years in the database.""" close_db = db is None if db is None: db = get_db() try: result = db.query(SchoolResult.year).distinct().order_by(SchoolResult.year).all() return [r[0] for r in result] finally: if close_db: db.close() def get_available_local_authorities(db: Session = None) -> List[str]: """Get list of available local authorities.""" close_db = db is None if db is None: db = get_db() try: result = db.query(School.local_authority)\ .filter(School.local_authority.isnot(None))\ .distinct()\ .order_by(School.local_authority)\ .all() return [r[0] for r in result if r[0]] finally: if close_db: db.close() def get_available_school_types(db: Session = None) -> List[str]: """Get list of available school types (normalized to user-friendly names).""" close_db = db is None if db is None: db = get_db() try: result = db.query(School.school_type)\ .filter(School.school_type.isnot(None))\ .distinct()\ .all() # Normalize codes to friendly names and deduplicate normalized = set() for r in result: if r[0]: friendly_name = normalize_school_type(r[0]) if friendly_name: normalized.add(friendly_name) return sorted(normalized) finally: if close_db: db.close() def get_schools_count(db: Session = None) -> int: """Get total number of schools.""" close_db = db is None if db is None: db = get_db() try: return db.query(School).count() finally: if close_db: db.close() def get_schools( db: Session, search: Optional[str] = None, local_authority: Optional[str] = None, school_type: Optional[str] = None, page: int = 1, page_size: int = 50, ) -> Tuple[List[School], int]: """ Get paginated list of schools with optional filters. Returns (schools, total_count). """ query = db.query(School) # Apply filters if search: search_lower = f"%{search.lower()}%" query = query.filter( or_( func.lower(School.school_name).like(search_lower), func.lower(School.postcode).like(search_lower), func.lower(School.town).like(search_lower), ) ) if local_authority: query = query.filter(func.lower(School.local_authority) == local_authority.lower()) if school_type: # Filter by all codes that map to this friendly name type_codes = get_school_type_codes_for_filter(school_type) if type_codes: query = query.filter(func.lower(School.school_type).in_(type_codes)) # Get total count total = query.count() # Apply pagination offset = (page - 1) * page_size schools = query.order_by(School.school_name).offset(offset).limit(page_size).all() return schools, total def get_schools_near_location( db: Session, latitude: float, longitude: float, radius_miles: float = 5.0, search: Optional[str] = None, local_authority: Optional[str] = None, school_type: Optional[str] = None, page: int = 1, page_size: int = 50, ) -> Tuple[List[Tuple[School, float]], int]: """ Get schools near a location, sorted by distance. Returns list of (school, distance) tuples and total count. """ # Get all schools with coordinates query = db.query(School).filter( School.latitude.isnot(None), School.longitude.isnot(None) ) # Apply text filters if search: search_lower = f"%{search.lower()}%" query = query.filter( or_( func.lower(School.school_name).like(search_lower), func.lower(School.postcode).like(search_lower), func.lower(School.town).like(search_lower), ) ) if local_authority: query = query.filter(func.lower(School.local_authority) == local_authority.lower()) if school_type: # Filter by all codes that map to this friendly name type_codes = get_school_type_codes_for_filter(school_type) if type_codes: query = query.filter(func.lower(School.school_type).in_(type_codes)) # Get all matching schools and calculate distances all_schools = query.all() schools_with_distance = [] for school in all_schools: if school.latitude and school.longitude: dist = haversine_distance(latitude, longitude, school.latitude, school.longitude) if dist <= radius_miles: schools_with_distance.append((school, dist)) # Sort by distance schools_with_distance.sort(key=lambda x: x[1]) total = len(schools_with_distance) # Paginate offset = (page - 1) * page_size paginated = schools_with_distance[offset:offset + page_size] return paginated, total def get_school_by_urn(db: Session, urn: int) -> Optional[School]: """Get a single school by URN.""" return db.query(School).filter(School.urn == urn).first() def get_school_results( db: Session, urn: int, years: Optional[List[int]] = None ) -> List[SchoolResult]: """Get all results for a school, optionally filtered by years.""" query = db.query(SchoolResult)\ .join(School)\ .filter(School.urn == urn)\ .order_by(SchoolResult.year) if years: query = query.filter(SchoolResult.year.in_(years)) return query.all() def get_rankings( db: Session, metric: str, year: int, local_authority: Optional[str] = None, limit: int = 20, ascending: bool = False, ) -> List[Tuple[School, SchoolResult]]: """ Get school rankings for a specific metric and year. Returns list of (school, result) tuples. """ # Build the query query = db.query(School, SchoolResult)\ .join(SchoolResult)\ .filter(SchoolResult.year == year) # Filter by local authority if local_authority: query = query.filter(func.lower(School.local_authority) == local_authority.lower()) # Get the metric column metric_column = getattr(SchoolResult, metric, None) if metric_column is None: return [] # Filter out nulls and order query = query.filter(metric_column.isnot(None)) if ascending: query = query.order_by(metric_column.asc()) else: query = query.order_by(metric_column.desc()) return query.limit(limit).all() def get_data_info(db: Session = None) -> dict: """Get information about the data in the database.""" close_db = db is None if db is None: db = get_db() try: school_count = db.query(School).count() result_count = db.query(SchoolResult).count() years = get_available_years(db) local_authorities = get_available_local_authorities(db) return { "total_schools": school_count, "total_results": result_count, "years_available": years, "local_authorities_count": len(local_authorities), "data_source": "PostgreSQL", } finally: if close_db: db.close() def school_to_dict(school: School, include_results: bool = False) -> dict: """Convert a School model to dictionary.""" data = { "urn": school.urn, "school_name": school.school_name, "local_authority": school.local_authority, "school_type": normalize_school_type(school.school_type), "address": school.address, "town": school.town, "postcode": school.postcode, "latitude": school.latitude, "longitude": school.longitude, # GIAS fields "website": school.website, "headteacher_name": school.headteacher_name, "capacity": school.capacity, "trust_name": school.trust_name, "gender": school.gender, } if include_results and school.results: data["results"] = [result_to_dict(r) for r in school.results] return data def result_to_dict(result: SchoolResult) -> dict: """Convert a SchoolResult model to dictionary.""" return { "year": result.year, "total_pupils": result.total_pupils, "eligible_pupils": result.eligible_pupils, # Expected Standard "rwm_expected_pct": result.rwm_expected_pct, "reading_expected_pct": result.reading_expected_pct, "writing_expected_pct": result.writing_expected_pct, "maths_expected_pct": result.maths_expected_pct, "gps_expected_pct": result.gps_expected_pct, "science_expected_pct": result.science_expected_pct, # Higher Standard "rwm_high_pct": result.rwm_high_pct, "reading_high_pct": result.reading_high_pct, "writing_high_pct": result.writing_high_pct, "maths_high_pct": result.maths_high_pct, "gps_high_pct": result.gps_high_pct, # Progress "reading_progress": result.reading_progress, "writing_progress": result.writing_progress, "maths_progress": result.maths_progress, # Averages "reading_avg_score": result.reading_avg_score, "maths_avg_score": result.maths_avg_score, "gps_avg_score": result.gps_avg_score, # Context "disadvantaged_pct": result.disadvantaged_pct, "eal_pct": result.eal_pct, "sen_support_pct": result.sen_support_pct, "sen_ehcp_pct": result.sen_ehcp_pct, "stability_pct": result.stability_pct, # Gender "rwm_expected_boys_pct": result.rwm_expected_boys_pct, "rwm_expected_girls_pct": result.rwm_expected_girls_pct, "rwm_high_boys_pct": result.rwm_high_boys_pct, "rwm_high_girls_pct": result.rwm_high_girls_pct, # Disadvantaged "rwm_expected_disadvantaged_pct": result.rwm_expected_disadvantaged_pct, "rwm_expected_non_disadvantaged_pct": result.rwm_expected_non_disadvantaged_pct, "disadvantaged_gap": result.disadvantaged_gap, # 3-Year "rwm_expected_3yr_pct": result.rwm_expected_3yr_pct, "reading_avg_3yr": result.reading_avg_3yr, "maths_avg_3yr": result.maths_avg_3yr, } # ============================================================================= # LEGACY COMPATIBILITY - DataFrame-based functions # ============================================================================= def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame: """ Load all school data as a pandas DataFrame. For compatibility with existing code that expects DataFrames. """ close_db = db is None if db is None: db = get_db() try: # Query all schools with their results schools = db.query(School).options(joinedload(School.results)).all() # Load Ofsted data into a lookup dict (urn → grade, date) ofsted_lookup: Dict[int, dict] = {} try: ofsted_rows = db.query( OfstedInspection.urn, OfstedInspection.overall_effectiveness, OfstedInspection.inspection_date, ).all() for o in ofsted_rows: ofsted_lookup[o.urn] = { "ofsted_grade": o.overall_effectiveness, "ofsted_date": o.inspection_date.isoformat() if o.inspection_date else None, } except Exception: pass # Table may not exist yet on first run rows = [] for school in schools: ofsted = ofsted_lookup.get(school.urn, {}) for result in school.results: row = { "urn": school.urn, "school_name": school.school_name, "local_authority": school.local_authority, "school_type": normalize_school_type(school.school_type), "address": school.address, "town": school.town, "postcode": school.postcode, "latitude": school.latitude, "longitude": school.longitude, # GIAS fields "website": school.website, "headteacher_name": school.headteacher_name, "capacity": school.capacity, "trust_name": school.trust_name, "gender": school.gender, # Ofsted (for list view) "ofsted_grade": ofsted.get("ofsted_grade"), "ofsted_date": ofsted.get("ofsted_date"), **result_to_dict(result) } rows.append(row) if rows: return pd.DataFrame(rows) return pd.DataFrame() finally: if close_db: db.close() # Cache for DataFrame (legacy compatibility) _df_cache: Optional[pd.DataFrame] = None def load_school_data() -> pd.DataFrame: """ Legacy function to load school data as DataFrame. Uses caching for performance. """ global _df_cache if _df_cache is not None: return _df_cache print("Loading school data from database...") _df_cache = load_school_data_as_dataframe() if not _df_cache.empty: print(f"Total records loaded: {len(_df_cache)}") print(f"Unique schools: {_df_cache['urn'].nunique()}") print(f"Years: {sorted(_df_cache['year'].unique())}") else: print("No data found in database") return _df_cache def clear_cache(): """Clear all caches.""" global _df_cache _df_cache = None def get_supplementary_data(db: Session, urn: int) -> dict: """ Fetch all supplementary data for a single school URN. Returns a dict with keys: ofsted, parent_view, census, admissions, sen_detail, phonics, deprivation, finance. Values are dicts or None. """ result = {} def safe_query(model, pk_field, latest_year_field=None): try: if latest_year_field: row = ( db.query(model) .filter(getattr(model, pk_field) == urn) .order_by(getattr(model, latest_year_field).desc()) .first() ) else: row = db.query(model).filter(getattr(model, pk_field) == urn).first() return row except Exception: return None # Ofsted inspection o = safe_query(OfstedInspection, "urn") result["ofsted"] = { "framework": o.framework, "inspection_date": o.inspection_date.isoformat() if o.inspection_date else None, "inspection_type": o.inspection_type, # OEIF fields (old framework) "overall_effectiveness": o.overall_effectiveness, "quality_of_education": o.quality_of_education, "behaviour_attitudes": o.behaviour_attitudes, "personal_development": o.personal_development, "leadership_management": o.leadership_management, "early_years_provision": o.early_years_provision, "previous_overall": o.previous_overall, # Report Card fields (new framework, from Nov 2025) "rc_safeguarding_met": o.rc_safeguarding_met, "rc_inclusion": o.rc_inclusion, "rc_curriculum_teaching": o.rc_curriculum_teaching, "rc_achievement": o.rc_achievement, "rc_attendance_behaviour": o.rc_attendance_behaviour, "rc_personal_development": o.rc_personal_development, "rc_leadership_governance": o.rc_leadership_governance, "rc_early_years": o.rc_early_years, "rc_sixth_form": o.rc_sixth_form, } if o else None # Parent View pv = safe_query(OfstedParentView, "urn") result["parent_view"] = { "survey_date": pv.survey_date.isoformat() if pv.survey_date else None, "total_responses": pv.total_responses, "q_happy_pct": pv.q_happy_pct, "q_safe_pct": pv.q_safe_pct, "q_behaviour_pct": pv.q_behaviour_pct, "q_bullying_pct": pv.q_bullying_pct, "q_communication_pct": pv.q_communication_pct, "q_progress_pct": pv.q_progress_pct, "q_teaching_pct": pv.q_teaching_pct, "q_information_pct": pv.q_information_pct, "q_curriculum_pct": pv.q_curriculum_pct, "q_future_pct": pv.q_future_pct, "q_leadership_pct": pv.q_leadership_pct, "q_wellbeing_pct": pv.q_wellbeing_pct, "q_recommend_pct": pv.q_recommend_pct, "q_sen_pct": pv.q_sen_pct, } if pv else None # School Census (latest year) c = safe_query(SchoolCensus, "urn", "year") result["census"] = { "year": c.year, "class_size_avg": c.class_size_avg, "ethnicity_white_pct": c.ethnicity_white_pct, "ethnicity_asian_pct": c.ethnicity_asian_pct, "ethnicity_black_pct": c.ethnicity_black_pct, "ethnicity_mixed_pct": c.ethnicity_mixed_pct, "ethnicity_other_pct": c.ethnicity_other_pct, } if c else None # Admissions (latest year) a = safe_query(SchoolAdmissions, "urn", "year") result["admissions"] = { "year": a.year, "published_admission_number": a.published_admission_number, "total_applications": a.total_applications, "first_preference_offers_pct": a.first_preference_offers_pct, "oversubscribed": a.oversubscribed, } if a else None # SEN Detail (latest year) s = safe_query(SenDetail, "urn", "year") result["sen_detail"] = { "year": s.year, "primary_need_speech_pct": s.primary_need_speech_pct, "primary_need_autism_pct": s.primary_need_autism_pct, "primary_need_mld_pct": s.primary_need_mld_pct, "primary_need_spld_pct": s.primary_need_spld_pct, "primary_need_semh_pct": s.primary_need_semh_pct, "primary_need_physical_pct": s.primary_need_physical_pct, "primary_need_other_pct": s.primary_need_other_pct, } if s else None # Phonics (latest year) ph = safe_query(Phonics, "urn", "year") result["phonics"] = { "year": ph.year, "year1_phonics_pct": ph.year1_phonics_pct, "year2_phonics_pct": ph.year2_phonics_pct, } if ph else None # Deprivation d = safe_query(SchoolDeprivation, "urn") result["deprivation"] = { "lsoa_code": d.lsoa_code, "idaci_score": d.idaci_score, "idaci_decile": d.idaci_decile, } if d else None # Finance (latest year) f = safe_query(SchoolFinance, "urn", "year") result["finance"] = { "year": f.year, "per_pupil_spend": f.per_pupil_spend, "staff_cost_pct": f.staff_cost_pct, "teacher_cost_pct": f.teacher_cost_pct, "support_staff_cost_pct": f.support_staff_cost_pct, "premises_cost_pct": f.premises_cost_pct, } if f else None return result