#!/usr/bin/env python3
"""
Migration script to import CSV data into PostgreSQL database.

Usage:
    python scripts/migrate_csv_to_db.py [--drop] [--geocode]

Options:
    --drop      Drop existing tables before migration
    --geocode   Geocode postcodes (requires network access)
"""

import os
import sys
from pathlib import Path

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

import argparse
import re
from typing import Dict, Optional

import numpy as np
import pandas as pd
import requests

from backend.config import settings
from backend.database import Base, engine, get_db_session
from backend.models import (
    RESULT_FIELD_MAPPING,
    SCHOOL_FIELD_MAPPING,
    School,
    SchoolResult,
)
from backend.schemas import (
    COLUMN_MAPPINGS,
    LA_CODE_TO_NAME,
    NULL_VALUES,
    NUMERIC_COLUMNS,
    SCHOOL_TYPE_MAP,
)

# Columns copied from each CSV row onto SchoolResult after numeric parsing.
# The names double as the DataFrame column names and the model keyword names.
_RESULT_NUMERIC_FIELDS = (
    "total_pupils",
    "eligible_pupils",
    # Expected Standard
    "rwm_expected_pct",
    "reading_expected_pct",
    "writing_expected_pct",
    "maths_expected_pct",
    "gps_expected_pct",
    "science_expected_pct",
    # Higher Standard
    "rwm_high_pct",
    "reading_high_pct",
    "writing_high_pct",
    "maths_high_pct",
    "gps_high_pct",
    # Progress
    "reading_progress",
    "writing_progress",
    "maths_progress",
    # Averages
    "reading_avg_score",
    "maths_avg_score",
    "gps_avg_score",
    # Context
    "disadvantaged_pct",
    "eal_pct",
    "sen_support_pct",
    "sen_ehcp_pct",
    "stability_pct",
    # Gender
    "rwm_expected_boys_pct",
    "rwm_expected_girls_pct",
    "rwm_high_boys_pct",
    "rwm_high_girls_pct",
    # Disadvantaged
    "rwm_expected_disadvantaged_pct",
    "rwm_expected_non_disadvantaged_pct",
    "disadvantaged_gap",
    # 3-Year
    "rwm_expected_3yr_pct",
    "reading_avg_3yr",
    "maths_avg_3yr",
)

# Columns joined, in this order, to build the combined "address" column.
_ADDRESS_COLUMNS = ("address1", "address2", "town", "postcode")


def parse_numeric(value) -> Optional[float]:
    """Parse a numeric value, handling special cases.

    Returns None for missing values (anything ``pd.isna`` reports), for blank
    strings, for strings listed in ``NULL_VALUES`` (compared after stripping
    and upper-casing), for text that is not a number, and for text that
    parses to NaN. A percent sign in the text is removed before parsing.
    """
    if pd.isna(value):
        return None
    if isinstance(value, (int, float)):
        # NaN was already rejected by pd.isna above.
        return float(value)

    text = str(value).strip().upper()
    if text in NULL_VALUES or text == "":
        return None

    # Remove percentage signs if present
    text = text.replace("%", "")
    try:
        parsed = float(text)
    except ValueError:
        return None
    # float("NAN") succeeds; treat it as missing, like a numeric NaN input.
    return None if np.isnan(parsed) else parsed


def extract_year_from_folder(folder_name: str) -> Optional[int]:
    """Extract year from folder name like '2023-2024'.

    For a 'YYYY-YYYY' range the second year is returned; otherwise the first
    four-digit run found is used. Returns None when no four-digit run exists.
    """
    year_range = re.search(r"(\d{4})-(\d{4})", folder_name)
    if year_range:
        return int(year_range.group(2))

    single_year = re.search(r"(\d{4})", folder_name)
    if single_year:
        return int(single_year.group(1))

    return None


def geocode_postcodes_bulk(postcodes: list) -> Dict[str, tuple]:
    """
    Geocode postcodes in bulk using postcodes.io API.

    Inputs that are not strings, or are shorter than 5 characters once
    stripped, are ignored; the rest are stripped, upper-cased and
    de-duplicated before being sent in batches of 100.

    Returns dict of postcode -> (latitude, longitude), keyed by the
    upper-cased query string echoed back by the API. Failed batches are
    reported with a printed warning and skipped, so the result may be partial.
    """
    results: Dict[str, tuple] = {}

    # sorted() only makes the batch contents deterministic between runs.
    valid_postcodes = sorted(
        {
            p.strip().upper()
            for p in postcodes
            if p and isinstance(p, str) and len(p.strip()) >= 5
        }
    )
    if not valid_postcodes:
        return results

    batch_size = 100
    total_batches = (len(valid_postcodes) + batch_size - 1) // batch_size

    for i, batch_start in enumerate(range(0, len(valid_postcodes), batch_size)):
        batch = valid_postcodes[batch_start : batch_start + batch_size]
        print(
            f" Geocoding batch {i + 1}/{total_batches} ({len(batch)} postcodes)..."
        )
        try:
            response = requests.post(
                "https://api.postcodes.io/postcodes",
                json={"postcodes": batch},
                timeout=30,
            )
            if response.status_code != 200:
                print(
                    f" Warning: Geocoding batch returned HTTP {response.status_code}"
                )
                continue
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f" Warning: Geocoding batch failed: {e}")
            continue

        items = payload.get("result") if isinstance(payload, dict) else None
        if not isinstance(items, list):
            continue

        for item in items:
            if not isinstance(item, dict):
                continue
            query = item.get("query")
            match = item.get("result")
            if not isinstance(query, str) or not isinstance(match, dict):
                continue
            lat = match.get("latitude")
            lon = match.get("longitude")
            # Compare against None: 0.0 is a valid coordinate (longitude 0
            # is the prime meridian) and must not be discarded as falsy.
            if lat is not None and lon is not None:
                results[query.upper()] = (lat, lon)

    return results


def load_csv_data(data_dir: Path) -> pd.DataFrame:
    """Load all CSV data from data directory.

    Each sub-directory whose name yields a year (see
    ``extract_year_from_folder``) and that contains ``england_ks2final.csv``
    is read, its columns renamed through ``COLUMN_MAPPINGS``, and ``year``,
    local-authority, school-type and combined ``address`` columns are added.

    Returns the concatenation of all loaded frames, or an empty DataFrame
    when the directory is missing or nothing could be loaded.
    """
    data_dir = Path(data_dir)
    if not data_dir.is_dir():
        print(f" Data directory not found: {data_dir}")
        return pd.DataFrame()

    all_data = []

    for folder in sorted(data_dir.iterdir()):
        if not folder.is_dir():
            continue

        year = extract_year_from_folder(folder.name)
        if not year:
            continue

        # Specifically look for the KS2 results file
        csv_file = folder / "england_ks2final.csv"
        if not csv_file.exists():
            continue

        print(f" Loading {csv_file.name} (year {year})...")

        try:
            df = pd.read_csv(csv_file, encoding="latin-1", low_memory=False)
        except Exception as e:
            # Best-effort: one unreadable file must not abort the whole load.
            print(f" Error loading {csv_file}: {e}")
            continue

        if df.empty:
            # A row-wise apply on an empty frame does not produce a Series,
            # so the address column below could not be assigned.
            print(f" Skipping {csv_file.name}: no rows")
            continue

        # Rename columns
        df.rename(columns=COLUMN_MAPPINGS, inplace=True)
        df["year"] = year

        # Store LEA code
        has_lea = "LEA" in df.columns
        if has_lea:
            df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce")

        # Handle local authority name: prefer an explicit name column and
        # fall back to translating the LEA code (raw LEA text if unmapped).
        la_name_cols = ["LANAME", "LA (name)", "LA_NAME", "LA NAME"]
        la_name_col = next((c for c in la_name_cols if c in df.columns), None)
        if la_name_col is not None:
            df["local_authority"] = df[la_name_col]
        elif has_lea:
            df["local_authority"] = (
                df["local_authority_code"]
                .map(LA_CODE_TO_NAME)
                .fillna(df["LEA"].astype(str))
            )

        # Map school type, keeping the raw code where no mapping exists
        if "school_type_code" in df.columns:
            df["school_type"] = (
                df["school_type_code"]
                .map(SCHOOL_TYPE_MAP)
                .fillna(df["school_type_code"])
            )

        # Create combined address
        for col in _ADDRESS_COLUMNS:
            if col not in df.columns:
                df[col] = None

        df["address"] = df.apply(
            lambda r: ", ".join(
                str(v)
                for v in (r.get(col) for col in _ADDRESS_COLUMNS)
                if pd.notna(v) and str(v).strip()
            ),
            axis=1,
        )

        all_data.append(df)
        print(f" Loaded {len(df)} records")

    if all_data:
        result = pd.concat(all_data, ignore_index=True)
        print(f"\nTotal records loaded: {len(result)}")
        print(f"Unique schools: {result['urn'].nunique()}")
        print(f"Years: {sorted(result['year'].unique())}")
        return result

    return pd.DataFrame()


def _parse_int(value) -> Optional[int]:
    """Return ``value`` as an int, or None when it is missing or not numeric.

    Goes through ``float`` so text such as "12345.0" is accepted.
    """
    if pd.isna(value):
        return None
    try:
        text = str(value).strip()
        return int(float(text)) if text else None
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")) on an infinite value.
        return None


def _none_if_na(value):
    """Return ``value`` unchanged, or None when pandas treats it as missing."""
    return None if pd.isna(value) else value


def migrate_data(df: pd.DataFrame, geocode: bool = False) -> None:
    """Migrate DataFrame data to database.

    Creates one ``School`` per distinct URN and one ``SchoolResult`` per input
    row that has a usable URN and year, then commits.

    Args:
        df: Combined data as produced by ``load_csv_data``; must contain
            ``urn`` and ``year`` columns. The caller's frame is not modified.
        geocode: When True, look up coordinates for the schools' postcodes
            via ``geocode_postcodes_bulk``.
    """
    # Clean URN column - convert to integer, drop invalid values
    df = df.copy()
    df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
    df = df.dropna(subset=["urn"])
    df["urn"] = df["urn"].astype(int)

    # Group by URN to get unique schools (use latest year's data).
    # NOTE: GroupBy.first() takes the first non-null value per column, so a
    # field that is blank in the latest year is filled from an earlier year.
    school_data = (
        df.sort_values("year", ascending=False).groupby("urn").first().reset_index()
    )
    print(f"\nMigrating {len(school_data)} unique schools...")

    # Geocode if requested. Only the per-school postcodes are ever looked up
    # below, so there is no need to geocode every historical postcode.
    geocoded = {}
    if geocode and "postcode" in school_data.columns:
        print("\nGeocoding postcodes...")
        postcodes = school_data["postcode"].dropna().unique().tolist()
        geocoded = geocode_postcodes_bulk(postcodes)
        print(f" Successfully geocoded {len(geocoded)} postcodes")

    with get_db_session() as db:
        # Create schools
        urn_to_school_id = {}
        schools_created = 0

        for _, row in school_data.iterrows():
            urn = _parse_int(row.get("urn"))
            if not urn:
                continue

            # Skip if we've already added this URN (handles duplicates in source data)
            if urn in urn_to_school_id:
                continue

            # Get geocoding data
            postcode = row.get("postcode")
            lat, lon = None, None
            if postcode and pd.notna(postcode):
                coords = geocoded.get(str(postcode).strip().upper())
                if coords:
                    lat, lon = coords

            school_name = _none_if_na(row.get("school_name"))

            school = School(
                urn=urn,
                school_name=school_name if school_name is not None else "Unknown",
                local_authority=_none_if_na(row.get("local_authority")),
                local_authority_code=_parse_int(row.get("local_authority_code")),
                school_type=_none_if_na(row.get("school_type")),
                school_type_code=_none_if_na(row.get("school_type_code")),
                religious_denomination=_none_if_na(
                    row.get("religious_denomination")
                ),
                age_range=_none_if_na(row.get("age_range")),
                address1=_none_if_na(row.get("address1")),
                address2=_none_if_na(row.get("address2")),
                town=_none_if_na(row.get("town")),
                postcode=_none_if_na(row.get("postcode")),
                latitude=lat,
                longitude=lon,
            )
            db.add(school)
            db.flush()  # Get the ID
            urn_to_school_id[urn] = school.id
            schools_created += 1

            if schools_created % 1000 == 0:
                print(f" Created {schools_created} schools...")

        print(f" Created {schools_created} schools")

        # Create results
        print(f"\nMigrating {len(df)} yearly results...")
        results_created = 0

        for _, row in df.iterrows():
            urn = _parse_int(row.get("urn"))
            if not urn or urn not in urn_to_school_id:
                continue

            year = _parse_int(row.get("year"))
            if not year:
                continue

            result = SchoolResult(
                school_id=urn_to_school_id[urn],
                year=year,
                **{
                    field: parse_numeric(row.get(field))
                    for field in _RESULT_NUMERIC_FIELDS
                },
            )
            db.add(result)
            results_created += 1

            if results_created % 10000 == 0:
                print(f" Created {results_created} results...")
                db.flush()

        print(f" Created {results_created} results")

        # Commit all changes
        db.commit()

    print("\nMigration complete!")


def main() -> int:
    """Parse command-line options and run the migration.

    Returns the process exit code: 0 on success, 1 when no CSV data was found.
    """
    parser = argparse.ArgumentParser(
        description="Migrate CSV data to PostgreSQL database"
    )
    parser.add_argument(
        "--drop", action="store_true", help="Drop existing tables before migration"
    )
    parser.add_argument("--geocode", action="store_true", help="Geocode postcodes")
    args = parser.parse_args()

    print("=" * 60)
    print("School Data Migration: CSV -> PostgreSQL")
    print("=" * 60)

    # Print only the part after the last '@' of the database URL.
    print(f"\nDatabase: {settings.database_url.split('@')[-1]}")
    print(f"Data directory: {settings.data_dir}")

    if args.drop:
        print("\n⚠️ Dropping existing tables...")
        Base.metadata.drop_all(bind=engine)

    print("\nCreating tables...")
    Base.metadata.create_all(bind=engine)

    print("\nLoading CSV data...")
    df = load_csv_data(settings.data_dir)

    if df.empty:
        print("No data found to migrate!")
        return 1

    migrate_data(df, geocode=args.geocode)
    return 0


if __name__ == "__main__":
    sys.exit(main())