diff --git a/scripts/migrate_csv_to_db.py b/scripts/migrate_csv_to_db.py new file mode 100644 index 0000000..cade220 --- /dev/null +++ b/scripts/migrate_csv_to_db.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python3 +""" +Migration script to import CSV data into PostgreSQL database. + +Usage: + python scripts/migrate_csv_to_db.py [--drop] [--geocode] + +Options: + --drop Drop existing tables before migration + --geocode Geocode postcodes (requires network access) +""" + +import sys +import os +from pathlib import Path + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import argparse +import pandas as pd +import numpy as np +import re +from typing import Optional, Dict +import requests + +from backend.config import settings +from backend.database import engine, Base, get_db_session +from backend.models import School, SchoolResult, SCHOOL_FIELD_MAPPING, RESULT_FIELD_MAPPING +from backend.schemas import ( + COLUMN_MAPPINGS, + NUMERIC_COLUMNS, + SCHOOL_TYPE_MAP, + NULL_VALUES, + LA_CODE_TO_NAME, +) + + +def parse_numeric(value) -> Optional[float]: + """Parse a numeric value, handling special cases.""" + if pd.isna(value): + return None + if isinstance(value, (int, float)): + return float(value) if not np.isnan(value) else None + str_val = str(value).strip().upper() + if str_val in NULL_VALUES or str_val == '': + return None + try: + return float(str_val) + except ValueError: + return None + + +def extract_year_from_folder(folder_name: str) -> Optional[int]: + """Extract year from folder name like '2023-2024'.""" + match = re.search(r'(\d{4})-(\d{4})', folder_name) + if match: + return int(match.group(2)) + match = re.search(r'(\d{4})', folder_name) + if match: + return int(match.group(1)) + return None + + +def geocode_postcodes_bulk(postcodes: list) -> Dict[str, tuple]: + """ + Geocode postcodes in bulk using postcodes.io API. + Returns dict of postcode -> (latitude, longitude). + """ + results = {} + valid_postcodes = [p.strip().upper() for p in postcodes if p and isinstance(p, str) and len(p.strip()) >= 5] + valid_postcodes = list(set(valid_postcodes)) + + if not valid_postcodes: + return results + + batch_size = 100 + total_batches = (len(valid_postcodes) + batch_size - 1) // batch_size + + for i, batch_start in enumerate(range(0, len(valid_postcodes), batch_size)): + batch = valid_postcodes[batch_start:batch_start + batch_size] + print(f" Geocoding batch {i+1}/{total_batches} ({len(batch)} postcodes)...") + + try: + response = requests.post( + 'https://api.postcodes.io/postcodes', + json={'postcodes': batch}, + timeout=30 + ) + if response.status_code == 200: + data = response.json() + for item in data.get('result', []): + if item and item.get('result'): + pc = item['query'].upper() + lat = item['result'].get('latitude') + lon = item['result'].get('longitude') + if lat and lon: + results[pc] = (lat, lon) + except Exception as e: + print(f" Warning: Geocoding batch failed: {e}") + + return results + + +def load_csv_data(data_dir: Path) -> pd.DataFrame: + """Load all CSV data from data directory.""" + all_data = [] + + for folder in sorted(data_dir.iterdir()): + if not folder.is_dir(): + continue + + year = extract_year_from_folder(folder.name) + if not year: + continue + + csv_files = list(folder.glob("*.csv")) + if not csv_files: + continue + + csv_file = csv_files[0] + print(f" Loading {csv_file.name} (year {year})...") + + try: + df = pd.read_csv(csv_file, encoding='latin-1', low_memory=False) + except Exception as e: + print(f" Error loading {csv_file}: {e}") + continue + + # Rename columns + df.rename(columns=COLUMN_MAPPINGS, inplace=True) + df['year'] = year + + # Handle local authority name + la_name_cols = ['LANAME', 'LA (name)', 'LA_NAME', 'LA NAME'] + la_name_col = next((c for c in la_name_cols if c in df.columns), None) + + if la_name_col and la_name_col != 'local_authority': + df['local_authority'] = df[la_name_col] + elif 'LEA' in df.columns: + df['local_authority_code'] = pd.to_numeric(df['LEA'], errors='coerce') + df['local_authority'] = df['local_authority_code'].map(LA_CODE_TO_NAME).fillna(df['LEA'].astype(str)) + + # Store LEA code + if 'LEA' in df.columns: + df['local_authority_code'] = pd.to_numeric(df['LEA'], errors='coerce') + + # Map school type + if 'school_type_code' in df.columns: + df['school_type'] = df['school_type_code'].map(SCHOOL_TYPE_MAP).fillna(df['school_type_code']) + + # Create combined address + addr_parts = ['address1', 'address2', 'town', 'postcode'] + for col in addr_parts: + if col not in df.columns: + df[col] = None + + df['address'] = df.apply( + lambda r: ', '.join(str(v) for v in [r.get('address1'), r.get('address2'), r.get('town'), r.get('postcode')] if pd.notna(v) and str(v).strip()), + axis=1 + ) + + all_data.append(df) + print(f" Loaded {len(df)} records") + + if all_data: + result = pd.concat(all_data, ignore_index=True) + print(f"\nTotal records loaded: {len(result)}") + print(f"Unique schools: {result['urn'].nunique()}") + print(f"Years: {sorted(result['year'].unique())}") + return result + + return pd.DataFrame() + + +def migrate_data(df: pd.DataFrame, geocode: bool = False): + """Migrate DataFrame data to database.""" + + # Group by URN to get unique schools + school_data = df.groupby('urn').first().reset_index() + print(f"\nMigrating {len(school_data)} unique schools...") + + # Geocode if requested + geocoded = {} + if geocode and 'postcode' in df.columns: + print("\nGeocoding postcodes...") + postcodes = df['postcode'].dropna().unique().tolist() + geocoded = geocode_postcodes_bulk(postcodes) + print(f" Successfully geocoded {len(geocoded)} postcodes") + + with get_db_session() as db: + # Create schools + urn_to_school_id = {} + schools_created = 0 + + for _, row in school_data.iterrows(): + urn = int(row['urn']) if pd.notna(row.get('urn')) else None + if not urn: + continue + + # Get geocoding data + postcode = row.get('postcode') + lat, lon = None, None + if postcode and pd.notna(postcode): + coords = geocoded.get(str(postcode).strip().upper()) + if coords: + lat, lon = coords + + school = School( + urn=urn, + school_name=row.get('school_name') if pd.notna(row.get('school_name')) else 'Unknown', + local_authority=row.get('local_authority') if pd.notna(row.get('local_authority')) else None, + local_authority_code=int(row.get('local_authority_code')) if pd.notna(row.get('local_authority_code')) else None, + school_type=row.get('school_type') if pd.notna(row.get('school_type')) else None, + school_type_code=row.get('school_type_code') if pd.notna(row.get('school_type_code')) else None, + religious_denomination=row.get('religious_denomination') if pd.notna(row.get('religious_denomination')) else None, + age_range=row.get('age_range') if pd.notna(row.get('age_range')) else None, + address1=row.get('address1') if pd.notna(row.get('address1')) else None, + address2=row.get('address2') if pd.notna(row.get('address2')) else None, + town=row.get('town') if pd.notna(row.get('town')) else None, + postcode=row.get('postcode') if pd.notna(row.get('postcode')) else None, + latitude=lat, + longitude=lon, + ) + db.add(school) + db.flush() # Get the ID + urn_to_school_id[urn] = school.id + schools_created += 1 + + if schools_created % 1000 == 0: + print(f" Created {schools_created} schools...") + + print(f" Created {schools_created} schools") + + # Create results + print(f"\nMigrating {len(df)} yearly results...") + results_created = 0 + + for _, row in df.iterrows(): + urn = int(row['urn']) if pd.notna(row.get('urn')) else None + if not urn or urn not in urn_to_school_id: + continue + + school_id = urn_to_school_id[urn] + year = int(row['year']) if pd.notna(row.get('year')) else None + if not year: + continue + + result = SchoolResult( + school_id=school_id, + year=year, + total_pupils=parse_numeric(row.get('total_pupils')), + eligible_pupils=parse_numeric(row.get('eligible_pupils')), + # Expected Standard + rwm_expected_pct=parse_numeric(row.get('rwm_expected_pct')), + reading_expected_pct=parse_numeric(row.get('reading_expected_pct')), + writing_expected_pct=parse_numeric(row.get('writing_expected_pct')), + maths_expected_pct=parse_numeric(row.get('maths_expected_pct')), + gps_expected_pct=parse_numeric(row.get('gps_expected_pct')), + science_expected_pct=parse_numeric(row.get('science_expected_pct')), + # Higher Standard + rwm_high_pct=parse_numeric(row.get('rwm_high_pct')), + reading_high_pct=parse_numeric(row.get('reading_high_pct')), + writing_high_pct=parse_numeric(row.get('writing_high_pct')), + maths_high_pct=parse_numeric(row.get('maths_high_pct')), + gps_high_pct=parse_numeric(row.get('gps_high_pct')), + # Progress + reading_progress=parse_numeric(row.get('reading_progress')), + writing_progress=parse_numeric(row.get('writing_progress')), + maths_progress=parse_numeric(row.get('maths_progress')), + # Averages + reading_avg_score=parse_numeric(row.get('reading_avg_score')), + maths_avg_score=parse_numeric(row.get('maths_avg_score')), + gps_avg_score=parse_numeric(row.get('gps_avg_score')), + # Context + disadvantaged_pct=parse_numeric(row.get('disadvantaged_pct')), + eal_pct=parse_numeric(row.get('eal_pct')), + sen_support_pct=parse_numeric(row.get('sen_support_pct')), + sen_ehcp_pct=parse_numeric(row.get('sen_ehcp_pct')), + stability_pct=parse_numeric(row.get('stability_pct')), + # Gender + rwm_expected_boys_pct=parse_numeric(row.get('rwm_expected_boys_pct')), + rwm_expected_girls_pct=parse_numeric(row.get('rwm_expected_girls_pct')), + rwm_high_boys_pct=parse_numeric(row.get('rwm_high_boys_pct')), + rwm_high_girls_pct=parse_numeric(row.get('rwm_high_girls_pct')), + # Disadvantaged + rwm_expected_disadvantaged_pct=parse_numeric(row.get('rwm_expected_disadvantaged_pct')), + rwm_expected_non_disadvantaged_pct=parse_numeric(row.get('rwm_expected_non_disadvantaged_pct')), + disadvantaged_gap=parse_numeric(row.get('disadvantaged_gap')), + # 3-Year + rwm_expected_3yr_pct=parse_numeric(row.get('rwm_expected_3yr_pct')), + reading_avg_3yr=parse_numeric(row.get('reading_avg_3yr')), + maths_avg_3yr=parse_numeric(row.get('maths_avg_3yr')), + ) + db.add(result) + results_created += 1 + + if results_created % 10000 == 0: + print(f" Created {results_created} results...") + db.flush() + + print(f" Created {results_created} results") + + # Commit all changes + db.commit() + print("\nMigration complete!") + + +def main(): + parser = argparse.ArgumentParser(description='Migrate CSV data to PostgreSQL database') + parser.add_argument('--drop', action='store_true', help='Drop existing tables before migration') + parser.add_argument('--geocode', action='store_true', help='Geocode postcodes') + args = parser.parse_args() + + print("=" * 60) + print("School Data Migration: CSV -> PostgreSQL") + print("=" * 60) + print(f"\nDatabase: {settings.database_url.split('@')[-1]}") + print(f"Data directory: {settings.data_dir}") + + if args.drop: + print("\n⚠️ Dropping existing tables...") + Base.metadata.drop_all(bind=engine) + + print("\nCreating tables...") + Base.metadata.create_all(bind=engine) + + print("\nLoading CSV data...") + df = load_csv_data(settings.data_dir) + + if df.empty: + print("No data found to migrate!") + return 1 + + migrate_data(df, geocode=args.geocode) + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) +