Files
school_compare/backend/data_loader.py
T
Tudor Sitaru f05bbba613
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 21s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 50s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 12s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s
perf: resolve all P1–P5 performance issues from code review
P1 (backend/data_loader.py): Add load_latest_school_data() which pre-computes
the one-row-per-school latest-year snapshot (groupby, prev-year trend merge)
once at startup instead of on every /api/schools request. get_schools route
now starts from the cached snapshot rather than rebuilding it.

S3 (backend/app.py): Wrap synchronous geocode_single_postcode() call in
asyncio.to_thread() so postcode lookups no longer block the uvicorn event
loop. Admin reload endpoint also uses to_thread for both cache primes.

P2 (nextjs-app/components/HomeView.tsx): Add mapParamsRef guard so switching
back to map view does not re-fetch 500 schools when search params haven't
changed. Reset ref on new searches so fresh data is always fetched.

P3 (nextjs-app/lib/chartSetup.ts): Extract Chart.js registration into a
shared side-effect module. ComparisonChart and PerformanceChart now import
it instead of each calling ChartJS.register() independently.

P4 (backend/database.py): Remove unnecessary db.commit() from the read-only
get_db_session() context manager — saves a DB round-trip on every request.

P5 (backend/database.py): Add pool_recycle=1800 to SQLAlchemy engine to
prevent stale TCP connections from accumulating in long-running processes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 22:45:46 +01:00

520 lines
17 KiB
Python

"""
Data loading module — reads from marts.* tables built by dbt.
Provides efficient queries with caching.
"""
import pandas as pd
import numpy as np
from typing import Optional, Dict, Tuple, List
import requests
from sqlalchemy import text
from sqlalchemy.orm import Session
from .config import settings
from .database import SessionLocal, engine
from .models import (
DimSchool, DimLocation, KS2Performance,
FactOfstedInspection, FactParentView, FactAdmissions,
FactDeprivation, FactFinance,
)
from .schemas import SCHOOL_TYPE_MAP
_postcode_cache: Dict[str, Tuple[float, float]] = {}
_typesense_client = None
def _get_typesense_client():
global _typesense_client
if _typesense_client is not None:
return _typesense_client
url = settings.typesense_url
key = settings.typesense_api_key
if not url or not key:
return None
try:
import typesense
host = url.split("//")[-1]
host_part, _, port_str = host.partition(":")
port = int(port_str) if port_str else 8108
_typesense_client = typesense.Client({
"nodes": [{"host": host_part, "port": str(port), "protocol": "http"}],
"api_key": key,
"connection_timeout_seconds": 2,
})
return _typesense_client
except Exception:
return None
def search_schools_typesense(query: str, limit: int = 250) -> List[int]:
"""Search Typesense. Returns URNs in relevance order, or [] if unavailable."""
client = _get_typesense_client()
if client is None:
return []
try:
result = client.collections["schools"].documents.search({
"q": query,
"query_by": "school_name,local_authority,postcode",
"per_page": min(limit, 250),
"typo_tokens_threshold": 1,
})
return [int(h["document"]["urn"]) for h in result.get("hits", [])]
except Exception:
return []
def normalize_school_type(school_type: Optional[str]) -> Optional[str]:
"""Convert cryptic school type codes to user-friendly names."""
if not school_type:
return None
code = school_type.strip().upper()
if code in SCHOOL_TYPE_MAP:
return SCHOOL_TYPE_MAP[code]
return school_type
def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
"""Geocode a single postcode using postcodes.io API."""
if not postcode:
return None
postcode = postcode.strip().upper()
if postcode in _postcode_cache:
return _postcode_cache[postcode]
try:
response = requests.get(
f"https://api.postcodes.io/postcodes/{postcode}",
timeout=10,
)
if response.status_code == 200:
data = response.json()
if data.get("result"):
lat = data["result"].get("latitude")
lon = data["result"].get("longitude")
if lat and lon:
_postcode_cache[postcode] = (lat, lon)
return (lat, lon)
except Exception:
pass
return None
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Calculate great-circle distance between two points (miles)."""
from math import radians, cos, sin, asin, sqrt
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
return 2 * asin(sqrt(a)) * 3956
# =============================================================================
# MAIN DATA LOAD — joins dim_school + dim_location + fact_performance
# fact_performance is a merged KS2+KS4 table (one row per URN per year).
# All-through schools have both KS2 and KS4 columns populated in the same row.
# =============================================================================
_MAIN_QUERY = text("""
SELECT
s.urn,
s.school_name,
s.phase,
s.school_type,
s.academy_trust_name AS trust_name,
s.academy_trust_uid AS trust_uid,
s.religious_character AS religious_denomination,
s.gender,
s.age_range,
s.admissions_policy,
s.capacity,
s.total_pupils AS gias_total_pupils,
s.headteacher_name,
s.website,
foi.ofsted_grade,
foi.ofsted_date,
foi.ofsted_framework,
l.local_authority_name AS local_authority,
l.local_authority_code,
l.address_line1 AS address1,
l.address_line2 AS address2,
l.town,
l.postcode,
l.latitude,
l.longitude,
p.year,
p.source_urn,
p.total_pupils,
p.eligible_pupils,
-- KS2 columns (NULL for pure secondary schools)
p.rwm_expected_pct,
p.rwm_high_pct,
p.reading_expected_pct,
p.reading_high_pct,
p.reading_avg_score,
p.reading_progress,
p.writing_expected_pct,
p.writing_high_pct,
p.writing_progress,
p.maths_expected_pct,
p.maths_high_pct,
p.maths_avg_score,
p.maths_progress,
p.gps_expected_pct,
p.gps_high_pct,
p.gps_avg_score,
p.science_expected_pct,
p.reading_absence_pct,
p.writing_absence_pct,
p.maths_absence_pct,
p.gps_absence_pct,
p.science_absence_pct,
p.rwm_expected_boys_pct,
p.rwm_high_boys_pct,
p.rwm_expected_girls_pct,
p.rwm_high_girls_pct,
p.rwm_expected_disadvantaged_pct,
p.rwm_expected_non_disadvantaged_pct,
p.disadvantaged_gap,
p.disadvantaged_pct,
p.eal_pct,
p.stability_pct,
-- KS4 columns (NULL for pure primary schools)
p.attainment_8_score,
p.progress_8_score,
p.progress_8_lower_ci,
p.progress_8_upper_ci,
p.progress_8_english,
p.progress_8_maths,
p.progress_8_ebacc,
p.progress_8_open,
p.english_maths_strong_pass_pct,
p.english_maths_standard_pass_pct,
p.ebacc_entry_pct,
p.ebacc_strong_pass_pct,
p.ebacc_standard_pass_pct,
p.ebacc_avg_score,
p.gcse_grade_91_pct,
p.prior_attainment_avg,
-- SEN (coalesced KS2+KS4 in fact_performance)
p.sen_support_pct,
p.sen_ehcp_pct
FROM marts.dim_school s
JOIN marts.dim_location l ON s.urn = l.urn
LEFT JOIN marts.fact_performance p ON s.urn = p.urn
LEFT JOIN (
SELECT DISTINCT ON (urn)
urn,
overall_effectiveness AS ofsted_grade,
inspection_date AS ofsted_date,
framework AS ofsted_framework
FROM marts.fact_ofsted_inspection
ORDER BY urn, inspection_date DESC NULLS LAST
) foi ON s.urn = foi.urn
ORDER BY s.school_name, p.year
""")
def load_school_data_as_dataframe() -> pd.DataFrame:
"""Load all school + KS2 data as a pandas DataFrame."""
try:
df = pd.read_sql(_MAIN_QUERY, engine)
except Exception as exc:
print(f"Warning: Could not load school data from marts: {exc}")
return pd.DataFrame()
if df.empty:
return df
# Build address string
df["address"] = df.apply(
lambda r: ", ".join(
p for p in [r.get("address1"), r.get("address2"), r.get("town"), r.get("postcode")]
if p and str(p) != "None"
),
axis=1,
)
# Normalize school type
df["school_type"] = df["school_type"].apply(normalize_school_type)
return df
# Cache for DataFrame
_df_cache: Optional[pd.DataFrame] = None
# Pre-computed latest-year snapshot (one row per school, with prev-year trend columns)
_df_latest_cache: Optional[pd.DataFrame] = None
def load_school_data() -> pd.DataFrame:
"""Load school data with caching."""
global _df_cache
if _df_cache is not None:
return _df_cache
print("Loading school data from marts...")
_df_cache = load_school_data_as_dataframe()
if not _df_cache.empty:
print(f"Total records loaded: {len(_df_cache)}")
print(f"Unique schools: {_df_cache['urn'].nunique()}")
print(f"Years: {sorted(_df_cache['year'].dropna().unique())}")
else:
print("No data found in marts (EES data may not have been loaded yet)")
return _df_cache
def load_latest_school_data() -> pd.DataFrame:
"""Return a cached one-row-per-school DataFrame at the latest available year.
The expensive groupby / merge / prev-year trend computation runs once at
startup (or after a cache clear) rather than on every search request.
Per-request filters (phase, gender, LA …) should be applied to the returned
DataFrame's copy; they must NOT modify the cached object.
"""
global _df_latest_cache
if _df_latest_cache is not None:
return _df_latest_cache
df = load_school_data()
if df.empty:
return df
# Schools that have no performance rows (PRUs, new schools, etc.)
df_no_perf = df[df["year"].isna()].drop_duplicates(subset=["urn"])
df_with_perf = df[df["year"].notna()]
# Reduce to the latest year per school
latest_year = df_with_perf.groupby("urn")["year"].max().reset_index()
df_latest = df_with_perf.merge(latest_year, on=["urn", "year"])
# Attach previous-year metrics for trend arrows (second-latest year per school)
df_sorted = df_with_perf.sort_values(["urn", "year"], ascending=[True, False])
df_prev = df_sorted.groupby("urn").nth(1).reset_index()
if not df_prev.empty and "rwm_expected_pct" in df_prev.columns:
prev_rwm = df_prev[["urn", "rwm_expected_pct"]].rename(
columns={"rwm_expected_pct": "prev_rwm_expected_pct"}
)
if "attainment_8_score" in df_prev.columns:
prev_rwm = prev_rwm.merge(
df_prev[["urn", "attainment_8_score"]].rename(
columns={"attainment_8_score": "prev_attainment_8_score"}
),
on="urn",
how="outer",
)
df_latest = df_latest.merge(prev_rwm, on="urn", how="left")
# Merge back schools with no performance data
df_latest = pd.concat([df_latest, df_no_perf], ignore_index=True)
print(f"Latest-snapshot cache built: {len(df_latest)} schools")
_df_latest_cache = df_latest
return _df_latest_cache
def clear_cache():
"""Clear all caches."""
global _df_cache, _df_latest_cache
_df_cache = None
_df_latest_cache = None
# =============================================================================
# METADATA QUERIES
# =============================================================================
def get_available_years(db: Session = None) -> List[int]:
close_db = db is None
if db is None:
db = SessionLocal()
try:
result = db.query(KS2Performance.year).distinct().order_by(KS2Performance.year).all()
return [r[0] for r in result]
except Exception:
return []
finally:
if close_db:
db.close()
def get_available_local_authorities(db: Session = None) -> List[str]:
close_db = db is None
if db is None:
db = SessionLocal()
try:
result = (
db.query(DimLocation.local_authority_name)
.filter(DimLocation.local_authority_name.isnot(None))
.distinct()
.order_by(DimLocation.local_authority_name)
.all()
)
return [r[0] for r in result if r[0]]
except Exception:
return []
finally:
if close_db:
db.close()
def get_schools_count(db: Session = None) -> int:
close_db = db is None
if db is None:
db = SessionLocal()
try:
return db.query(DimSchool).count()
except Exception:
return 0
finally:
if close_db:
db.close()
def get_data_info(db: Session = None) -> dict:
close_db = db is None
if db is None:
db = SessionLocal()
try:
school_count = get_schools_count(db)
years = get_available_years(db)
local_authorities = get_available_local_authorities(db)
return {
"total_schools": school_count,
"years_available": years,
"local_authorities_count": len(local_authorities),
"data_source": "PostgreSQL (marts)",
}
finally:
if close_db:
db.close()
# =============================================================================
# SUPPLEMENTARY DATA — per-school detail page
# =============================================================================
def get_supplementary_data(db: Session, urn: int) -> dict:
"""Fetch all supplementary data for a single school URN."""
result = {}
def safe_query(model, pk_field, latest_field=None):
try:
q = db.query(model).filter(getattr(model, pk_field) == urn)
if latest_field:
q = q.order_by(getattr(model, latest_field).desc())
return q.first()
except Exception as e:
import logging
logging.getLogger(__name__).error("safe_query failed for %s: %s", model.__name__, e)
db.rollback()
return None
# Latest Ofsted inspection
o = safe_query(FactOfstedInspection, "urn", "inspection_date")
result["ofsted"] = (
{
"framework": o.framework,
"inspection_date": o.inspection_date.isoformat() if o.inspection_date else None,
"inspection_type": o.inspection_type,
"overall_effectiveness": o.overall_effectiveness,
"quality_of_education": o.quality_of_education,
"behaviour_attitudes": o.behaviour_attitudes,
"personal_development": o.personal_development,
"leadership_management": o.leadership_management,
"early_years_provision": o.early_years_provision,
"sixth_form_provision": o.sixth_form_provision,
"previous_overall": None, # Not available in new schema
"rc_safeguarding_met": o.rc_safeguarding_met,
"rc_inclusion": o.rc_inclusion,
"rc_curriculum_teaching": o.rc_curriculum_teaching,
"rc_achievement": o.rc_achievement,
"rc_attendance_behaviour": o.rc_attendance_behaviour,
"rc_personal_development": o.rc_personal_development,
"rc_leadership_governance": o.rc_leadership_governance,
"rc_early_years": o.rc_early_years,
"rc_sixth_form": o.rc_sixth_form,
"report_url": o.report_url,
}
if o
else None
)
# Parent View
pv = safe_query(FactParentView, "urn")
result["parent_view"] = (
{
"survey_date": pv.survey_date.isoformat() if pv.survey_date else None,
"total_responses": pv.total_responses,
"q_happy_pct": pv.q_happy_pct,
"q_safe_pct": pv.q_safe_pct,
"q_behaviour_pct": pv.q_behaviour_pct,
"q_bullying_pct": pv.q_bullying_pct,
"q_communication_pct": pv.q_communication_pct,
"q_progress_pct": pv.q_progress_pct,
"q_teaching_pct": pv.q_teaching_pct,
"q_information_pct": pv.q_information_pct,
"q_curriculum_pct": pv.q_curriculum_pct,
"q_future_pct": pv.q_future_pct,
"q_leadership_pct": pv.q_leadership_pct,
"q_wellbeing_pct": pv.q_wellbeing_pct,
"q_recommend_pct": pv.q_recommend_pct,
}
if pv
else None
)
# Census (fact_pupil_characteristics — minimal until census columns are verified)
result["census"] = None
# Admissions (latest year)
a = safe_query(FactAdmissions, "urn", "year")
result["admissions"] = (
{
"year": a.year,
"school_phase": a.school_phase,
"places_offered": a.places_offered,
"total_applications": a.total_applications,
"first_preference_applications": a.first_preference_applications,
"first_preference_offers": a.first_preference_offers,
"first_preference_offer_pct": a.first_preference_offer_pct,
"oversubscription_ratio": a.oversubscription_ratio,
"oversubscribed": a.oversubscribed,
}
if a
else None
)
# SEN detail — not available in current marts
result["sen_detail"] = None
# Phonics — no school-level data on EES
result["phonics"] = None
# Deprivation
d = safe_query(FactDeprivation, "urn")
result["deprivation"] = (
{
"lsoa_code": d.lsoa_code,
"idaci_score": d.idaci_score,
"idaci_decile": d.idaci_decile,
}
if d
else None
)
# Finance (latest year)
f = safe_query(FactFinance, "urn", "year")
result["finance"] = (
{
"year": f.year,
"per_pupil_spend": f.per_pupil_spend,
"staff_cost_pct": f.staff_cost_pct,
"teacher_cost_pct": f.teacher_cost_pct,
"support_staff_cost_pct": f.support_staff_cost_pct,
"premises_cost_pct": f.premises_cost_pct,
}
if f
else None
)
return result