From e2b2ddfb66aa2913d47b537723d9b29018055a7d Mon Sep 17 00:00:00 2001
From: Tudor Sitaru
Date: Tue, 6 Jan 2026 22:22:33 +0000
Subject: [PATCH] Fix migration script to handle percentage signs in CSV data

- Updated parse_numeric() to strip percentage signs before parsing
- This fixes the issue where all percentage metrics were showing as NULL/empty
- School cards will now display actual performance data after re-running migration
---
 scripts/migrate_csv_to_db.py | 340 ++++++++++++++++++++---------------
 1 file changed, 199 insertions(+), 141 deletions(-)

diff --git a/scripts/migrate_csv_to_db.py b/scripts/migrate_csv_to_db.py
index 921ef66..cd005f9 100644
--- a/scripts/migrate_csv_to_db.py
+++ b/scripts/migrate_csv_to_db.py
@@ -10,23 +10,29 @@ Options:
     --geocode Geocode postcodes (requires network access)
 """
 
-import sys
 import os
+import sys
 from pathlib import Path
 
 # Add parent directory to path for imports
 sys.path.insert(0, str(Path(__file__).parent.parent))
 
 import argparse
-import pandas as pd
-import numpy as np
 import re
-from typing import Optional, Dict
+from typing import Dict, Optional
+
+import numpy as np
+import pandas as pd
 import requests
 
 from backend.config import settings
-from backend.database import engine, Base, get_db_session
-from backend.models import School, SchoolResult, SCHOOL_FIELD_MAPPING, RESULT_FIELD_MAPPING
+from backend.database import Base, engine, get_db_session
+from backend.models import (
+    School,
+    SchoolResult,
+    SCHOOL_FIELD_MAPPING,
+    RESULT_FIELD_MAPPING,
+)
 from backend.schemas import (
     COLUMN_MAPPINGS,
     NUMERIC_COLUMNS,
@@ -43,8 +49,10 @@ def parse_numeric(value) -> Optional[float]:
     if isinstance(value, (int, float)):
         return float(value) if not np.isnan(value) else None
     str_val = str(value).strip().upper()
-    if str_val in NULL_VALUES or str_val == '':
+    if str_val in NULL_VALUES or str_val == "":
         return None
+    # Remove percentage signs if present
+    str_val = str_val.replace("%", "")
     try:
         return float(str_val)
     except ValueError:
@@ -53,10 +61,10 @@ def parse_numeric(value) ->
Optional[float]: def extract_year_from_folder(folder_name: str) -> Optional[int]: """Extract year from folder name like '2023-2024'.""" - match = re.search(r'(\d{4})-(\d{4})', folder_name) + match = re.search(r"(\d{4})-(\d{4})", folder_name) if match: return int(match.group(2)) - match = re.search(r'(\d{4})', folder_name) + match = re.search(r"(\d{4})", folder_name) if match: return int(match.group(1)) return None @@ -68,141 +75,166 @@ def geocode_postcodes_bulk(postcodes: list) -> Dict[str, tuple]: Returns dict of postcode -> (latitude, longitude). """ results = {} - valid_postcodes = [p.strip().upper() for p in postcodes if p and isinstance(p, str) and len(p.strip()) >= 5] + valid_postcodes = [ + p.strip().upper() + for p in postcodes + if p and isinstance(p, str) and len(p.strip()) >= 5 + ] valid_postcodes = list(set(valid_postcodes)) - + if not valid_postcodes: return results - + batch_size = 100 total_batches = (len(valid_postcodes) + batch_size - 1) // batch_size - + for i, batch_start in enumerate(range(0, len(valid_postcodes), batch_size)): - batch = valid_postcodes[batch_start:batch_start + batch_size] - print(f" Geocoding batch {i+1}/{total_batches} ({len(batch)} postcodes)...") - + batch = valid_postcodes[batch_start : batch_start + batch_size] + print( + f" Geocoding batch {i + 1}/{total_batches} ({len(batch)} postcodes)..." 
+ ) + try: response = requests.post( - 'https://api.postcodes.io/postcodes', - json={'postcodes': batch}, - timeout=30 + "https://api.postcodes.io/postcodes", + json={"postcodes": batch}, + timeout=30, ) if response.status_code == 200: data = response.json() - for item in data.get('result', []): - if item and item.get('result'): - pc = item['query'].upper() - lat = item['result'].get('latitude') - lon = item['result'].get('longitude') + for item in data.get("result", []): + if item and item.get("result"): + pc = item["query"].upper() + lat = item["result"].get("latitude") + lon = item["result"].get("longitude") if lat and lon: results[pc] = (lat, lon) except Exception as e: print(f" Warning: Geocoding batch failed: {e}") - + return results def load_csv_data(data_dir: Path) -> pd.DataFrame: """Load all CSV data from data directory.""" all_data = [] - + for folder in sorted(data_dir.iterdir()): if not folder.is_dir(): continue - + year = extract_year_from_folder(folder.name) if not year: continue - + # Specifically look for the KS2 results file ks2_file = folder / "england_ks2final.csv" if not ks2_file.exists(): continue - + csv_file = ks2_file print(f" Loading {csv_file.name} (year {year})...") - + try: - df = pd.read_csv(csv_file, encoding='latin-1', low_memory=False) + df = pd.read_csv(csv_file, encoding="latin-1", low_memory=False) except Exception as e: print(f" Error loading {csv_file}: {e}") continue - + # Rename columns df.rename(columns=COLUMN_MAPPINGS, inplace=True) - df['year'] = year - + df["year"] = year + # Handle local authority name - la_name_cols = ['LANAME', 'LA (name)', 'LA_NAME', 'LA NAME'] + la_name_cols = ["LANAME", "LA (name)", "LA_NAME", "LA NAME"] la_name_col = next((c for c in la_name_cols if c in df.columns), None) - - if la_name_col and la_name_col != 'local_authority': - df['local_authority'] = df[la_name_col] - elif 'LEA' in df.columns: - df['local_authority_code'] = pd.to_numeric(df['LEA'], errors='coerce') - df['local_authority'] = 
df['local_authority_code'].map(LA_CODE_TO_NAME).fillna(df['LEA'].astype(str)) - + + if la_name_col and la_name_col != "local_authority": + df["local_authority"] = df[la_name_col] + elif "LEA" in df.columns: + df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce") + df["local_authority"] = ( + df["local_authority_code"] + .map(LA_CODE_TO_NAME) + .fillna(df["LEA"].astype(str)) + ) + # Store LEA code - if 'LEA' in df.columns: - df['local_authority_code'] = pd.to_numeric(df['LEA'], errors='coerce') - + if "LEA" in df.columns: + df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce") + # Map school type - if 'school_type_code' in df.columns: - df['school_type'] = df['school_type_code'].map(SCHOOL_TYPE_MAP).fillna(df['school_type_code']) - + if "school_type_code" in df.columns: + df["school_type"] = ( + df["school_type_code"] + .map(SCHOOL_TYPE_MAP) + .fillna(df["school_type_code"]) + ) + # Create combined address - addr_parts = ['address1', 'address2', 'town', 'postcode'] + addr_parts = ["address1", "address2", "town", "postcode"] for col in addr_parts: if col not in df.columns: df[col] = None - - df['address'] = df.apply( - lambda r: ', '.join(str(v) for v in [r.get('address1'), r.get('address2'), r.get('town'), r.get('postcode')] if pd.notna(v) and str(v).strip()), - axis=1 + + df["address"] = df.apply( + lambda r: ", ".join( + str(v) + for v in [ + r.get("address1"), + r.get("address2"), + r.get("town"), + r.get("postcode"), + ] + if pd.notna(v) and str(v).strip() + ), + axis=1, ) - + all_data.append(df) print(f" Loaded {len(df)} records") - + if all_data: result = pd.concat(all_data, ignore_index=True) print(f"\nTotal records loaded: {len(result)}") print(f"Unique schools: {result['urn'].nunique()}") print(f"Years: {sorted(result['year'].unique())}") return result - + return pd.DataFrame() def migrate_data(df: pd.DataFrame, geocode: bool = False): """Migrate DataFrame data to database.""" - + # Clean URN column - convert to integer, 
drop invalid values df = df.copy() - df['urn'] = pd.to_numeric(df['urn'], errors='coerce') - df = df.dropna(subset=['urn']) - df['urn'] = df['urn'].astype(int) - + df["urn"] = pd.to_numeric(df["urn"], errors="coerce") + df = df.dropna(subset=["urn"]) + df["urn"] = df["urn"].astype(int) + # Group by URN to get unique schools (use latest year's data) - school_data = df.sort_values('year', ascending=False).groupby('urn').first().reset_index() + school_data = ( + df.sort_values("year", ascending=False).groupby("urn").first().reset_index() + ) print(f"\nMigrating {len(school_data)} unique schools...") - + # Geocode if requested geocoded = {} - if geocode and 'postcode' in df.columns: + if geocode and "postcode" in df.columns: print("\nGeocoding postcodes...") - postcodes = df['postcode'].dropna().unique().tolist() + postcodes = df["postcode"].dropna().unique().tolist() geocoded = geocode_postcodes_bulk(postcodes) print(f" Successfully geocoded {len(geocoded)} postcodes") - + with get_db_session() as db: # Create schools urn_to_school_id = {} schools_created = 0 - + for _, row in school_data.iterrows(): # Safely parse URN - handle None, NaN, whitespace, and invalid values - urn_val = row.get('urn') + urn_val = row.get("urn") urn = None if pd.notna(urn_val): try: @@ -213,22 +245,22 @@ def migrate_data(df: pd.DataFrame, geocode: bool = False): pass if not urn: continue - + # Skip if we've already added this URN (handles duplicates in source data) if urn in urn_to_school_id: continue - + # Get geocoding data - postcode = row.get('postcode') + postcode = row.get("postcode") lat, lon = None, None if postcode and pd.notna(postcode): coords = geocoded.get(str(postcode).strip().upper()) if coords: lat, lon = coords - + # Safely parse local_authority_code la_code = None - la_code_val = row.get('local_authority_code') + la_code_val = row.get("local_authority_code") if pd.notna(la_code_val): try: la_code_str = str(la_code_val).strip() @@ -236,20 +268,32 @@ def migrate_data(df: 
pd.DataFrame, geocode: bool = False): la_code = int(float(la_code_str)) except (ValueError, TypeError): pass - + school = School( urn=urn, - school_name=row.get('school_name') if pd.notna(row.get('school_name')) else 'Unknown', - local_authority=row.get('local_authority') if pd.notna(row.get('local_authority')) else None, + school_name=row.get("school_name") + if pd.notna(row.get("school_name")) + else "Unknown", + local_authority=row.get("local_authority") + if pd.notna(row.get("local_authority")) + else None, local_authority_code=la_code, - school_type=row.get('school_type') if pd.notna(row.get('school_type')) else None, - school_type_code=row.get('school_type_code') if pd.notna(row.get('school_type_code')) else None, - religious_denomination=row.get('religious_denomination') if pd.notna(row.get('religious_denomination')) else None, - age_range=row.get('age_range') if pd.notna(row.get('age_range')) else None, - address1=row.get('address1') if pd.notna(row.get('address1')) else None, - address2=row.get('address2') if pd.notna(row.get('address2')) else None, - town=row.get('town') if pd.notna(row.get('town')) else None, - postcode=row.get('postcode') if pd.notna(row.get('postcode')) else None, + school_type=row.get("school_type") + if pd.notna(row.get("school_type")) + else None, + school_type_code=row.get("school_type_code") + if pd.notna(row.get("school_type_code")) + else None, + religious_denomination=row.get("religious_denomination") + if pd.notna(row.get("religious_denomination")) + else None, + age_range=row.get("age_range") + if pd.notna(row.get("age_range")) + else None, + address1=row.get("address1") if pd.notna(row.get("address1")) else None, + address2=row.get("address2") if pd.notna(row.get("address2")) else None, + town=row.get("town") if pd.notna(row.get("town")) else None, + postcode=row.get("postcode") if pd.notna(row.get("postcode")) else None, latitude=lat, longitude=lon, ) @@ -257,19 +301,19 @@ def migrate_data(df: pd.DataFrame, geocode: bool = 
False): db.flush() # Get the ID urn_to_school_id[urn] = school.id schools_created += 1 - + if schools_created % 1000 == 0: print(f" Created {schools_created} schools...") - + print(f" Created {schools_created} schools") - + # Create results print(f"\nMigrating {len(df)} yearly results...") results_created = 0 - + for _, row in df.iterrows(): # Safely parse URN - urn_val = row.get('urn') + urn_val = row.get("urn") urn = None if pd.notna(urn_val): try: @@ -280,11 +324,11 @@ def migrate_data(df: pd.DataFrame, geocode: bool = False): pass if not urn or urn not in urn_to_school_id: continue - + school_id = urn_to_school_id[urn] - + # Safely parse year - year_val = row.get('year') + year_val = row.get("year") year = None if pd.notna(year_val): try: @@ -293,98 +337,111 @@ def migrate_data(df: pd.DataFrame, geocode: bool = False): pass if not year: continue - + result = SchoolResult( school_id=school_id, year=year, - total_pupils=parse_numeric(row.get('total_pupils')), - eligible_pupils=parse_numeric(row.get('eligible_pupils')), + total_pupils=parse_numeric(row.get("total_pupils")), + eligible_pupils=parse_numeric(row.get("eligible_pupils")), # Expected Standard - rwm_expected_pct=parse_numeric(row.get('rwm_expected_pct')), - reading_expected_pct=parse_numeric(row.get('reading_expected_pct')), - writing_expected_pct=parse_numeric(row.get('writing_expected_pct')), - maths_expected_pct=parse_numeric(row.get('maths_expected_pct')), - gps_expected_pct=parse_numeric(row.get('gps_expected_pct')), - science_expected_pct=parse_numeric(row.get('science_expected_pct')), + rwm_expected_pct=parse_numeric(row.get("rwm_expected_pct")), + reading_expected_pct=parse_numeric(row.get("reading_expected_pct")), + writing_expected_pct=parse_numeric(row.get("writing_expected_pct")), + maths_expected_pct=parse_numeric(row.get("maths_expected_pct")), + gps_expected_pct=parse_numeric(row.get("gps_expected_pct")), + science_expected_pct=parse_numeric(row.get("science_expected_pct")), # Higher 
Standard - rwm_high_pct=parse_numeric(row.get('rwm_high_pct')), - reading_high_pct=parse_numeric(row.get('reading_high_pct')), - writing_high_pct=parse_numeric(row.get('writing_high_pct')), - maths_high_pct=parse_numeric(row.get('maths_high_pct')), - gps_high_pct=parse_numeric(row.get('gps_high_pct')), + rwm_high_pct=parse_numeric(row.get("rwm_high_pct")), + reading_high_pct=parse_numeric(row.get("reading_high_pct")), + writing_high_pct=parse_numeric(row.get("writing_high_pct")), + maths_high_pct=parse_numeric(row.get("maths_high_pct")), + gps_high_pct=parse_numeric(row.get("gps_high_pct")), # Progress - reading_progress=parse_numeric(row.get('reading_progress')), - writing_progress=parse_numeric(row.get('writing_progress')), - maths_progress=parse_numeric(row.get('maths_progress')), + reading_progress=parse_numeric(row.get("reading_progress")), + writing_progress=parse_numeric(row.get("writing_progress")), + maths_progress=parse_numeric(row.get("maths_progress")), # Averages - reading_avg_score=parse_numeric(row.get('reading_avg_score')), - maths_avg_score=parse_numeric(row.get('maths_avg_score')), - gps_avg_score=parse_numeric(row.get('gps_avg_score')), + reading_avg_score=parse_numeric(row.get("reading_avg_score")), + maths_avg_score=parse_numeric(row.get("maths_avg_score")), + gps_avg_score=parse_numeric(row.get("gps_avg_score")), # Context - disadvantaged_pct=parse_numeric(row.get('disadvantaged_pct')), - eal_pct=parse_numeric(row.get('eal_pct')), - sen_support_pct=parse_numeric(row.get('sen_support_pct')), - sen_ehcp_pct=parse_numeric(row.get('sen_ehcp_pct')), - stability_pct=parse_numeric(row.get('stability_pct')), + disadvantaged_pct=parse_numeric(row.get("disadvantaged_pct")), + eal_pct=parse_numeric(row.get("eal_pct")), + sen_support_pct=parse_numeric(row.get("sen_support_pct")), + sen_ehcp_pct=parse_numeric(row.get("sen_ehcp_pct")), + stability_pct=parse_numeric(row.get("stability_pct")), # Gender - 
rwm_expected_boys_pct=parse_numeric(row.get('rwm_expected_boys_pct')), - rwm_expected_girls_pct=parse_numeric(row.get('rwm_expected_girls_pct')), - rwm_high_boys_pct=parse_numeric(row.get('rwm_high_boys_pct')), - rwm_high_girls_pct=parse_numeric(row.get('rwm_high_girls_pct')), + rwm_expected_boys_pct=parse_numeric(row.get("rwm_expected_boys_pct")), + rwm_expected_girls_pct=parse_numeric(row.get("rwm_expected_girls_pct")), + rwm_high_boys_pct=parse_numeric(row.get("rwm_high_boys_pct")), + rwm_high_girls_pct=parse_numeric(row.get("rwm_high_girls_pct")), # Disadvantaged - rwm_expected_disadvantaged_pct=parse_numeric(row.get('rwm_expected_disadvantaged_pct')), - rwm_expected_non_disadvantaged_pct=parse_numeric(row.get('rwm_expected_non_disadvantaged_pct')), - disadvantaged_gap=parse_numeric(row.get('disadvantaged_gap')), + rwm_expected_disadvantaged_pct=parse_numeric( + row.get("rwm_expected_disadvantaged_pct") + ), + rwm_expected_non_disadvantaged_pct=parse_numeric( + row.get("rwm_expected_non_disadvantaged_pct") + ), + disadvantaged_gap=parse_numeric(row.get("disadvantaged_gap")), # 3-Year - rwm_expected_3yr_pct=parse_numeric(row.get('rwm_expected_3yr_pct')), - reading_avg_3yr=parse_numeric(row.get('reading_avg_3yr')), - maths_avg_3yr=parse_numeric(row.get('maths_avg_3yr')), + rwm_expected_3yr_pct=parse_numeric(row.get("rwm_expected_3yr_pct")), + reading_avg_3yr=parse_numeric(row.get("reading_avg_3yr")), + maths_avg_3yr=parse_numeric(row.get("maths_avg_3yr")), ) db.add(result) results_created += 1 - + if results_created % 10000 == 0: print(f" Created {results_created} results...") db.flush() - + print(f" Created {results_created} results") - + # Commit all changes db.commit() print("\nMigration complete!") def main(): - parser = argparse.ArgumentParser(description='Migrate CSV data to PostgreSQL database') - parser.add_argument('--drop', action='store_true', help='Drop existing tables before migration') - parser.add_argument('--geocode', action='store_true', 
help='Geocode postcodes')
+    parser = argparse.ArgumentParser(
+        description="Migrate CSV data to PostgreSQL database"
+    )
+    parser.add_argument(
+        "--drop", action="store_true", help="Drop existing tables before migration"
+    )
+    parser.add_argument("--geocode", action="store_true", help="Geocode postcodes")
     args = parser.parse_args()
-    
+
     print("=" * 60)
     print("School Data Migration: CSV -> PostgreSQL")
     print("=" * 60)
     print(f"\nDatabase: {settings.database_url.split('@')[-1]}")
     print(f"Data directory: {settings.data_dir}")
-    
+
     if args.drop:
         print("\n⚠️ Dropping existing tables...")
         Base.metadata.drop_all(bind=engine)
-    
+
     print("\nCreating tables...")
     Base.metadata.create_all(bind=engine)
-    
+
     print("\nLoading CSV data...")
     df = load_csv_data(settings.data_dir)
-    
+
     if df.empty:
         print("No data found to migrate!")
         return 1
-    
+
     migrate_data(df, geocode=args.geocode)
-    
+
+    return 0
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     sys.exit(main())
-