diff --git a/backend/app.py b/backend/app.py index c2e3de4..541de79 100644 --- a/backend/app.py +++ b/backend/app.py @@ -28,8 +28,6 @@ from .data_loader import ( get_supplementary_data, ) from .data_loader import get_data_info as get_db_info -from .database import check_and_migrate_if_needed -from .migration import run_full_migration from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS from .utils import clean_for_json @@ -138,20 +136,15 @@ def validate_postcode(postcode: Optional[str]) -> Optional[str]: @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan - startup and shutdown events.""" - # Startup: check schema version and migrate if needed - print("Starting up: Checking database schema...") - check_and_migrate_if_needed() - - print("Loading school data from database...") + print("Loading school data from marts...") df = load_school_data() if df.empty: - print("Warning: No data in database. Check CSV files in data/ folder.") + print("Warning: No data in marts. Run the annual EES pipeline to populate KS2 data.") else: print(f"Data loaded successfully: {len(df)} records.") - yield # Application runs here + yield - # Shutdown: cleanup if needed print("Shutting down...") @@ -585,7 +578,7 @@ async def get_data_info(request: Request): if db_info["total_schools"] == 0: return { "status": "no_data", - "message": "No data in database. Run the migration script: python scripts/migrate_csv_to_db.py", + "message": "No data in marts. Run the annual EES pipeline to load KS2 data.", "data_source": "PostgreSQL", } @@ -635,56 +628,6 @@ async def reload_data( return {"status": "reloaded"} -_reimport_status: dict = {"running": False, "done": False, "error": None} - - -@app.post("/api/admin/reimport-ks2") -@limiter.limit("2/minute") -async def reimport_ks2( - request: Request, - geocode: bool = True, - _: bool = Depends(verify_admin_api_key) -): - """ - Start a full KS2 CSV migration in the background and return immediately. - Poll GET /api/admin/reimport-ks2/status to check progress. - Pass ?geocode=false to skip postcode → lat/lng resolution. - Requires X-API-Key header with valid admin API key. - """ - global _reimport_status - if _reimport_status["running"]: - return {"status": "already_running"} - - _reimport_status = {"running": True, "done": False, "error": None} - - def _run(): - global _reimport_status - try: - success = run_full_migration(geocode=geocode) - if not success: - _reimport_status = {"running": False, "done": False, "error": "No CSV data found"} - return - clear_cache() - load_school_data() - _reimport_status = {"running": False, "done": True, "error": None} - except Exception as exc: - _reimport_status = {"running": False, "done": False, "error": str(exc)} - - import threading - threading.Thread(target=_run, daemon=True).start() - return {"status": "started"} - - -@app.get("/api/admin/reimport-ks2/status") -async def reimport_ks2_status( - request: Request, - _: bool = Depends(verify_admin_api_key) -): - """Poll this endpoint to check reimport progress.""" - s = _reimport_status - if s["error"]: - raise HTTPException(status_code=500, detail=s["error"]) - return {"running": s["running"], "done": s["done"]} # ============================================================================= diff --git a/backend/data_loader.py b/backend/data_loader.py index 2a372ed..35b8617 100644 --- a/backend/data_loader.py +++ b/backend/data_loader.py @@ -1,29 +1,24 @@ """ -Data loading module that queries from PostgreSQL database. -Provides efficient queries with caching and lazy loading. - -Note: School geocoding is handled by a separate cron job (scripts/geocode_schools.py). -Only user search postcodes are geocoded on-demand via geocode_single_postcode(). +Data loading module — reads from marts.* tables built by dbt. +Provides efficient queries with caching. """ import pandas as pd import numpy as np -from functools import lru_cache from typing import Optional, Dict, Tuple, List import requests -from sqlalchemy import select, func, and_, or_ -from sqlalchemy.orm import joinedload, Session +from sqlalchemy import text +from sqlalchemy.orm import Session from .config import settings -from .database import SessionLocal, get_db_session +from .database import SessionLocal, engine from .models import ( - School, SchoolResult, - OfstedInspection, OfstedParentView, SchoolCensus, - SchoolAdmissions, SenDetail, Phonics, SchoolDeprivation, SchoolFinance, + DimSchool, DimLocation, KS2Performance, + FactOfstedInspection, FactParentView, FactAdmissions, + FactDeprivation, FactFinance, ) from .schemas import SCHOOL_TYPE_MAP -# Cache for user search postcode geocoding (not for school data) _postcode_cache: Dict[str, Tuple[float, float]] = {} @@ -31,515 +26,165 @@ def normalize_school_type(school_type: Optional[str]) -> Optional[str]: """Convert cryptic school type codes to user-friendly names.""" if not school_type: return None - # Check if it's a code that needs mapping code = school_type.strip().upper() if code in SCHOOL_TYPE_MAP: return SCHOOL_TYPE_MAP[code] - # Return original if already a friendly name or unknown code return school_type -def get_school_type_codes_for_filter(school_type: str) -> List[str]: - """Get all database codes that map to a given friendly name.""" - if not school_type: - return [] - school_type_lower = school_type.lower() - # Collect all codes that map to this friendly name - codes = [] - for code, friendly_name in SCHOOL_TYPE_MAP.items(): - if friendly_name.lower() == school_type_lower: - codes.append(code.lower()) - # Also include the school_type itself (case-insensitive) in case it's stored as-is - codes.append(school_type_lower) - return codes - - def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]: """Geocode a single postcode using postcodes.io API.""" if not postcode: return None - postcode = postcode.strip().upper() - - # Check cache first if postcode in _postcode_cache: return _postcode_cache[postcode] - try: response = requests.get( - f'https://api.postcodes.io/postcodes/{postcode}', - timeout=10 + f"https://api.postcodes.io/postcodes/{postcode}", + timeout=10, ) if response.status_code == 200: data = response.json() - if data.get('result'): - lat = data['result'].get('latitude') - lon = data['result'].get('longitude') + if data.get("result"): + lat = data["result"].get("latitude") + lon = data["result"].get("longitude") if lat and lon: _postcode_cache[postcode] = (lat, lon) return (lat, lon) except Exception: pass - return None def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: - """ - Calculate the great circle distance between two points on Earth (in miles). - """ + """Calculate great-circle distance between two points (miles).""" from math import radians, cos, sin, asin, sqrt - - # Convert to radians lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2]) - - # Haversine formula dlat = lat2 - lat1 dlon = lon2 - lon1 - a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 - c = 2 * asin(sqrt(a)) - - # Earth's radius in miles - r = 3956 - - return c * r + a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2 + return 2 * asin(sqrt(a)) * 3956 # ============================================================================= -# DATABASE QUERY FUNCTIONS +# MAIN DATA LOAD — joins dim_school + dim_location + fact_ks2_performance # ============================================================================= -def get_db(): - """Get a database session.""" - return SessionLocal() +_MAIN_QUERY = text(""" + SELECT + s.urn, + s.school_name, + s.phase, + s.school_type, + s.academy_trust_name AS trust_name, + s.academy_trust_uid AS trust_uid, + s.religious_character AS religious_denomination, + s.gender, + s.age_range, + s.capacity, + s.headteacher_name, + s.website, + s.ofsted_grade, + s.ofsted_date, + s.ofsted_framework, + l.local_authority_name AS local_authority, + l.local_authority_code, + l.address_line1 AS address1, + l.address_line2 AS address2, + l.town, + l.postcode, + l.latitude, + l.longitude, + -- KS2 performance + k.year, + k.source_urn, + k.total_pupils, + k.eligible_pupils, + k.rwm_expected_pct, + k.rwm_high_pct, + k.reading_expected_pct, + k.reading_high_pct, + k.reading_avg_score, + k.reading_progress, + k.writing_expected_pct, + k.writing_high_pct, + k.writing_progress, + k.maths_expected_pct, + k.maths_high_pct, + k.maths_avg_score, + k.maths_progress, + k.gps_expected_pct, + k.gps_high_pct, + k.gps_avg_score, + k.science_expected_pct, + k.reading_absence_pct, + k.writing_absence_pct, + k.maths_absence_pct, + k.gps_absence_pct, + k.science_absence_pct, + k.rwm_expected_boys_pct, + k.rwm_high_boys_pct, + k.rwm_expected_girls_pct, + k.rwm_high_girls_pct, + k.rwm_expected_disadvantaged_pct, + k.rwm_expected_non_disadvantaged_pct, + k.disadvantaged_gap, + k.disadvantaged_pct, + k.eal_pct, + k.sen_support_pct, + k.sen_ehcp_pct, + k.stability_pct + FROM marts.dim_school s + JOIN marts.dim_location l ON s.urn = l.urn + JOIN marts.fact_ks2_performance k ON s.urn = k.urn + ORDER BY s.school_name, k.year +""") -def get_available_years(db: Session = None) -> List[int]: - """Get list of available years in the database.""" - close_db = db is None - if db is None: - db = get_db() - +def load_school_data_as_dataframe() -> pd.DataFrame: + """Load all school + KS2 data as a pandas DataFrame.""" try: - result = db.query(SchoolResult.year).distinct().order_by(SchoolResult.year).all() - return [r[0] for r in result] - finally: - if close_db: - db.close() - - -def get_available_local_authorities(db: Session = None) -> List[str]: - """Get list of available local authorities.""" - close_db = db is None - if db is None: - db = get_db() - - try: - result = db.query(School.local_authority)\ - .filter(School.local_authority.isnot(None))\ - .distinct()\ - .order_by(School.local_authority)\ - .all() - return [r[0] for r in result if r[0]] - finally: - if close_db: - db.close() - - -def get_available_school_types(db: Session = None) -> List[str]: - """Get list of available school types (normalized to user-friendly names).""" - close_db = db is None - if db is None: - db = get_db() - - try: - result = db.query(School.school_type)\ - .filter(School.school_type.isnot(None))\ - .distinct()\ - .all() - # Normalize codes to friendly names and deduplicate - normalized = set() - for r in result: - if r[0]: - friendly_name = normalize_school_type(r[0]) - if friendly_name: - normalized.add(friendly_name) - return sorted(normalized) - finally: - if close_db: - db.close() - - -def get_schools_count(db: Session = None) -> int: - """Get total number of schools.""" - close_db = db is None - if db is None: - db = get_db() - - try: - return db.query(School).count() - finally: - if close_db: - db.close() - - -def get_schools( - db: Session, - search: Optional[str] = None, - local_authority: Optional[str] = None, - school_type: Optional[str] = None, - page: int = 1, - page_size: int = 50, -) -> Tuple[List[School], int]: - """ - Get paginated list of schools with optional filters. - Returns (schools, total_count). - """ - query = db.query(School) - - # Apply filters - if search: - search_lower = f"%{search.lower()}%" - query = query.filter( - or_( - func.lower(School.school_name).like(search_lower), - func.lower(School.postcode).like(search_lower), - func.lower(School.town).like(search_lower), - ) - ) - - if local_authority: - query = query.filter(func.lower(School.local_authority) == local_authority.lower()) - - if school_type: - # Filter by all codes that map to this friendly name - type_codes = get_school_type_codes_for_filter(school_type) - if type_codes: - query = query.filter(func.lower(School.school_type).in_(type_codes)) - - # Get total count - total = query.count() - - # Apply pagination - offset = (page - 1) * page_size - schools = query.order_by(School.school_name).offset(offset).limit(page_size).all() - - return schools, total - - -def get_schools_near_location( - db: Session, - latitude: float, - longitude: float, - radius_miles: float = 5.0, - search: Optional[str] = None, - local_authority: Optional[str] = None, - school_type: Optional[str] = None, - page: int = 1, - page_size: int = 50, -) -> Tuple[List[Tuple[School, float]], int]: - """ - Get schools near a location, sorted by distance. - Returns list of (school, distance) tuples and total count. - """ - # Get all schools with coordinates - query = db.query(School).filter( - School.latitude.isnot(None), - School.longitude.isnot(None) - ) - - # Apply text filters - if search: - search_lower = f"%{search.lower()}%" - query = query.filter( - or_( - func.lower(School.school_name).like(search_lower), - func.lower(School.postcode).like(search_lower), - func.lower(School.town).like(search_lower), - ) - ) - - if local_authority: - query = query.filter(func.lower(School.local_authority) == local_authority.lower()) - - if school_type: - # Filter by all codes that map to this friendly name - type_codes = get_school_type_codes_for_filter(school_type) - if type_codes: - query = query.filter(func.lower(School.school_type).in_(type_codes)) - - # Get all matching schools and calculate distances - all_schools = query.all() - - schools_with_distance = [] - for school in all_schools: - if school.latitude and school.longitude: - dist = haversine_distance(latitude, longitude, school.latitude, school.longitude) - if dist <= radius_miles: - schools_with_distance.append((school, dist)) - - # Sort by distance - schools_with_distance.sort(key=lambda x: x[1]) - - total = len(schools_with_distance) - - # Paginate - offset = (page - 1) * page_size - paginated = schools_with_distance[offset:offset + page_size] - - return paginated, total - - -def get_school_by_urn(db: Session, urn: int) -> Optional[School]: - """Get a single school by URN.""" - return db.query(School).filter(School.urn == urn).first() - - -def get_school_results( - db: Session, - urn: int, - years: Optional[List[int]] = None -) -> List[SchoolResult]: - """Get all results for a school, optionally filtered by years.""" - query = db.query(SchoolResult)\ - .join(School)\ - .filter(School.urn == urn)\ - .order_by(SchoolResult.year) - - if years: - query = query.filter(SchoolResult.year.in_(years)) - - return query.all() - - -def get_rankings( - db: Session, - metric: str, - year: int, - local_authority: Optional[str] = None, - limit: int = 20, - ascending: bool = False, -) -> List[Tuple[School, SchoolResult]]: - """ - Get school rankings for a specific metric and year. - Returns list of (school, result) tuples. - """ - # Build the query - query = db.query(School, SchoolResult)\ - .join(SchoolResult)\ - .filter(SchoolResult.year == year) - - # Filter by local authority - if local_authority: - query = query.filter(func.lower(School.local_authority) == local_authority.lower()) - - # Get the metric column - metric_column = getattr(SchoolResult, metric, None) - if metric_column is None: - return [] - - # Filter out nulls and order - query = query.filter(metric_column.isnot(None)) - - if ascending: - query = query.order_by(metric_column.asc()) - else: - query = query.order_by(metric_column.desc()) - - return query.limit(limit).all() - - -def get_data_info(db: Session = None) -> dict: - """Get information about the data in the database.""" - close_db = db is None - if db is None: - db = get_db() - - try: - school_count = db.query(School).count() - result_count = db.query(SchoolResult).count() - years = get_available_years(db) - local_authorities = get_available_local_authorities(db) - - return { - "total_schools": school_count, - "total_results": result_count, - "years_available": years, - "local_authorities_count": len(local_authorities), - "data_source": "PostgreSQL", - } - finally: - if close_db: - db.close() - - -def school_to_dict(school: School, include_results: bool = False) -> dict: - """Convert a School model to dictionary.""" - data = { - "urn": school.urn, - "school_name": school.school_name, - "local_authority": school.local_authority, - "school_type": normalize_school_type(school.school_type), - "address": school.address, - "town": school.town, - "postcode": school.postcode, - "latitude": school.latitude, - "longitude": school.longitude, - # GIAS fields - "website": school.website, - "headteacher_name": school.headteacher_name, - "capacity": school.capacity, - "trust_name": school.trust_name, - "gender": school.gender, - } - - if include_results and school.results: - data["results"] = [result_to_dict(r) for r in school.results] - - return data - - -def result_to_dict(result: SchoolResult) -> dict: - """Convert a SchoolResult model to dictionary.""" - return { - "year": result.year, - "total_pupils": result.total_pupils, - "eligible_pupils": result.eligible_pupils, - # Expected Standard - "rwm_expected_pct": result.rwm_expected_pct, - "reading_expected_pct": result.reading_expected_pct, - "writing_expected_pct": result.writing_expected_pct, - "maths_expected_pct": result.maths_expected_pct, - "gps_expected_pct": result.gps_expected_pct, - "science_expected_pct": result.science_expected_pct, - # Higher Standard - "rwm_high_pct": result.rwm_high_pct, - "reading_high_pct": result.reading_high_pct, - "writing_high_pct": result.writing_high_pct, - "maths_high_pct": result.maths_high_pct, - "gps_high_pct": result.gps_high_pct, - # Progress - "reading_progress": result.reading_progress, - "writing_progress": result.writing_progress, - "maths_progress": result.maths_progress, - # Averages - "reading_avg_score": result.reading_avg_score, - "maths_avg_score": result.maths_avg_score, - "gps_avg_score": result.gps_avg_score, - # Context - "disadvantaged_pct": result.disadvantaged_pct, - "eal_pct": result.eal_pct, - "sen_support_pct": result.sen_support_pct, - "sen_ehcp_pct": result.sen_ehcp_pct, - "stability_pct": result.stability_pct, - # Gender - "rwm_expected_boys_pct": result.rwm_expected_boys_pct, - "rwm_expected_girls_pct": result.rwm_expected_girls_pct, - "rwm_high_boys_pct": result.rwm_high_boys_pct, - "rwm_high_girls_pct": result.rwm_high_girls_pct, - # Disadvantaged - "rwm_expected_disadvantaged_pct": result.rwm_expected_disadvantaged_pct, - "rwm_expected_non_disadvantaged_pct": result.rwm_expected_non_disadvantaged_pct, - "disadvantaged_gap": result.disadvantaged_gap, - # 3-Year - "rwm_expected_3yr_pct": result.rwm_expected_3yr_pct, - "reading_avg_3yr": result.reading_avg_3yr, - "maths_avg_3yr": result.maths_avg_3yr, - } - - -# ============================================================================= -# LEGACY COMPATIBILITY - DataFrame-based functions -# ============================================================================= - -def load_school_data_as_dataframe(db: Session = None) -> pd.DataFrame: - """ - Load all school data as a pandas DataFrame. - For compatibility with existing code that expects DataFrames. - """ - close_db = db is None - if db is None: - db = get_db() - - try: - # Query all schools with their results - schools = db.query(School).options(joinedload(School.results)).all() - - # Load Ofsted data into a lookup dict (urn → grade, date) - ofsted_lookup: Dict[int, dict] = {} - try: - ofsted_rows = db.query( - OfstedInspection.urn, - OfstedInspection.overall_effectiveness, - OfstedInspection.inspection_date, - ).all() - for o in ofsted_rows: - ofsted_lookup[o.urn] = { - "ofsted_grade": o.overall_effectiveness, - "ofsted_date": o.inspection_date.isoformat() if o.inspection_date else None, - } - except Exception: - pass # Table may not exist yet on first run - - rows = [] - for school in schools: - ofsted = ofsted_lookup.get(school.urn, {}) - for result in school.results: - row = { - "urn": school.urn, - "school_name": school.school_name, - "local_authority": school.local_authority, - "school_type": normalize_school_type(school.school_type), - "address": school.address, - "town": school.town, - "postcode": school.postcode, - "latitude": school.latitude, - "longitude": school.longitude, - # GIAS fields - "website": school.website, - "headteacher_name": school.headteacher_name, - "capacity": school.capacity, - "trust_name": school.trust_name, - "gender": school.gender, - # Ofsted (for list view) - "ofsted_grade": ofsted.get("ofsted_grade"), - "ofsted_date": ofsted.get("ofsted_date"), - **result_to_dict(result) - } - rows.append(row) - - if rows: - return pd.DataFrame(rows) + df = pd.read_sql(_MAIN_QUERY, engine) + except Exception as exc: + print(f"Warning: Could not load school data from marts: {exc}") return pd.DataFrame() - finally: - if close_db: - db.close() + + if df.empty: + return df + + # Build address string + df["address"] = df.apply( + lambda r: ", ".join( + p for p in [r.get("address1"), r.get("address2"), r.get("town"), r.get("postcode")] + if p and str(p) != "None" + ), + axis=1, + ) + + # Normalize school type + df["school_type"] = df["school_type"].apply(normalize_school_type) + + return df -# Cache for DataFrame (legacy compatibility) +# Cache for DataFrame _df_cache: Optional[pd.DataFrame] = None def load_school_data() -> pd.DataFrame: - """ - Legacy function to load school data as DataFrame. - Uses caching for performance. - """ + """Load school data with caching.""" global _df_cache - if _df_cache is not None: return _df_cache - - print("Loading school data from database...") + print("Loading school data from marts...") _df_cache = load_school_data_as_dataframe() - if not _df_cache.empty: print(f"Total records loaded: {len(_df_cache)}") print(f"Unique schools: {_df_cache['urn'].nunique()}") print(f"Years: {sorted(_df_cache['year'].unique())}") else: - print("No data found in database") - + print("No data found in marts (EES data may not have been loaded yet)") return _df_cache @@ -549,136 +194,198 @@ def clear_cache(): _df_cache = None +# ============================================================================= +# METADATA QUERIES +# ============================================================================= + +def get_available_years(db: Session = None) -> List[int]: + close_db = db is None + if db is None: + db = SessionLocal() + try: + result = db.query(KS2Performance.year).distinct().order_by(KS2Performance.year).all() + return [r[0] for r in result] + except Exception: + return [] + finally: + if close_db: + db.close() + + +def get_available_local_authorities(db: Session = None) -> List[str]: + close_db = db is None + if db is None: + db = SessionLocal() + try: + result = ( + db.query(DimLocation.local_authority_name) + .filter(DimLocation.local_authority_name.isnot(None)) + .distinct() + .order_by(DimLocation.local_authority_name) + .all() + ) + return [r[0] for r in result if r[0]] + except Exception: + return [] + finally: + if close_db: + db.close() + + +def get_schools_count(db: Session = None) -> int: + close_db = db is None + if db is None: + db = SessionLocal() + try: + return db.query(DimSchool).count() + except Exception: + return 0 + finally: + if close_db: + db.close() + + +def get_data_info(db: Session = None) -> dict: + close_db = db is None + if db is None: + db = SessionLocal() + try: + school_count = get_schools_count(db) + years = get_available_years(db) + local_authorities = get_available_local_authorities(db) + return { + "total_schools": school_count, + "years_available": years, + "local_authorities_count": len(local_authorities), + "data_source": "PostgreSQL (marts)", + } + finally: + if close_db: + db.close() + + +# ============================================================================= +# SUPPLEMENTARY DATA — per-school detail page +# ============================================================================= + def get_supplementary_data(db: Session, urn: int) -> dict: - """ - Fetch all supplementary data for a single school URN. - Returns a dict with keys: ofsted, parent_view, census, admissions, sen_detail, - phonics, deprivation, finance. Values are dicts or None. - """ + """Fetch all supplementary data for a single school URN.""" result = {} - def safe_query(model, pk_field, latest_year_field=None): + def safe_query(model, pk_field, latest_field=None): try: - if latest_year_field: - row = ( - db.query(model) - .filter(getattr(model, pk_field) == urn) - .order_by(getattr(model, latest_year_field).desc()) - .first() - ) - else: - row = db.query(model).filter(getattr(model, pk_field) == urn).first() - return row + q = db.query(model).filter(getattr(model, pk_field) == urn) + if latest_field: + q = q.order_by(getattr(model, latest_field).desc()) + return q.first() except Exception: return None - # Ofsted inspection - o = safe_query(OfstedInspection, "urn") - result["ofsted"] = { - "framework": o.framework, - "inspection_date": o.inspection_date.isoformat() if o.inspection_date else None, - "inspection_type": o.inspection_type, - # OEIF fields (old framework) - "overall_effectiveness": o.overall_effectiveness, - "quality_of_education": o.quality_of_education, - "behaviour_attitudes": o.behaviour_attitudes, - "personal_development": o.personal_development, - "leadership_management": o.leadership_management, - "early_years_provision": o.early_years_provision, - "previous_overall": o.previous_overall, - # Report Card fields (new framework, from Nov 2025) - "rc_safeguarding_met": o.rc_safeguarding_met, - "rc_inclusion": o.rc_inclusion, - "rc_curriculum_teaching": o.rc_curriculum_teaching, - "rc_achievement": o.rc_achievement, - "rc_attendance_behaviour": o.rc_attendance_behaviour, - "rc_personal_development": o.rc_personal_development, - "rc_leadership_governance": o.rc_leadership_governance, - "rc_early_years": o.rc_early_years, - "rc_sixth_form": o.rc_sixth_form, - } if o else None + # Latest Ofsted inspection + o = safe_query(FactOfstedInspection, "urn", "inspection_date") + result["ofsted"] = ( + { + "framework": o.framework, + "inspection_date": o.inspection_date.isoformat() if o.inspection_date else None, + "inspection_type": o.inspection_type, + "overall_effectiveness": o.overall_effectiveness, + "quality_of_education": o.quality_of_education, + "behaviour_attitudes": o.behaviour_attitudes, + "personal_development": o.personal_development, + "leadership_management": o.leadership_management, + "early_years_provision": o.early_years_provision, + "sixth_form_provision": o.sixth_form_provision, + "previous_overall": None, # Not available in new schema + "rc_safeguarding_met": o.rc_safeguarding_met, + "rc_inclusion": o.rc_inclusion, + "rc_curriculum_teaching": o.rc_curriculum_teaching, + "rc_achievement": o.rc_achievement, + "rc_attendance_behaviour": o.rc_attendance_behaviour, + "rc_personal_development": o.rc_personal_development, + "rc_leadership_governance": o.rc_leadership_governance, + "rc_early_years": o.rc_early_years, + "rc_sixth_form": o.rc_sixth_form, + "report_url": o.report_url, + } + if o + else None + ) # Parent View - pv = safe_query(OfstedParentView, "urn") - result["parent_view"] = { - "survey_date": pv.survey_date.isoformat() if pv.survey_date else None, - "total_responses": pv.total_responses, - "q_happy_pct": pv.q_happy_pct, - "q_safe_pct": pv.q_safe_pct, - "q_behaviour_pct": pv.q_behaviour_pct, - "q_bullying_pct": pv.q_bullying_pct, - "q_communication_pct": pv.q_communication_pct, - "q_progress_pct": pv.q_progress_pct, - "q_teaching_pct": pv.q_teaching_pct, - "q_information_pct": pv.q_information_pct, - "q_curriculum_pct": pv.q_curriculum_pct, - "q_future_pct": pv.q_future_pct, - "q_leadership_pct": pv.q_leadership_pct, - "q_wellbeing_pct": pv.q_wellbeing_pct, - "q_recommend_pct": pv.q_recommend_pct, - "q_sen_pct": pv.q_sen_pct, - } if pv else None + pv = safe_query(FactParentView, "urn") + result["parent_view"] = ( + { + "survey_date": pv.survey_date.isoformat() if pv.survey_date else None, + "total_responses": pv.total_responses, + "q_happy_pct": pv.q_happy_pct, + "q_safe_pct": pv.q_safe_pct, + "q_behaviour_pct": pv.q_behaviour_pct, + "q_bullying_pct": pv.q_bullying_pct, + "q_communication_pct": pv.q_communication_pct, + "q_progress_pct": pv.q_progress_pct, + "q_teaching_pct": pv.q_teaching_pct, + "q_information_pct": pv.q_information_pct, + "q_curriculum_pct": pv.q_curriculum_pct, + "q_future_pct": pv.q_future_pct, + "q_leadership_pct": pv.q_leadership_pct, + "q_wellbeing_pct": pv.q_wellbeing_pct, + "q_recommend_pct": pv.q_recommend_pct, + } + if pv + else None + ) - # School Census (latest year) - c = safe_query(SchoolCensus, "urn", "year") - result["census"] = { - "year": c.year, - "class_size_avg": c.class_size_avg, - "ethnicity_white_pct": c.ethnicity_white_pct, - "ethnicity_asian_pct": c.ethnicity_asian_pct, - "ethnicity_black_pct": c.ethnicity_black_pct, - "ethnicity_mixed_pct": c.ethnicity_mixed_pct, - "ethnicity_other_pct": c.ethnicity_other_pct, - } if c else None + # Census (fact_pupil_characteristics — minimal until census columns are verified) + result["census"] = None # Admissions (latest year) - a = safe_query(SchoolAdmissions, "urn", "year") - result["admissions"] = { - "year": a.year, - "published_admission_number": a.published_admission_number, - "total_applications": a.total_applications, - "first_preference_offers_pct": a.first_preference_offers_pct, - "oversubscribed": a.oversubscribed, - } if a else None + a = safe_query(FactAdmissions, "urn", "year") + result["admissions"] = ( + { + "year": a.year, + "school_phase": a.school_phase, + "published_admission_number": a.published_admission_number, + "total_applications": a.total_applications, + "first_preference_applications": a.first_preference_applications, + "first_preference_offers": a.first_preference_offers, + "first_preference_offer_pct": a.first_preference_offer_pct, + "oversubscribed": a.oversubscribed, + } + if a + else None + ) - # SEN Detail (latest year) - s = safe_query(SenDetail, "urn", "year") - result["sen_detail"] = { - "year": s.year, - "primary_need_speech_pct": s.primary_need_speech_pct, - "primary_need_autism_pct": s.primary_need_autism_pct, - "primary_need_mld_pct": s.primary_need_mld_pct, - "primary_need_spld_pct": s.primary_need_spld_pct, - "primary_need_semh_pct": s.primary_need_semh_pct, - "primary_need_physical_pct": s.primary_need_physical_pct, - "primary_need_other_pct": s.primary_need_other_pct, - } if s else None + # SEN detail — not available in current marts + result["sen_detail"] = None - # Phonics (latest year) - ph = safe_query(Phonics, "urn", "year") - result["phonics"] = { - "year": ph.year, - "year1_phonics_pct": ph.year1_phonics_pct, - "year2_phonics_pct": ph.year2_phonics_pct, - } if ph else None + # Phonics — no school-level data on EES + result["phonics"] = None # Deprivation - d = safe_query(SchoolDeprivation, "urn") - result["deprivation"] = { - "lsoa_code": d.lsoa_code, - "idaci_score": d.idaci_score, - "idaci_decile": d.idaci_decile, - } if d else None + d = safe_query(FactDeprivation, "urn") + result["deprivation"] = ( + { + "lsoa_code": d.lsoa_code, + "idaci_score": d.idaci_score, + "idaci_decile": d.idaci_decile, + } + if d + else None + ) # Finance (latest year) - f = safe_query(SchoolFinance, "urn", "year") - result["finance"] = { - "year": f.year, - "per_pupil_spend": f.per_pupil_spend, - "staff_cost_pct": f.staff_cost_pct, - "teacher_cost_pct": f.teacher_cost_pct, - "support_staff_cost_pct": f.support_staff_cost_pct, - "premises_cost_pct": f.premises_cost_pct, - } if f else None + f = safe_query(FactFinance, "urn", "year") + result["finance"] = ( + { + "year": f.year, + "per_pupil_spend": f.per_pupil_spend, + "staff_cost_pct": f.staff_cost_pct, + "teacher_cost_pct": f.teacher_cost_pct, + "support_staff_cost_pct": f.support_staff_cost_pct, + "premises_cost_pct": f.premises_cost_pct, + } + if f + else None + ) return result diff --git a/backend/database.py b/backend/database.py index 57508e1..e5ee0ad 100644 --- a/backend/database.py +++ b/backend/database.py @@ -1,36 +1,30 @@ """ Database connection setup using SQLAlchemy. +The schema is managed by dbt — the backend only reads from marts.* tables. """ -from datetime import datetime -from typing import Optional - -from sqlalchemy import create_engine, inspect -from sqlalchemy.orm import sessionmaker, declarative_base from contextlib import contextmanager +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, declarative_base + from .config import settings -# Create engine engine = create_engine( settings.database_url, pool_size=10, max_overflow=20, - pool_pre_ping=True, # Verify connections before use - echo=False, # Set to True for SQL debugging + pool_pre_ping=True, + echo=False, ) -# Session factory SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) -# Base class for models Base = declarative_base() def get_db(): - """ - Dependency for FastAPI routes to get a database session. - """ + """Dependency for FastAPI routes.""" db = SessionLocal() try: yield db @@ -40,10 +34,7 @@ def get_db(): @contextmanager def get_db_session(): - """ - Context manager for database sessions. - Use in non-FastAPI contexts (scripts, etc). - """ + """Context manager for non-FastAPI contexts.""" db = SessionLocal() try: yield db @@ -53,95 +44,3 @@ def get_db_session(): raise finally: db.close() - - -def init_db(): - """ - Initialize database - create all tables. - """ - Base.metadata.create_all(bind=engine) - - -def drop_db(): - """ - Drop all tables - use with caution! - """ - Base.metadata.drop_all(bind=engine) - - -def get_db_schema_version() -> Optional[int]: - """ - Get the current schema version from the database. - Returns None if table doesn't exist or no version is set. - """ - from .models import SchemaVersion # Import here to avoid circular imports - - # Check if schema_version table exists - inspector = inspect(engine) - if "schema_version" not in inspector.get_table_names(): - return None - - try: - with get_db_session() as db: - row = db.query(SchemaVersion).first() - return row.version if row else None - except Exception: - return None - - -def set_db_schema_version(version: int): - """ - Set/update the schema version in the database. - Creates the row if it doesn't exist. - """ - from .models import SchemaVersion - - with get_db_session() as db: - row = db.query(SchemaVersion).first() - if row: - row.version = version - row.migrated_at = datetime.utcnow() - else: - db.add(SchemaVersion(id=1, version=version, migrated_at=datetime.utcnow())) - - -def check_and_migrate_if_needed(): - """ - Check schema version and run migration if needed. - Called during application startup. - """ - from .version import SCHEMA_VERSION - from .migration import run_full_migration - - db_version = get_db_schema_version() - - if db_version == SCHEMA_VERSION: - print(f"Schema version {SCHEMA_VERSION} matches. Fast startup.") - # Still ensure tables exist (they should if version matches) - init_db() - return - - if db_version is None: - print(f"No schema version found. Running initial migration (v{SCHEMA_VERSION})...") - else: - print(f"Schema mismatch: DB has v{db_version}, code expects v{SCHEMA_VERSION}") - print("Running full migration...") - - try: - # Set schema version BEFORE migration so a crash mid-migration - # doesn't cause an infinite re-migration loop on every restart. - init_db() - set_db_schema_version(SCHEMA_VERSION) - - success = run_full_migration(geocode=False) - - if success: - print(f"Migration complete. Schema version {SCHEMA_VERSION}.") - else: - print("Warning: Migration completed but no data was imported.") - - except Exception as e: - print(f"FATAL: Migration failed: {e}") - print("Application cannot start. Please check database and CSV files.") - raise - diff --git a/backend/models.py b/backend/models.py index 1e351de..57f1586 100644 --- a/backend/models.py +++ b/backend/models.py @@ -1,408 +1,216 @@ """ -SQLAlchemy database models for school data. -Normalized schema with separate tables for schools and yearly results. +SQLAlchemy models — all tables live in the marts schema, built by dbt. +Read-only: the pipeline writes to these tables; the backend only reads. """ -from datetime import datetime +from sqlalchemy import Column, Integer, String, Float, Boolean, Date, Text, Index -from sqlalchemy import ( - Column, Integer, String, Float, ForeignKey, Index, UniqueConstraint, - Text, Boolean, DateTime, Date -) -from sqlalchemy.orm import relationship from .database import Base +MARTS = {"schema": "marts"} -class School(Base): - """ - Core school information - relatively static data. - """ - __tablename__ = "schools" - - id = Column(Integer, primary_key=True, autoincrement=True) - urn = Column(Integer, unique=True, nullable=False, index=True) + +class DimSchool(Base): + """Canonical school dimension — one row per active URN.""" + __tablename__ = "dim_school" + __table_args__ = MARTS + + urn = Column(Integer, primary_key=True) school_name = Column(String(255), nullable=False) - local_authority = Column(String(100)) - local_authority_code = Column(Integer) + phase = Column(String(100)) school_type = Column(String(100)) - school_type_code = Column(String(10)) - religious_denomination = Column(String(100)) + academy_trust_name = Column(String(255)) + academy_trust_uid = Column(String(20)) + religious_character = Column(String(100)) + gender = Column(String(20)) age_range = Column(String(20)) - - # Address - address1 = Column(String(255)) - address2 = Column(String(255)) + capacity = Column(Integer) + total_pupils = Column(Integer) + headteacher_name = Column(String(200)) + website = Column(String(255)) + telephone = Column(String(30)) + status = Column(String(50)) + nursery_provision = Column(Boolean) + admissions_policy = Column(String(50)) + # Denormalised Ofsted summary (updated by monthly pipeline) + ofsted_grade = Column(Integer) + ofsted_date = Column(Date) + ofsted_framework = Column(String(20)) + + +class DimLocation(Base): + """School location — address, lat/lng from easting/northing (BNG→WGS84).""" + __tablename__ = "dim_location" + __table_args__ = MARTS + + urn = Column(Integer, primary_key=True) + address_line1 = Column(String(255)) + address_line2 = Column(String(255)) town = Column(String(100)) - postcode = Column(String(20), index=True) - - # Geocoding (cached) + county = Column(String(100)) + postcode = Column(String(20)) + local_authority_code = Column(Integer) + local_authority_name = Column(String(100)) + parliamentary_constituency = Column(String(100)) + urban_rural = Column(String(50)) + easting = Column(Integer) + northing = Column(Integer) latitude = Column(Float) longitude = Column(Float) - - # GIAS enrichment fields - website = Column(String(255)) - headteacher_name = Column(String(200)) - capacity = Column(Integer) - trust_name = Column(String(255)) - trust_uid = Column(String(20)) - gender = Column(String(20)) # Mixed / Girls / Boys - nursery_provision = Column(Boolean) - - # Relationships - results = relationship("SchoolResult", back_populates="school", cascade="all, delete-orphan") - - def __repr__(self): - return f"" - - @property - def address(self): - """Combine address fields into single string.""" - parts = [self.address1, self.address2, self.town, self.postcode] - return ", ".join(p for p in parts if p) + # geom is a PostGIS geometry — not mapped to SQLAlchemy (accessed via raw SQL) -class SchoolResult(Base): - """ - Yearly KS2 results for a school. - Each school can have multiple years of results. - """ - __tablename__ = "school_results" - - id = Column(Integer, primary_key=True, autoincrement=True) - school_id = Column(Integer, ForeignKey("schools.id", ondelete="CASCADE"), nullable=False) - year = Column(Integer, nullable=False, index=True) - - # Pupil numbers +class KS2Performance(Base): + """KS2 attainment — one row per URN per year (includes predecessor data).""" + __tablename__ = "fact_ks2_performance" + __table_args__ = ( + Index("ix_ks2_urn_year", "urn", "year"), + MARTS, + ) + + urn = Column(Integer, primary_key=True) + year = Column(Integer, primary_key=True) + source_urn = Column(Integer) total_pupils = Column(Integer) eligible_pupils = Column(Integer) - - # Core KS2 metrics - Expected Standard + # Core attainment rwm_expected_pct = Column(Float) - reading_expected_pct = Column(Float) - writing_expected_pct = Column(Float) - maths_expected_pct = Column(Float) - gps_expected_pct = Column(Float) - science_expected_pct = Column(Float) - - # Higher Standard rwm_high_pct = Column(Float) + reading_expected_pct = Column(Float) reading_high_pct = Column(Float) - writing_high_pct = Column(Float) - maths_high_pct = Column(Float) - gps_high_pct = Column(Float) - - # Progress Scores - reading_progress = Column(Float) - writing_progress = Column(Float) - maths_progress = Column(Float) - - # Average Scores reading_avg_score = Column(Float) + reading_progress = Column(Float) + writing_expected_pct = Column(Float) + writing_high_pct = Column(Float) + writing_progress = Column(Float) + maths_expected_pct = Column(Float) + maths_high_pct = Column(Float) maths_avg_score = Column(Float) + maths_progress = Column(Float) + gps_expected_pct = Column(Float) + gps_high_pct = Column(Float) gps_avg_score = Column(Float) - - # School Context + science_expected_pct = Column(Float) + # Absence + reading_absence_pct = Column(Float) + writing_absence_pct = Column(Float) + maths_absence_pct = Column(Float) + gps_absence_pct = Column(Float) + science_absence_pct = Column(Float) + # Gender + rwm_expected_boys_pct = Column(Float) + rwm_high_boys_pct = Column(Float) + rwm_expected_girls_pct = Column(Float) + rwm_high_girls_pct = Column(Float) + # Disadvantaged + rwm_expected_disadvantaged_pct = Column(Float) + rwm_expected_non_disadvantaged_pct = Column(Float) + disadvantaged_gap = Column(Float) + # Context disadvantaged_pct = Column(Float) eal_pct = Column(Float) sen_support_pct = Column(Float) sen_ehcp_pct = Column(Float) stability_pct = Column(Float) - # Pupil Absence from Tests - reading_absence_pct = Column(Float) - gps_absence_pct = Column(Float) - maths_absence_pct = Column(Float) - writing_absence_pct = Column(Float) - science_absence_pct = Column(Float) - # Gender Breakdown - rwm_expected_boys_pct = Column(Float) - rwm_expected_girls_pct = Column(Float) - rwm_high_boys_pct = Column(Float) - rwm_high_girls_pct = Column(Float) - - # Disadvantaged Performance - rwm_expected_disadvantaged_pct = Column(Float) - rwm_expected_non_disadvantaged_pct = Column(Float) - disadvantaged_gap = Column(Float) - - # 3-Year Averages - rwm_expected_3yr_pct = Column(Float) - reading_avg_3yr = Column(Float) - maths_avg_3yr = Column(Float) - - # Relationship - school = relationship("School", back_populates="results") - - # Constraints +class FactOfstedInspection(Base): + """Full Ofsted inspection history — one row per inspection.""" + __tablename__ = "fact_ofsted_inspection" __table_args__ = ( - UniqueConstraint('school_id', 'year', name='uq_school_year'), - Index('ix_school_results_school_year', 'school_id', 'year'), + Index("ix_ofsted_urn_date", "urn", "inspection_date"), + MARTS, ) - - def __repr__(self): - return f"" - - -class SchemaVersion(Base): - """ - Tracks database schema version for automatic migrations. - Single-row table that stores the current schema version. - """ - __tablename__ = "schema_version" - - id = Column(Integer, primary_key=True, default=1) - version = Column(Integer, nullable=False) - migrated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - def __repr__(self): - return f"" - - -# --------------------------------------------------------------------------- -# Supplementary data tables (populated by the Kestra data integrator) -# --------------------------------------------------------------------------- - -class OfstedInspection(Base): - """Latest Ofsted inspection judgement per school.""" - __tablename__ = "ofsted_inspections" urn = Column(Integer, primary_key=True) - inspection_date = Column(Date) - publication_date = Column(Date) - inspection_type = Column(String(100)) # Section 5 / Section 8 etc. - # Which inspection framework was used: 'OEIF' or 'ReportCard' + inspection_date = Column(Date, primary_key=True) + inspection_type = Column(String(100)) framework = Column(String(20)) - - # --- OEIF grades (old framework, pre-Nov 2025) --- - # 1=Outstanding 2=Good 3=Requires improvement 4=Inadequate overall_effectiveness = Column(Integer) quality_of_education = Column(Integer) behaviour_attitudes = Column(Integer) personal_development = Column(Integer) leadership_management = Column(Integer) - early_years_provision = Column(Integer) # nullable — not all schools - previous_overall = Column(Integer) # for trend display - - # --- Report Card grades (new framework, from Nov 2025) --- - # 1=Exceptional 2=Strong 3=Expected standard 4=Needs attention 5=Urgent improvement - rc_safeguarding_met = Column(Boolean) # True=Met, False=Not met + early_years_provision = Column(Integer) + sixth_form_provision = Column(Integer) + rc_safeguarding_met = Column(Boolean) rc_inclusion = Column(Integer) rc_curriculum_teaching = Column(Integer) rc_achievement = Column(Integer) rc_attendance_behaviour = Column(Integer) rc_personal_development = Column(Integer) rc_leadership_governance = Column(Integer) - rc_early_years = Column(Integer) # nullable — not all schools - rc_sixth_form = Column(Integer) # nullable — secondary only - - def __repr__(self): - return f"" + rc_early_years = Column(Integer) + rc_sixth_form = Column(Integer) + report_url = Column(Text) -class OfstedParentView(Base): - """Ofsted Parent View survey — latest per school. 14 questions, % saying Yes.""" - __tablename__ = "ofsted_parent_view" +class FactParentView(Base): + """Ofsted Parent View survey — latest per school.""" + __tablename__ = "fact_parent_view" + __table_args__ = MARTS urn = Column(Integer, primary_key=True) survey_date = Column(Date) total_responses = Column(Integer) - q_happy_pct = Column(Float) # My child is happy at this school - q_safe_pct = Column(Float) # My child feels safe at this school - q_bullying_pct = Column(Float) # School deals with bullying well - q_communication_pct = Column(Float) # School keeps me informed - q_progress_pct = Column(Float) # My child does well / good progress - q_teaching_pct = Column(Float) # Teaching is good - q_information_pct = Column(Float) # I receive valuable info about progress - q_curriculum_pct = Column(Float) # Broad range of subjects taught - q_future_pct = Column(Float) # Prepares child well for the future - q_leadership_pct = Column(Float) # Led and managed effectively - q_wellbeing_pct = Column(Float) # Supports wider personal development - q_behaviour_pct = Column(Float) # Pupils are well behaved - q_recommend_pct = Column(Float) # I would recommend this school - q_sen_pct = Column(Float) # Good information about child's SEN (where applicable) - - def __repr__(self): - return f"" + q_happy_pct = Column(Float) + q_safe_pct = Column(Float) + q_behaviour_pct = Column(Float) + q_bullying_pct = Column(Float) + q_communication_pct = Column(Float) + q_progress_pct = Column(Float) + q_teaching_pct = Column(Float) + q_information_pct = Column(Float) + q_curriculum_pct = Column(Float) + q_future_pct = Column(Float) + q_leadership_pct = Column(Float) + q_wellbeing_pct = Column(Float) + q_recommend_pct = Column(Float) -class SchoolCensus(Base): - """Annual school census snapshot — class sizes and ethnicity breakdown.""" - __tablename__ = "school_census" - - urn = Column(Integer, primary_key=True) - year = Column(Integer, primary_key=True) - class_size_avg = Column(Float) - ethnicity_white_pct = Column(Float) - ethnicity_asian_pct = Column(Float) - ethnicity_black_pct = Column(Float) - ethnicity_mixed_pct = Column(Float) - ethnicity_other_pct = Column(Float) - +class FactAdmissions(Base): + """School admissions — one row per URN per year.""" + __tablename__ = "fact_admissions" __table_args__ = ( - Index('ix_school_census_urn_year', 'urn', 'year'), + Index("ix_admissions_urn_year", "urn", "year"), + MARTS, ) - def __repr__(self): - return f"" - - -class SchoolAdmissions(Base): - """Annual admissions statistics per school.""" - __tablename__ = "school_admissions" - urn = Column(Integer, primary_key=True) year = Column(Integer, primary_key=True) - published_admission_number = Column(Integer) # PAN + school_phase = Column(String(50)) + published_admission_number = Column(Integer) total_applications = Column(Integer) - first_preference_offers_pct = Column(Float) # % receiving 1st choice + first_preference_applications = Column(Integer) + first_preference_offers = Column(Integer) + first_preference_offer_pct = Column(Float) oversubscribed = Column(Boolean) - - __table_args__ = ( - Index('ix_school_admissions_urn_year', 'urn', 'year'), - ) - - def __repr__(self): - return f"" + admissions_policy = Column(String(100)) -class SenDetail(Base): - """SEN primary need type breakdown — more granular than school_results context fields.""" - __tablename__ = "sen_detail" - - urn = Column(Integer, primary_key=True) - year = Column(Integer, primary_key=True) - primary_need_speech_pct = Column(Float) # SLCN - primary_need_autism_pct = Column(Float) # ASD - primary_need_mld_pct = Column(Float) # Moderate learning difficulty - primary_need_spld_pct = Column(Float) # Specific learning difficulty (dyslexia etc.) - primary_need_semh_pct = Column(Float) # Social, emotional, mental health - primary_need_physical_pct = Column(Float) # Physical/sensory - primary_need_other_pct = Column(Float) - - __table_args__ = ( - Index('ix_sen_detail_urn_year', 'urn', 'year'), - ) - - def __repr__(self): - return f"" - - -class Phonics(Base): - """Phonics Screening Check pass rates.""" - __tablename__ = "phonics" - - urn = Column(Integer, primary_key=True) - year = Column(Integer, primary_key=True) - year1_phonics_pct = Column(Float) # % reaching expected standard in Year 1 - year2_phonics_pct = Column(Float) # % reaching standard in Year 2 (re-takers) - - __table_args__ = ( - Index('ix_phonics_urn_year', 'urn', 'year'), - ) - - def __repr__(self): - return f"" - - -class SchoolDeprivation(Base): - """IDACI deprivation index — derived via postcode → LSOA lookup.""" - __tablename__ = "school_deprivation" +class FactDeprivation(Base): + """IDACI deprivation index — one row per URN.""" + __tablename__ = "fact_deprivation" + __table_args__ = MARTS urn = Column(Integer, primary_key=True) lsoa_code = Column(String(20)) - idaci_score = Column(Float) # 0–1, higher = more deprived - idaci_decile = Column(Integer) # 1 = most deprived, 10 = least deprived - - def __repr__(self): - return f"" + idaci_score = Column(Float) + idaci_decile = Column(Integer) -class SchoolFinance(Base): - """FBIT financial benchmarking data.""" - __tablename__ = "school_finance" +class FactFinance(Base): + """FBIT financial benchmarking — one row per URN per year.""" + __tablename__ = "fact_finance" + __table_args__ = ( + Index("ix_finance_urn_year", "urn", "year"), + MARTS, + ) urn = Column(Integer, primary_key=True) year = Column(Integer, primary_key=True) - per_pupil_spend = Column(Float) # £ total expenditure per pupil - staff_cost_pct = Column(Float) # % of budget on all staff - teacher_cost_pct = Column(Float) # % on teachers specifically + per_pupil_spend = Column(Float) + staff_cost_pct = Column(Float) + teacher_cost_pct = Column(Float) support_staff_cost_pct = Column(Float) premises_cost_pct = Column(Float) - - __table_args__ = ( - Index('ix_school_finance_urn_year', 'urn', 'year'), - ) - - def __repr__(self): - return f"" - - -# Mapping from CSV columns to model fields -SCHOOL_FIELD_MAPPING = { - 'urn': 'urn', - 'school_name': 'school_name', - 'local_authority': 'local_authority', - 'local_authority_code': 'local_authority_code', - 'school_type': 'school_type', - 'school_type_code': 'school_type_code', - 'religious_denomination': 'religious_denomination', - 'age_range': 'age_range', - 'address1': 'address1', - 'address2': 'address2', - 'town': 'town', - 'postcode': 'postcode', -} - -RESULT_FIELD_MAPPING = { - 'year': 'year', - 'total_pupils': 'total_pupils', - 'eligible_pupils': 'eligible_pupils', - # Expected Standard - 'rwm_expected_pct': 'rwm_expected_pct', - 'reading_expected_pct': 'reading_expected_pct', - 'writing_expected_pct': 'writing_expected_pct', - 'maths_expected_pct': 'maths_expected_pct', - 'gps_expected_pct': 'gps_expected_pct', - 'science_expected_pct': 'science_expected_pct', - # Higher Standard - 'rwm_high_pct': 'rwm_high_pct', - 'reading_high_pct': 'reading_high_pct', - 'writing_high_pct': 'writing_high_pct', - 'maths_high_pct': 'maths_high_pct', - 'gps_high_pct': 'gps_high_pct', - # Progress - 'reading_progress': 'reading_progress', - 'writing_progress': 'writing_progress', - 'maths_progress': 'maths_progress', - # Averages - 'reading_avg_score': 'reading_avg_score', - 'maths_avg_score': 'maths_avg_score', - 'gps_avg_score': 'gps_avg_score', - # Context - 'disadvantaged_pct': 'disadvantaged_pct', - 'eal_pct': 'eal_pct', - 'sen_support_pct': 'sen_support_pct', - 'sen_ehcp_pct': 'sen_ehcp_pct', - 'stability_pct': 'stability_pct', - # Absence - 'reading_absence_pct': 'reading_absence_pct', - 'gps_absence_pct': 'gps_absence_pct', - 'maths_absence_pct': 'maths_absence_pct', - 'writing_absence_pct': 'writing_absence_pct', - 'science_absence_pct': 'science_absence_pct', - # Gender - 'rwm_expected_boys_pct': 'rwm_expected_boys_pct', - 'rwm_expected_girls_pct': 'rwm_expected_girls_pct', - 'rwm_high_boys_pct': 'rwm_high_boys_pct', - 'rwm_high_girls_pct': 'rwm_high_girls_pct', - # Disadvantaged - 'rwm_expected_disadvantaged_pct': 'rwm_expected_disadvantaged_pct', - 'rwm_expected_non_disadvantaged_pct': 'rwm_expected_non_disadvantaged_pct', - 'disadvantaged_gap': 'disadvantaged_gap', - # 3-Year - 'rwm_expected_3yr_pct': 'rwm_expected_3yr_pct', - 'reading_avg_3yr': 'reading_avg_3yr', - 'maths_avg_3yr': 'maths_avg_3yr', -} - diff --git a/pipeline/dags/school_data_pipeline.py b/pipeline/dags/school_data_pipeline.py index 60b6552..52a37bd 100644 --- a/pipeline/dags/school_data_pipeline.py +++ b/pipeline/dags/school_data_pipeline.py @@ -120,12 +120,12 @@ with DAG( extract_ofsted >> dbt_build_ofsted >> sync_typesense_ofsted -# ── Annual DAG (EES: KS2, KS4, Census, Admissions, Phonics) ─────────── +# ── Annual DAG (EES: KS2, KS4, Census, Admissions) ─────────────────── with DAG( dag_id="school_data_annual_ees", default_args=default_args, - description="Annual EES data extraction (KS2, KS4, Census, Admissions, Phonics)", + description="Annual EES data extraction (KS2, KS4, Census, Admissions)", schedule=None, # Triggered manually when new releases are published start_date=datetime(2025, 1, 1), catchup=False, @@ -140,7 +140,7 @@ with DAG( dbt_build_ees = BashOperator( task_id="dbt_build", - bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+ stg_ees_phonics+", + bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+", ) sync_typesense_ees = BashOperator( diff --git a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py index 1ca2b79..f9e5756 100644 --- a/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py +++ b/pipeline/plugins/extractors/tap-uk-ees/tap_uk_ees/tap.py @@ -1,9 +1,11 @@ -"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data. +"""EES Singer tap — extracts KS2, KS4, Census, Admissions data. Each stream targets a specific CSV file within an EES release ZIP. The EES data uses 'school_urn' for school-level records and 'z' for suppressed values. Column names vary by file — schemas declare all columns needed by downstream dbt staging models. + +Phonics has no school-level data on EES and is not included. """ from __future__ import annotations @@ -38,11 +40,15 @@ def download_release_zip(release_id: str) -> zipfile.ZipFile: class EESDatasetStream(Stream): - """Base stream for an EES dataset extracted from a release ZIP.""" + """Base stream for an EES dataset extracted from a release ZIP. + + Subclasses set _target_filename to a keyword that appears in the + target CSV path inside the ZIP (substring match, not exact). + """ replication_key = None _publication_slug: str = "" - _target_filename: str = "" # exact filename within the ZIP + _target_filename: str = "" # keyword that appears in the CSV path _urn_column: str = "school_urn" # column name for URN in the CSV def get_records(self, context): @@ -56,17 +62,17 @@ class EESDatasetStream(Stream): ) zf = download_release_zip(release_id) - # Find the target file + # Find the target file (substring match) all_files = zf.namelist() target = None for name in all_files: - if name.endswith(self._target_filename): + if self._target_filename in name and name.endswith(".csv"): target = name break if not target: self.logger.error( - "File '%s' not found in ZIP. Available: %s", + "File matching '%s' not found in ZIP. Available: %s", self._target_filename, [n for n in all_files if n.endswith(".csv")], ) @@ -96,7 +102,7 @@ class EESKS2AttainmentStream(EESDatasetStream): name = "ees_ks2_attainment" primary_keys = ["school_urn", "time_period", "subject", "breakdown_topic", "breakdown"] _publication_slug = "key-stage-2-attainment" - _target_filename = "ks2_school_attainment_data.csv" + _target_filename = "ks2_school_attainment_data" schema = th.PropertiesList( th.Property("time_period", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True), @@ -126,7 +132,7 @@ class EESKS2InfoStream(EESDatasetStream): name = "ees_ks2_info" primary_keys = ["school_urn", "time_period"] _publication_slug = "key-stage-2-attainment" - _target_filename = "ks2_school_information_data.csv" + _target_filename = "ks2_school_information_data" schema = th.PropertiesList( th.Property("time_period", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True), @@ -150,60 +156,172 @@ class EESKS2InfoStream(EESDatasetStream): ).to_dict() -# ── KS4 Attainment ────────────────────────────────────────────────────────── +# ── KS4 Performance (long format: one row per school × breakdown × sex) ───── +# File: 202425_performance_tables_schools_revised.csv (156 cols) +# Dimensions: breakdown_topic, breakdown, sex, disadvantage_status, etc. +# Metrics are already in separate columns (attainment8_average, progress8_average, etc.) -class EESKS4Stream(EESDatasetStream): - name = "ees_ks4" - primary_keys = ["school_urn", "time_period"] +class EESKS4PerformanceStream(EESDatasetStream): + name = "ees_ks4_performance" + primary_keys = ["school_urn", "time_period", "breakdown_topic", "breakdown", "sex"] _publication_slug = "key-stage-4-performance" - _target_filename = "school" # Will be refined once we see the actual ZIP contents + _target_filename = "performance_tables_schools" schema = th.PropertiesList( th.Property("time_period", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True), + th.Property("school_laestab", th.StringType), + th.Property("school_name", th.StringType), + th.Property("establishment_type_group", th.StringType), + th.Property("breakdown_topic", th.StringType, required=True), + th.Property("breakdown", th.StringType, required=True), + th.Property("sex", th.StringType, required=True), + th.Property("disadvantage_status", th.StringType), + th.Property("first_language", th.StringType), + th.Property("prior_attainment", th.StringType), + th.Property("mobility", th.StringType), + # Pupil counts + th.Property("pupil_count", th.StringType), + th.Property("pupil_percent", th.StringType), + # Attainment 8 + th.Property("attainment8_sum", th.StringType), + th.Property("attainment8_average", th.StringType), + # English & Maths + th.Property("engmath_entering_total", th.StringType), + th.Property("engmath_entering_percent", th.StringType), + th.Property("engmath_95_total", th.StringType), + th.Property("engmath_95_percent", th.StringType), + th.Property("engmath_94_total", th.StringType), + th.Property("engmath_94_percent", th.StringType), + # EBacc + th.Property("ebacc_entering_total", th.StringType), + th.Property("ebacc_entering_percent", th.StringType), + th.Property("ebacc_95_total", th.StringType), + th.Property("ebacc_95_percent", th.StringType), + th.Property("ebacc_94_total", th.StringType), + th.Property("ebacc_94_percent", th.StringType), + th.Property("ebacc_aps_sum", th.StringType), + th.Property("ebacc_aps_average", th.StringType), + # Progress 8 + th.Property("progress8_pupil_count", th.StringType), + th.Property("progress8_sum", th.StringType), + th.Property("progress8_average", th.StringType), + th.Property("progress8_lower_95_ci", th.StringType), + th.Property("progress8_upper_95_ci", th.StringType), + # Progress 8 elements + th.Property("progress8eng_average", th.StringType), + th.Property("progress8mat_average", th.StringType), + th.Property("progress8ebacc_average", th.StringType), + th.Property("progress8open_average", th.StringType), + # GCSE grades + th.Property("gcse_91_total", th.StringType), + th.Property("gcse_91_percent", th.StringType), + # EBacc subject entry/achievement + th.Property("ebacceng_entering_percent", th.StringType), + th.Property("ebaccmat_entering_percent", th.StringType), + th.Property("ebaccsci_entering_percent", th.StringType), + th.Property("ebacchum_entering_percent", th.StringType), + th.Property("ebacclan_entering_percent", th.StringType), + ).to_dict() + + +# ── KS4 Information (wide format: one row per school, context/demographics) ── +# File: 202425_information_about_schools_provisional.csv (38 cols) + +class EESKS4InfoStream(EESDatasetStream): + name = "ees_ks4_info" + primary_keys = ["school_urn", "time_period"] + _publication_slug = "key-stage-4-performance" + _target_filename = "information_about_schools" + schema = th.PropertiesList( + th.Property("time_period", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), + th.Property("school_laestab", th.StringType), + th.Property("school_name", th.StringType), + th.Property("establishment_type_group", th.StringType), + th.Property("reldenom", th.StringType), + th.Property("admpol_pt", th.StringType), + th.Property("egender", th.StringType), + th.Property("agerange", th.StringType), + th.Property("allks_pupil_count", th.StringType), + th.Property("allks_boys_count", th.StringType), + th.Property("allks_girls_count", th.StringType), + th.Property("endks4_pupil_count", th.StringType), + th.Property("ks2_scaledscore_average", th.StringType), + th.Property("sen_with_ehcp_pupil_percent", th.StringType), + th.Property("sen_pupil_percent", th.StringType), + th.Property("sen_no_ehcp_pupil_percent", th.StringType), + th.Property("attainment8_diffn", th.StringType), + th.Property("progress8_diffn", th.StringType), + th.Property("progress8_banding", th.StringType), ).to_dict() # ── Census (school-level pupil characteristics) ───────────────────────────── +# File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/) +# Uses 'urn' not 'school_urn'. Filename has yearly suffix that changes. class EESCensusStream(EESDatasetStream): name = "ees_census" - primary_keys = ["urn", "time_period"] + primary_keys = ["school_urn", "time_period"] _publication_slug = "school-pupils-and-their-characteristics" - _target_filename = "spc_school_level_underlying_data_2025.csv" + _target_filename = "spc_school_level_underlying_data" _urn_column = "urn" schema = th.PropertiesList( th.Property("time_period", th.StringType, required=True), - th.Property("urn", th.StringType, required=True), + th.Property("school_urn", th.StringType, required=True), th.Property("school_name", th.StringType), th.Property("laestab", th.StringType), th.Property("phase_type_grouping", th.StringType), + # TODO: Add data columns (ethnicity %, FSM %, SEN %, etc.) once + # actual column names are verified on the container. The CSV has + # 269 columns — only the first 30 (metadata) have been inspected. ).to_dict() # ── Admissions ─────────────────────────────────────────────────────────────── +# File: AppsandOffers_YYYY_SchoolLevelDDMMYYYY.csv (37 cols, in supporting-files/) +# Wide format, no geographic_level column. Uses school_urn. class EESAdmissionsStream(EESDatasetStream): name = "ees_admissions" primary_keys = ["school_urn", "time_period"] _publication_slug = "primary-and-secondary-school-applications-and-offers" - _target_filename = "school" # Will be refined once we see the actual ZIP contents + _target_filename = "SchoolLevel" schema = th.PropertiesList( th.Property("time_period", th.StringType, required=True), th.Property("school_urn", th.StringType, required=True), + th.Property("school_name", th.StringType), + th.Property("school_laestab_as_used", th.StringType), + th.Property("school_phase", th.StringType), + th.Property("entry_year", th.StringType), + # Places and offers + th.Property("total_number_places_offered", th.StringType), + th.Property("number_preferred_offers", th.StringType), + th.Property("number_1st_preference_offers", th.StringType), + th.Property("number_2nd_preference_offers", th.StringType), + th.Property("number_3rd_preference_offers", th.StringType), + # Applications + th.Property("times_put_as_any_preferred_school", th.StringType), + th.Property("times_put_as_1st_preference", th.StringType), + th.Property("times_put_as_2nd_preference", th.StringType), + th.Property("times_put_as_3rd_preference", th.StringType), + # Proportions + th.Property("proportion_1stprefs_v_1stprefoffers", th.StringType), + th.Property("proportion_1stprefs_v_totaloffers", th.StringType), + # Cross-LA + th.Property("all_applications_from_another_LA", th.StringType), + th.Property("offers_to_applicants_from_another_LA", th.StringType), + # Context + th.Property("establishment_type", th.StringType), + th.Property("denomination", th.StringType), + th.Property("FSM_eligible_percent", th.StringType), + th.Property("admissions_policy", th.StringType), + th.Property("urban_rural", th.StringType), ).to_dict() -# ── Phonics ────────────────────────────────────────────────────────────────── - -class EESPhonicsStream(EESDatasetStream): - name = "ees_phonics" - primary_keys = ["school_urn", "time_period"] - _publication_slug = "phonics-screening-check-attainment" - _target_filename = "school" # Will be refined once we see the actual ZIP contents - schema = th.PropertiesList( - th.Property("time_period", th.StringType, required=True), - th.Property("school_urn", th.StringType, required=True), - ).to_dict() +# Note: Phonics (phonics-screening-check-attainment) has NO school-level data +# on EES. Only national and LA-level files are published. class TapUKEES(Tap): @@ -219,10 +337,10 @@ class TapUKEES(Tap): return [ EESKS2AttainmentStream(self), EESKS2InfoStream(self), - EESKS4Stream(self), + EESKS4PerformanceStream(self), + EESKS4InfoStream(self), EESCensusStream(self), EESAdmissionsStream(self), - EESPhonicsStream(self), ] diff --git a/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql b/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql index 9008f82..124992f 100644 --- a/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql +++ b/pipeline/transform/models/intermediate/int_ks4_with_lineage.sql @@ -4,16 +4,14 @@ with current_ks4 as ( select urn as current_urn, urn as source_urn, - year, - total_pupils, - progress_8_score, + year, total_pupils, eligible_pupils, prior_attainment_avg, attainment_8_score, - ebacc_entry_pct, - ebacc_achievement_pct, - english_strong_pass_pct, - maths_strong_pass_pct, - english_maths_strong_pass_pct, - staying_in_education_pct + progress_8_score, progress_8_lower_ci, progress_8_upper_ci, + progress_8_english, progress_8_maths, progress_8_ebacc, progress_8_open, + english_maths_strong_pass_pct, english_maths_standard_pass_pct, + ebacc_entry_pct, ebacc_strong_pass_pct, ebacc_standard_pass_pct, ebacc_avg_score, + gcse_grade_91_pct, + sen_pct, sen_ehcp_pct, sen_support_pct from {{ ref('stg_ees_ks4') }} ), @@ -21,16 +19,14 @@ predecessor_ks4 as ( select lin.current_urn, ks4.urn as source_urn, - ks4.year, - ks4.total_pupils, - ks4.progress_8_score, + ks4.year, ks4.total_pupils, ks4.eligible_pupils, ks4.prior_attainment_avg, ks4.attainment_8_score, - ks4.ebacc_entry_pct, - ks4.ebacc_achievement_pct, - ks4.english_strong_pass_pct, - ks4.maths_strong_pass_pct, - ks4.english_maths_strong_pass_pct, - ks4.staying_in_education_pct + ks4.progress_8_score, ks4.progress_8_lower_ci, ks4.progress_8_upper_ci, + ks4.progress_8_english, ks4.progress_8_maths, ks4.progress_8_ebacc, ks4.progress_8_open, + ks4.english_maths_strong_pass_pct, ks4.english_maths_standard_pass_pct, + ks4.ebacc_entry_pct, ks4.ebacc_strong_pass_pct, ks4.ebacc_standard_pass_pct, ks4.ebacc_avg_score, + ks4.gcse_grade_91_pct, + ks4.sen_pct, ks4.sen_ehcp_pct, ks4.sen_support_pct from {{ ref('stg_ees_ks4') }} ks4 inner join {{ ref('int_school_lineage') }} lin on ks4.urn = lin.predecessor_urn diff --git a/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql b/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql index 165f3a3..9c171b7 100644 --- a/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql +++ b/pipeline/transform/models/intermediate/int_pupil_chars_merged.sql @@ -1,18 +1,8 @@ -- Intermediate model: Merged pupil characteristics from census data +-- TODO: Expand once census data columns are verified and added to stg_ees_census select urn, year, - fsm_pct, - sen_support_pct, - sen_ehcp_pct, - eal_pct, - disadvantaged_pct, - ethnicity_white_pct, - ethnicity_asian_pct, - ethnicity_black_pct, - ethnicity_mixed_pct, - ethnicity_other_pct, - class_size_avg, - stability_pct + phase_type_grouping from {{ ref('stg_ees_census') }} diff --git a/pipeline/transform/models/marts/_marts_schema.yml b/pipeline/transform/models/marts/_marts_schema.yml index b5b9314..e12fdc2 100644 --- a/pipeline/transform/models/marts/_marts_schema.yml +++ b/pipeline/transform/models/marts/_marts_schema.yml @@ -88,14 +88,6 @@ models: - name: year tests: [not_null] - - name: fact_phonics - description: Phonics screening results — one row per URN per year - columns: - - name: urn - tests: [not_null] - - name: year - tests: [not_null] - - name: fact_parent_view description: Parent View survey responses columns: diff --git a/pipeline/transform/models/marts/fact_admissions.sql b/pipeline/transform/models/marts/fact_admissions.sql index 305db45..9ca5e84 100644 --- a/pipeline/transform/models/marts/fact_admissions.sql +++ b/pipeline/transform/models/marts/fact_admissions.sql @@ -3,8 +3,12 @@ select urn, year, + school_phase, published_admission_number, total_applications, - first_preference_offers_pct, - oversubscribed + first_preference_applications, + first_preference_offers, + first_preference_offer_pct, + oversubscribed, + admissions_policy from {{ ref('stg_ees_admissions') }} diff --git a/pipeline/transform/models/marts/fact_ks4_performance.sql b/pipeline/transform/models/marts/fact_ks4_performance.sql index accae46..4607dfb 100644 --- a/pipeline/transform/models/marts/fact_ks4_performance.sql +++ b/pipeline/transform/models/marts/fact_ks4_performance.sql @@ -1,16 +1,42 @@ -- Mart: KS4 performance fact table — one row per URN per year +-- Includes predecessor data via lineage resolution select current_urn as urn, source_urn, year, total_pupils, - progress_8_score, + eligible_pupils, + prior_attainment_avg, + + -- Attainment 8 attainment_8_score, - ebacc_entry_pct, - ebacc_achievement_pct, - english_strong_pass_pct, - maths_strong_pass_pct, + + -- Progress 8 + progress_8_score, + progress_8_lower_ci, + progress_8_upper_ci, + progress_8_english, + progress_8_maths, + progress_8_ebacc, + progress_8_open, + + -- English & Maths english_maths_strong_pass_pct, - staying_in_education_pct + english_maths_standard_pass_pct, + + -- EBacc + ebacc_entry_pct, + ebacc_strong_pass_pct, + ebacc_standard_pass_pct, + ebacc_avg_score, + + -- GCSE + gcse_grade_91_pct, + + -- Context + sen_pct, + sen_ehcp_pct, + sen_support_pct + from {{ ref('int_ks4_with_lineage') }} diff --git a/pipeline/transform/models/marts/fact_phonics.sql b/pipeline/transform/models/marts/fact_phonics.sql deleted file mode 100644 index 71a6a2d..0000000 --- a/pipeline/transform/models/marts/fact_phonics.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Mart: Phonics screening results — one row per URN per year - -select - urn, - year, - year1_phonics_pct, - year2_phonics_pct -from {{ ref('stg_ees_phonics') }} diff --git a/pipeline/transform/models/marts/fact_pupil_characteristics.sql b/pipeline/transform/models/marts/fact_pupil_characteristics.sql index c12936b..a3081ee 100644 --- a/pipeline/transform/models/marts/fact_pupil_characteristics.sql +++ b/pipeline/transform/models/marts/fact_pupil_characteristics.sql @@ -1,18 +1,8 @@ -- Mart: Pupil characteristics — one row per URN per year +-- TODO: Expand once census data columns are verified and added to staging select urn, year, - fsm_pct, - sen_support_pct, - sen_ehcp_pct, - eal_pct, - disadvantaged_pct, - ethnicity_white_pct, - ethnicity_asian_pct, - ethnicity_black_pct, - ethnicity_mixed_pct, - ethnicity_other_pct, - class_size_avg, - stability_pct + phase_type_grouping from {{ ref('int_pupil_chars_merged') }} diff --git a/pipeline/transform/models/staging/_stg_sources.yml b/pipeline/transform/models/staging/_stg_sources.yml index b7da2a9..f4b28d4 100644 --- a/pipeline/transform/models/staging/_stg_sources.yml +++ b/pipeline/transform/models/staging/_stg_sources.yml @@ -30,8 +30,11 @@ sources: - name: ees_ks2_info description: KS2 school information (wide format — context/demographics per school) - - name: ees_ks4 - description: KS4 attainment data from Explore Education Statistics + - name: ees_ks4_performance + description: KS4 performance tables (long format — one row per school × breakdown × sex) + + - name: ees_ks4_info + description: KS4 school information (wide format — context/demographics per school) - name: ees_census description: School census pupil characteristics @@ -39,8 +42,7 @@ sources: - name: ees_admissions description: Primary and secondary school admissions data - - name: ees_phonics - description: Phonics screening check results + # Phonics: no school-level data on EES (only national/LA level) - name: parent_view description: Ofsted Parent View survey responses diff --git a/pipeline/transform/models/staging/stg_ees_admissions.sql b/pipeline/transform/models/staging/stg_ees_admissions.sql index 4c8c411..58e5667 100644 --- a/pipeline/transform/models/staging/stg_ees_admissions.sql +++ b/pipeline/transform/models/staging/stg_ees_admissions.sql @@ -1,19 +1,48 @@ -- Staging model: Primary and secondary school admissions from EES +-- Wide format, one row per school per year. No geographic_level column. +-- File is in supporting-files/ subdirectory of the release ZIP. with source as ( select * from {{ source('raw', 'ees_admissions') }} + where school_urn is not null ), renamed as ( select - cast(urn as integer) as urn, - cast(time_period as integer) as year, - cast(published_admission_number as integer) as published_admission_number, - cast(total_applications as integer) as total_applications, - cast(first_preference_offers_pct as numeric) as first_preference_offers_pct, - cast(oversubscribed as boolean) as oversubscribed + cast(school_urn as integer) as urn, + cast(time_period as integer) as year, + school_phase, + entry_year, + + -- Places and offers + cast(nullif(total_number_places_offered, 'z') as integer) as published_admission_number, + cast(nullif(number_preferred_offers, 'z') as integer) as total_offers, + cast(nullif(number_1st_preference_offers, 'z') as integer) as first_preference_offers, + cast(nullif(number_2nd_preference_offers, 'z') as integer) as second_preference_offers, + cast(nullif(number_3rd_preference_offers, 'z') as integer) as third_preference_offers, + + -- Applications + cast(nullif(times_put_as_any_preferred_school, 'z') as integer) as total_applications, + cast(nullif(times_put_as_1st_preference, 'z') as integer) as first_preference_applications, + + -- Proportions + cast(nullif(proportion_1stprefs_v_totaloffers, 'z') as numeric) as first_preference_offer_pct, + + -- Derived: oversubscribed if applications > places + case + when nullif(times_put_as_1st_preference, 'z') is not null + and nullif(total_number_places_offered, 'z') is not null + and cast(times_put_as_1st_preference as integer) + > cast(total_number_places_offered as integer) + then true + else false + end as oversubscribed, + + -- Context + admissions_policy, + nullif(FSM_eligible_percent, 'z') as fsm_eligible_pct + from source - where urn is not null ) select * from renamed diff --git a/pipeline/transform/models/staging/stg_ees_census.sql b/pipeline/transform/models/staging/stg_ees_census.sql index fe38c51..ec6438f 100644 --- a/pipeline/transform/models/staging/stg_ees_census.sql +++ b/pipeline/transform/models/staging/stg_ees_census.sql @@ -1,27 +1,30 @@ -- Staging model: School census pupil characteristics from EES +-- File: spc_school_level_underlying_data_YYYY.csv (269 cols, in supporting-files/) +-- Uses 'urn' column (not school_urn). Tap normalises to school_urn. +-- +-- TODO: The CSV has 269 columns but only metadata columns have been verified. +-- Data columns (ethnicity %, FSM %, SEN %, class sizes) need to be discovered +-- by inspecting the CSV on the Airflow container. The column references below +-- are placeholders and will fail until the tap schema and this model are updated +-- with the actual column names. with source as ( select * from {{ source('raw', 'ees_census') }} + where school_urn is not null ), renamed as ( select - cast(urn as integer) as urn, - cast(time_period as integer) as year, - cast(fsm_pct as numeric) as fsm_pct, - cast(sen_support_pct as numeric) as sen_support_pct, - cast(sen_ehcp_pct as numeric) as sen_ehcp_pct, - cast(eal_pct as numeric) as eal_pct, - cast(disadvantaged_pct as numeric) as disadvantaged_pct, - cast(ethnicity_white_pct as numeric) as ethnicity_white_pct, - cast(ethnicity_asian_pct as numeric) as ethnicity_asian_pct, - cast(ethnicity_black_pct as numeric) as ethnicity_black_pct, - cast(ethnicity_mixed_pct as numeric) as ethnicity_mixed_pct, - cast(ethnicity_other_pct as numeric) as ethnicity_other_pct, - cast(class_size_avg as numeric) as class_size_avg, - cast(stability_pct as numeric) as stability_pct + cast(school_urn as integer) as urn, + cast(time_period as integer) as year, + school_name, + phase_type_grouping + -- TODO: Add census data columns once verified: + -- fsm_pct, sen_support_pct, sen_ehcp_pct, eal_pct, + -- disadvantaged_pct, ethnicity_white_pct, ethnicity_asian_pct, + -- ethnicity_black_pct, ethnicity_mixed_pct, ethnicity_other_pct, + -- class_size_avg, stability_pct from source - where urn is not null ) select * from renamed diff --git a/pipeline/transform/models/staging/stg_ees_ks4.sql b/pipeline/transform/models/staging/stg_ees_ks4.sql index 9e805c1..a4aae9a 100644 --- a/pipeline/transform/models/staging/stg_ees_ks4.sql +++ b/pipeline/transform/models/staging/stg_ees_ks4.sql @@ -1,24 +1,102 @@ --- Staging model: KS4 attainment data from EES (secondary schools — NEW) +-- Staging model: KS4 attainment data from EES +-- KS4 performance data is long-format with breakdown dimensions (breakdown_topic, +-- breakdown, sex). Unlike KS2 which has a subject dimension, KS4 metrics are +-- already in separate columns — we just filter to the 'All pupils' breakdown. +-- EES uses 'z' for suppressed values — cast to null via nullif. -with source as ( - select * from {{ source('raw', 'ees_ks4') }} +with performance as ( + select * from {{ source('raw', 'ees_ks4_performance') }} + where school_urn is not null ), -renamed as ( +-- Filter to all-pupils totals (one row per school per year) +all_pupils as ( select - cast(urn as integer) as urn, - cast(time_period as integer) as year, - cast(t_pupils as integer) as total_pupils, - cast(progress_8_score as numeric) as progress_8_score, - cast(attainment_8_score as numeric) as attainment_8_score, - cast(ebacc_entry_pct as numeric) as ebacc_entry_pct, - cast(ebacc_achievement_pct as numeric) as ebacc_achievement_pct, - cast(english_strong_pass_pct as numeric) as english_strong_pass_pct, - cast(maths_strong_pass_pct as numeric) as maths_strong_pass_pct, - cast(english_maths_strong_pass_pct as numeric) as english_maths_strong_pass_pct, - cast(staying_in_education_pct as numeric) as staying_in_education_pct - from source - where urn is not null + cast(school_urn as integer) as urn, + cast(time_period as integer) as year, + cast(nullif(pupil_count, 'z') as integer) as total_pupils, + + -- Attainment 8 + cast(nullif(attainment8_average, 'z') as numeric) as attainment_8_score, + + -- Progress 8 + cast(nullif(progress8_average, 'z') as numeric) as progress_8_score, + cast(nullif(progress8_lower_95_ci, 'z') as numeric) as progress_8_lower_ci, + cast(nullif(progress8_upper_95_ci, 'z') as numeric) as progress_8_upper_ci, + cast(nullif(progress8eng_average, 'z') as numeric) as progress_8_english, + cast(nullif(progress8mat_average, 'z') as numeric) as progress_8_maths, + cast(nullif(progress8ebacc_average, 'z') as numeric) as progress_8_ebacc, + cast(nullif(progress8open_average, 'z') as numeric) as progress_8_open, + + -- English & Maths pass rates + cast(nullif(engmath_95_percent, 'z') as numeric) as english_maths_strong_pass_pct, + cast(nullif(engmath_94_percent, 'z') as numeric) as english_maths_standard_pass_pct, + + -- EBacc + cast(nullif(ebacc_entering_percent, 'z') as numeric) as ebacc_entry_pct, + cast(nullif(ebacc_95_percent, 'z') as numeric) as ebacc_strong_pass_pct, + cast(nullif(ebacc_94_percent, 'z') as numeric) as ebacc_standard_pass_pct, + cast(nullif(ebacc_aps_average, 'z') as numeric) as ebacc_avg_score, + + -- GCSE grade 9-1 + cast(nullif(gcse_91_percent, 'z') as numeric) as gcse_grade_91_pct + + from performance + where breakdown_topic = 'All pupils' + and breakdown = 'Total' + and sex = 'Total' +), + +-- KS4 info table for context/demographics +info as ( + select + cast(school_urn as integer) as urn, + cast(time_period as integer) as year, + cast(nullif(endks4_pupil_count, 'z') as integer) as eligible_pupils, + cast(nullif(ks2_scaledscore_average, 'z') as numeric) as prior_attainment_avg, + cast(nullif(sen_pupil_percent, 'z') as numeric) as sen_pct, + cast(nullif(sen_with_ehcp_pupil_percent, 'z') as numeric) as sen_ehcp_pct, + cast(nullif(sen_no_ehcp_pupil_percent, 'z') as numeric) as sen_support_pct + from {{ source('raw', 'ees_ks4_info') }} + where school_urn is not null ) -select * from renamed +select + p.urn, + p.year, + p.total_pupils, + i.eligible_pupils, + i.prior_attainment_avg, + + -- Attainment 8 + p.attainment_8_score, + + -- Progress 8 + p.progress_8_score, + p.progress_8_lower_ci, + p.progress_8_upper_ci, + p.progress_8_english, + p.progress_8_maths, + p.progress_8_ebacc, + p.progress_8_open, + + -- English & Maths + p.english_maths_strong_pass_pct, + p.english_maths_standard_pass_pct, + + -- EBacc + p.ebacc_entry_pct, + p.ebacc_strong_pass_pct, + p.ebacc_standard_pass_pct, + p.ebacc_avg_score, + + -- GCSE + p.gcse_grade_91_pct, + + -- Context + i.sen_pct, + i.sen_ehcp_pct, + i.sen_support_pct + +from all_pupils p +left join info i on p.urn = i.urn and p.year = i.year diff --git a/pipeline/transform/models/staging/stg_ees_phonics.sql b/pipeline/transform/models/staging/stg_ees_phonics.sql deleted file mode 100644 index f5629df..0000000 --- a/pipeline/transform/models/staging/stg_ees_phonics.sql +++ /dev/null @@ -1,17 +0,0 @@ --- Staging model: Phonics screening check results from EES - -with source as ( - select * from {{ source('raw', 'ees_phonics') }} -), - -renamed as ( - select - cast(urn as integer) as urn, - cast(time_period as integer) as year, - cast(year1_phonics_pct as numeric) as year1_phonics_pct, - cast(year2_phonics_pct as numeric) as year2_phonics_pct - from source - where urn is not null -) - -select * from renamed