""" Data loading module — reads from marts.* tables built by dbt. Provides efficient queries with caching. """ import pandas as pd import numpy as np from typing import Optional, Dict, Tuple, List import requests from sqlalchemy import text from sqlalchemy.orm import Session from .config import settings from .database import SessionLocal, engine from .models import ( DimSchool, DimLocation, KS2Performance, FactOfstedInspection, FactParentView, FactAdmissions, FactDeprivation, FactFinance, ) from .schemas import SCHOOL_TYPE_MAP _postcode_cache: Dict[str, Tuple[float, float]] = {} _typesense_client = None def _get_typesense_client(): global _typesense_client if _typesense_client is not None: return _typesense_client url = settings.typesense_url key = settings.typesense_api_key if not url or not key: return None try: import typesense host = url.split("//")[-1] host_part, _, port_str = host.partition(":") port = int(port_str) if port_str else 8108 _typesense_client = typesense.Client({ "nodes": [{"host": host_part, "port": str(port), "protocol": "http"}], "api_key": key, "connection_timeout_seconds": 2, }) return _typesense_client except Exception: return None def search_schools_typesense(query: str, limit: int = 250) -> List[int]: """Search Typesense. Returns URNs in relevance order, or [] if unavailable.""" client = _get_typesense_client() if client is None: return [] try: result = client.collections["schools"].documents.search({ "q": query, "query_by": "school_name,local_authority,postcode", "per_page": min(limit, 250), "typo_tokens_threshold": 1, }) return [int(h["document"]["urn"]) for h in result.get("hits", [])] except Exception: return [] def normalize_school_type(school_type: Optional[str]) -> Optional[str]: """Convert cryptic school type codes to user-friendly names.""" if not school_type: return None code = school_type.strip().upper() if code in SCHOOL_TYPE_MAP: return SCHOOL_TYPE_MAP[code] return school_type def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]: """Geocode a single postcode using postcodes.io API.""" if not postcode: return None postcode = postcode.strip().upper() if postcode in _postcode_cache: return _postcode_cache[postcode] try: response = requests.get( f"https://api.postcodes.io/postcodes/{postcode}", timeout=10, ) if response.status_code == 200: data = response.json() if data.get("result"): lat = data["result"].get("latitude") lon = data["result"].get("longitude") if lat and lon: _postcode_cache[postcode] = (lat, lon) return (lat, lon) except Exception: pass return None def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate great-circle distance between two points (miles).""" from math import radians, cos, sin, asin, sqrt lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2 return 2 * asin(sqrt(a)) * 3956 # ============================================================================= # MAIN DATA LOAD — joins dim_school + dim_location + fact_ks2_performance # ============================================================================= _MAIN_QUERY = text(""" -- Branch 1: Primary schools (KS2 data; KS4 columns NULL) SELECT s.urn, s.school_name, s.phase, s.school_type, s.academy_trust_name AS trust_name, s.academy_trust_uid AS trust_uid, s.religious_character AS religious_denomination, s.gender, s.age_range, s.capacity, s.headteacher_name, s.website, s.ofsted_grade, s.ofsted_date, s.ofsted_framework, l.local_authority_name AS local_authority, l.local_authority_code, l.address_line1 AS address1, l.address_line2 AS address2, l.town, l.postcode, l.latitude, l.longitude, k.year, k.source_urn, k.total_pupils, k.eligible_pupils, -- KS2 columns k.rwm_expected_pct, k.rwm_high_pct, k.reading_expected_pct, k.reading_high_pct, k.reading_avg_score, k.reading_progress, k.writing_expected_pct, k.writing_high_pct, k.writing_progress, k.maths_expected_pct, k.maths_high_pct, k.maths_avg_score, k.maths_progress, k.gps_expected_pct, k.gps_high_pct, k.gps_avg_score, k.science_expected_pct, k.reading_absence_pct, k.writing_absence_pct, k.maths_absence_pct, k.gps_absence_pct, k.science_absence_pct, k.rwm_expected_boys_pct, k.rwm_high_boys_pct, k.rwm_expected_girls_pct, k.rwm_high_girls_pct, k.rwm_expected_disadvantaged_pct, k.rwm_expected_non_disadvantaged_pct, k.disadvantaged_gap, k.disadvantaged_pct, k.eal_pct, k.sen_support_pct, k.sen_ehcp_pct, k.stability_pct, -- KS4 columns (NULL for primary) NULL::numeric AS attainment_8_score, NULL::numeric AS progress_8_score, NULL::numeric AS progress_8_lower_ci, NULL::numeric AS progress_8_upper_ci, NULL::numeric AS progress_8_english, NULL::numeric AS progress_8_maths, NULL::numeric AS progress_8_ebacc, NULL::numeric AS progress_8_open, NULL::numeric AS english_maths_strong_pass_pct, NULL::numeric AS english_maths_standard_pass_pct, NULL::numeric AS ebacc_entry_pct, NULL::numeric AS ebacc_strong_pass_pct, NULL::numeric AS ebacc_standard_pass_pct, NULL::numeric AS ebacc_avg_score, NULL::numeric AS gcse_grade_91_pct, NULL::numeric AS prior_attainment_avg FROM marts.dim_school s JOIN marts.dim_location l ON s.urn = l.urn JOIN marts.fact_ks2_performance k ON s.urn = k.urn UNION ALL -- Branch 2: Secondary schools (KS4 data; KS2 columns NULL) SELECT s.urn, s.school_name, s.phase, s.school_type, s.academy_trust_name AS trust_name, s.academy_trust_uid AS trust_uid, s.religious_character AS religious_denomination, s.gender, s.age_range, s.capacity, s.headteacher_name, s.website, s.ofsted_grade, s.ofsted_date, s.ofsted_framework, l.local_authority_name AS local_authority, l.local_authority_code, l.address_line1 AS address1, l.address_line2 AS address2, l.town, l.postcode, l.latitude, l.longitude, k4.year, k4.source_urn, k4.total_pupils, k4.eligible_pupils, -- KS2 columns (NULL for secondary) NULL::numeric AS rwm_expected_pct, NULL::numeric AS rwm_high_pct, NULL::numeric AS reading_expected_pct, NULL::numeric AS reading_high_pct, NULL::numeric AS reading_avg_score, NULL::numeric AS reading_progress, NULL::numeric AS writing_expected_pct, NULL::numeric AS writing_high_pct, NULL::numeric AS writing_progress, NULL::numeric AS maths_expected_pct, NULL::numeric AS maths_high_pct, NULL::numeric AS maths_avg_score, NULL::numeric AS maths_progress, NULL::numeric AS gps_expected_pct, NULL::numeric AS gps_high_pct, NULL::numeric AS gps_avg_score, NULL::numeric AS science_expected_pct, NULL::numeric AS reading_absence_pct, NULL::numeric AS writing_absence_pct, NULL::numeric AS maths_absence_pct, NULL::numeric AS gps_absence_pct, NULL::numeric AS science_absence_pct, NULL::numeric AS rwm_expected_boys_pct, NULL::numeric AS rwm_high_boys_pct, NULL::numeric AS rwm_expected_girls_pct, NULL::numeric AS rwm_high_girls_pct, NULL::numeric AS rwm_expected_disadvantaged_pct, NULL::numeric AS rwm_expected_non_disadvantaged_pct, NULL::numeric AS disadvantaged_gap, NULL::numeric AS disadvantaged_pct, NULL::numeric AS eal_pct, k4.sen_support_pct, k4.sen_ehcp_pct, NULL::numeric AS stability_pct, -- KS4 columns k4.attainment_8_score, k4.progress_8_score, k4.progress_8_lower_ci, k4.progress_8_upper_ci, k4.progress_8_english, k4.progress_8_maths, k4.progress_8_ebacc, k4.progress_8_open, k4.english_maths_strong_pass_pct, k4.english_maths_standard_pass_pct, k4.ebacc_entry_pct, k4.ebacc_strong_pass_pct, k4.ebacc_standard_pass_pct, k4.ebacc_avg_score, k4.gcse_grade_91_pct, k4.prior_attainment_avg FROM marts.dim_school s JOIN marts.dim_location l ON s.urn = l.urn JOIN marts.fact_ks4_performance k4 ON s.urn = k4.urn ORDER BY school_name, year """) def load_school_data_as_dataframe() -> pd.DataFrame: """Load all school + KS2 data as a pandas DataFrame.""" try: df = pd.read_sql(_MAIN_QUERY, engine) except Exception as exc: print(f"Warning: Could not load school data from marts: {exc}") return pd.DataFrame() if df.empty: return df # Build address string df["address"] = df.apply( lambda r: ", ".join( p for p in [r.get("address1"), r.get("address2"), r.get("town"), r.get("postcode")] if p and str(p) != "None" ), axis=1, ) # Normalize school type df["school_type"] = df["school_type"].apply(normalize_school_type) return df # Cache for DataFrame _df_cache: Optional[pd.DataFrame] = None def load_school_data() -> pd.DataFrame: """Load school data with caching.""" global _df_cache if _df_cache is not None: return _df_cache print("Loading school data from marts...") _df_cache = load_school_data_as_dataframe() if not _df_cache.empty: print(f"Total records loaded: {len(_df_cache)}") print(f"Unique schools: {_df_cache['urn'].nunique()}") print(f"Years: {sorted(_df_cache['year'].unique())}") else: print("No data found in marts (EES data may not have been loaded yet)") return _df_cache def clear_cache(): """Clear all caches.""" global _df_cache _df_cache = None # ============================================================================= # METADATA QUERIES # ============================================================================= def get_available_years(db: Session = None) -> List[int]: close_db = db is None if db is None: db = SessionLocal() try: result = db.query(KS2Performance.year).distinct().order_by(KS2Performance.year).all() return [r[0] for r in result] except Exception: return [] finally: if close_db: db.close() def get_available_local_authorities(db: Session = None) -> List[str]: close_db = db is None if db is None: db = SessionLocal() try: result = ( db.query(DimLocation.local_authority_name) .filter(DimLocation.local_authority_name.isnot(None)) .distinct() .order_by(DimLocation.local_authority_name) .all() ) return [r[0] for r in result if r[0]] except Exception: return [] finally: if close_db: db.close() def get_schools_count(db: Session = None) -> int: close_db = db is None if db is None: db = SessionLocal() try: return db.query(DimSchool).count() except Exception: return 0 finally: if close_db: db.close() def get_data_info(db: Session = None) -> dict: close_db = db is None if db is None: db = SessionLocal() try: school_count = get_schools_count(db) years = get_available_years(db) local_authorities = get_available_local_authorities(db) return { "total_schools": school_count, "years_available": years, "local_authorities_count": len(local_authorities), "data_source": "PostgreSQL (marts)", } finally: if close_db: db.close() # ============================================================================= # SUPPLEMENTARY DATA — per-school detail page # ============================================================================= def get_supplementary_data(db: Session, urn: int) -> dict: """Fetch all supplementary data for a single school URN.""" result = {} def safe_query(model, pk_field, latest_field=None): try: q = db.query(model).filter(getattr(model, pk_field) == urn) if latest_field: q = q.order_by(getattr(model, latest_field).desc()) return q.first() except Exception as e: import logging logging.getLogger(__name__).error("safe_query failed for %s: %s", model.__name__, e) db.rollback() return None # Latest Ofsted inspection o = safe_query(FactOfstedInspection, "urn", "inspection_date") result["ofsted"] = ( { "framework": o.framework, "inspection_date": o.inspection_date.isoformat() if o.inspection_date else None, "inspection_type": o.inspection_type, "overall_effectiveness": o.overall_effectiveness, "quality_of_education": o.quality_of_education, "behaviour_attitudes": o.behaviour_attitudes, "personal_development": o.personal_development, "leadership_management": o.leadership_management, "early_years_provision": o.early_years_provision, "sixth_form_provision": o.sixth_form_provision, "previous_overall": None, # Not available in new schema "rc_safeguarding_met": o.rc_safeguarding_met, "rc_inclusion": o.rc_inclusion, "rc_curriculum_teaching": o.rc_curriculum_teaching, "rc_achievement": o.rc_achievement, "rc_attendance_behaviour": o.rc_attendance_behaviour, "rc_personal_development": o.rc_personal_development, "rc_leadership_governance": o.rc_leadership_governance, "rc_early_years": o.rc_early_years, "rc_sixth_form": o.rc_sixth_form, "report_url": o.report_url, } if o else None ) # Parent View pv = safe_query(FactParentView, "urn") result["parent_view"] = ( { "survey_date": pv.survey_date.isoformat() if pv.survey_date else None, "total_responses": pv.total_responses, "q_happy_pct": pv.q_happy_pct, "q_safe_pct": pv.q_safe_pct, "q_behaviour_pct": pv.q_behaviour_pct, "q_bullying_pct": pv.q_bullying_pct, "q_communication_pct": pv.q_communication_pct, "q_progress_pct": pv.q_progress_pct, "q_teaching_pct": pv.q_teaching_pct, "q_information_pct": pv.q_information_pct, "q_curriculum_pct": pv.q_curriculum_pct, "q_future_pct": pv.q_future_pct, "q_leadership_pct": pv.q_leadership_pct, "q_wellbeing_pct": pv.q_wellbeing_pct, "q_recommend_pct": pv.q_recommend_pct, } if pv else None ) # Census (fact_pupil_characteristics — minimal until census columns are verified) result["census"] = None # Admissions (latest year) a = safe_query(FactAdmissions, "urn", "year") result["admissions"] = ( { "year": a.year, "school_phase": a.school_phase, "published_admission_number": a.published_admission_number, "total_applications": a.total_applications, "first_preference_applications": a.first_preference_applications, "first_preference_offers": a.first_preference_offers, "first_preference_offer_pct": a.first_preference_offer_pct, "oversubscribed": a.oversubscribed, } if a else None ) # SEN detail — not available in current marts result["sen_detail"] = None # Phonics — no school-level data on EES result["phonics"] = None # Deprivation d = safe_query(FactDeprivation, "urn") result["deprivation"] = ( { "lsoa_code": d.lsoa_code, "idaci_score": d.idaci_score, "idaci_decile": d.idaci_decile, } if d else None ) # Finance (latest year) f = safe_query(FactFinance, "urn", "year") result["finance"] = ( { "year": f.year, "per_pupil_spend": f.per_pupil_spend, "staff_cost_pct": f.staff_cost_pct, "teacher_cost_pct": f.teacher_cost_pct, "support_staff_cost_pct": f.support_staff_cost_pct, "premises_cost_pct": f.premises_cost_pct, } if f else None ) return result