Files
school_compare/scripts/migrate_csv_to_db.py

344 lines
13 KiB
Python
Raw Normal View History

2026-01-06 17:22:26 +00:00
#!/usr/bin/env python3
"""
Migration script to import CSV data into PostgreSQL database.
Usage:
python scripts/migrate_csv_to_db.py [--drop] [--geocode]
Options:
--drop Drop existing tables before migration
--geocode Geocode postcodes (requires network access)
"""
import sys
import os
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
import argparse
import pandas as pd
import numpy as np
import re
from typing import Optional, Dict
import requests
from backend.config import settings
from backend.database import engine, Base, get_db_session
from backend.models import School, SchoolResult, SCHOOL_FIELD_MAPPING, RESULT_FIELD_MAPPING
from backend.schemas import (
COLUMN_MAPPINGS,
NUMERIC_COLUMNS,
SCHOOL_TYPE_MAP,
NULL_VALUES,
LA_CODE_TO_NAME,
)
def parse_numeric(value) -> Optional[float]:
"""Parse a numeric value, handling special cases."""
if pd.isna(value):
return None
if isinstance(value, (int, float)):
return float(value) if not np.isnan(value) else None
str_val = str(value).strip().upper()
if str_val in NULL_VALUES or str_val == '':
return None
try:
return float(str_val)
except ValueError:
return None
def extract_year_from_folder(folder_name: str) -> Optional[int]:
"""Extract year from folder name like '2023-2024'."""
match = re.search(r'(\d{4})-(\d{4})', folder_name)
if match:
return int(match.group(2))
match = re.search(r'(\d{4})', folder_name)
if match:
return int(match.group(1))
return None
def geocode_postcodes_bulk(postcodes: list) -> Dict[str, tuple]:
"""
Geocode postcodes in bulk using postcodes.io API.
Returns dict of postcode -> (latitude, longitude).
"""
results = {}
valid_postcodes = [p.strip().upper() for p in postcodes if p and isinstance(p, str) and len(p.strip()) >= 5]
valid_postcodes = list(set(valid_postcodes))
if not valid_postcodes:
return results
batch_size = 100
total_batches = (len(valid_postcodes) + batch_size - 1) // batch_size
for i, batch_start in enumerate(range(0, len(valid_postcodes), batch_size)):
batch = valid_postcodes[batch_start:batch_start + batch_size]
print(f" Geocoding batch {i+1}/{total_batches} ({len(batch)} postcodes)...")
try:
response = requests.post(
'https://api.postcodes.io/postcodes',
json={'postcodes': batch},
timeout=30
)
if response.status_code == 200:
data = response.json()
for item in data.get('result', []):
if item and item.get('result'):
pc = item['query'].upper()
lat = item['result'].get('latitude')
lon = item['result'].get('longitude')
if lat and lon:
results[pc] = (lat, lon)
except Exception as e:
print(f" Warning: Geocoding batch failed: {e}")
return results
def load_csv_data(data_dir: Path) -> pd.DataFrame:
"""Load all CSV data from data directory."""
all_data = []
for folder in sorted(data_dir.iterdir()):
if not folder.is_dir():
continue
year = extract_year_from_folder(folder.name)
if not year:
continue
2026-01-06 20:56:09 +00:00
# Specifically look for the KS2 results file
ks2_file = folder / "england_ks2final.csv"
if not ks2_file.exists():
2026-01-06 17:22:26 +00:00
continue
2026-01-06 20:56:09 +00:00
csv_file = ks2_file
2026-01-06 17:22:26 +00:00
print(f" Loading {csv_file.name} (year {year})...")
try:
df = pd.read_csv(csv_file, encoding='latin-1', low_memory=False)
except Exception as e:
print(f" Error loading {csv_file}: {e}")
continue
# Rename columns
df.rename(columns=COLUMN_MAPPINGS, inplace=True)
df['year'] = year
# Handle local authority name
la_name_cols = ['LANAME', 'LA (name)', 'LA_NAME', 'LA NAME']
la_name_col = next((c for c in la_name_cols if c in df.columns), None)
if la_name_col and la_name_col != 'local_authority':
df['local_authority'] = df[la_name_col]
elif 'LEA' in df.columns:
df['local_authority_code'] = pd.to_numeric(df['LEA'], errors='coerce')
df['local_authority'] = df['local_authority_code'].map(LA_CODE_TO_NAME).fillna(df['LEA'].astype(str))
# Store LEA code
if 'LEA' in df.columns:
df['local_authority_code'] = pd.to_numeric(df['LEA'], errors='coerce')
# Map school type
if 'school_type_code' in df.columns:
df['school_type'] = df['school_type_code'].map(SCHOOL_TYPE_MAP).fillna(df['school_type_code'])
# Create combined address
addr_parts = ['address1', 'address2', 'town', 'postcode']
for col in addr_parts:
if col not in df.columns:
df[col] = None
df['address'] = df.apply(
lambda r: ', '.join(str(v) for v in [r.get('address1'), r.get('address2'), r.get('town'), r.get('postcode')] if pd.notna(v) and str(v).strip()),
axis=1
)
all_data.append(df)
print(f" Loaded {len(df)} records")
if all_data:
result = pd.concat(all_data, ignore_index=True)
print(f"\nTotal records loaded: {len(result)}")
print(f"Unique schools: {result['urn'].nunique()}")
print(f"Years: {sorted(result['year'].unique())}")
return result
return pd.DataFrame()
def migrate_data(df: pd.DataFrame, geocode: bool = False):
"""Migrate DataFrame data to database."""
# Group by URN to get unique schools
school_data = df.groupby('urn').first().reset_index()
print(f"\nMigrating {len(school_data)} unique schools...")
# Geocode if requested
geocoded = {}
if geocode and 'postcode' in df.columns:
print("\nGeocoding postcodes...")
postcodes = df['postcode'].dropna().unique().tolist()
geocoded = geocode_postcodes_bulk(postcodes)
print(f" Successfully geocoded {len(geocoded)} postcodes")
with get_db_session() as db:
# Create schools
urn_to_school_id = {}
schools_created = 0
for _, row in school_data.iterrows():
urn = int(row['urn']) if pd.notna(row.get('urn')) else None
if not urn:
continue
# Get geocoding data
postcode = row.get('postcode')
lat, lon = None, None
if postcode and pd.notna(postcode):
coords = geocoded.get(str(postcode).strip().upper())
if coords:
lat, lon = coords
school = School(
urn=urn,
school_name=row.get('school_name') if pd.notna(row.get('school_name')) else 'Unknown',
local_authority=row.get('local_authority') if pd.notna(row.get('local_authority')) else None,
local_authority_code=int(row.get('local_authority_code')) if pd.notna(row.get('local_authority_code')) else None,
school_type=row.get('school_type') if pd.notna(row.get('school_type')) else None,
school_type_code=row.get('school_type_code') if pd.notna(row.get('school_type_code')) else None,
religious_denomination=row.get('religious_denomination') if pd.notna(row.get('religious_denomination')) else None,
age_range=row.get('age_range') if pd.notna(row.get('age_range')) else None,
address1=row.get('address1') if pd.notna(row.get('address1')) else None,
address2=row.get('address2') if pd.notna(row.get('address2')) else None,
town=row.get('town') if pd.notna(row.get('town')) else None,
postcode=row.get('postcode') if pd.notna(row.get('postcode')) else None,
latitude=lat,
longitude=lon,
)
db.add(school)
db.flush() # Get the ID
urn_to_school_id[urn] = school.id
schools_created += 1
if schools_created % 1000 == 0:
print(f" Created {schools_created} schools...")
print(f" Created {schools_created} schools")
# Create results
print(f"\nMigrating {len(df)} yearly results...")
results_created = 0
for _, row in df.iterrows():
urn = int(row['urn']) if pd.notna(row.get('urn')) else None
if not urn or urn not in urn_to_school_id:
continue
school_id = urn_to_school_id[urn]
year = int(row['year']) if pd.notna(row.get('year')) else None
if not year:
continue
result = SchoolResult(
school_id=school_id,
year=year,
total_pupils=parse_numeric(row.get('total_pupils')),
eligible_pupils=parse_numeric(row.get('eligible_pupils')),
# Expected Standard
rwm_expected_pct=parse_numeric(row.get('rwm_expected_pct')),
reading_expected_pct=parse_numeric(row.get('reading_expected_pct')),
writing_expected_pct=parse_numeric(row.get('writing_expected_pct')),
maths_expected_pct=parse_numeric(row.get('maths_expected_pct')),
gps_expected_pct=parse_numeric(row.get('gps_expected_pct')),
science_expected_pct=parse_numeric(row.get('science_expected_pct')),
# Higher Standard
rwm_high_pct=parse_numeric(row.get('rwm_high_pct')),
reading_high_pct=parse_numeric(row.get('reading_high_pct')),
writing_high_pct=parse_numeric(row.get('writing_high_pct')),
maths_high_pct=parse_numeric(row.get('maths_high_pct')),
gps_high_pct=parse_numeric(row.get('gps_high_pct')),
# Progress
reading_progress=parse_numeric(row.get('reading_progress')),
writing_progress=parse_numeric(row.get('writing_progress')),
maths_progress=parse_numeric(row.get('maths_progress')),
# Averages
reading_avg_score=parse_numeric(row.get('reading_avg_score')),
maths_avg_score=parse_numeric(row.get('maths_avg_score')),
gps_avg_score=parse_numeric(row.get('gps_avg_score')),
# Context
disadvantaged_pct=parse_numeric(row.get('disadvantaged_pct')),
eal_pct=parse_numeric(row.get('eal_pct')),
sen_support_pct=parse_numeric(row.get('sen_support_pct')),
sen_ehcp_pct=parse_numeric(row.get('sen_ehcp_pct')),
stability_pct=parse_numeric(row.get('stability_pct')),
# Gender
rwm_expected_boys_pct=parse_numeric(row.get('rwm_expected_boys_pct')),
rwm_expected_girls_pct=parse_numeric(row.get('rwm_expected_girls_pct')),
rwm_high_boys_pct=parse_numeric(row.get('rwm_high_boys_pct')),
rwm_high_girls_pct=parse_numeric(row.get('rwm_high_girls_pct')),
# Disadvantaged
rwm_expected_disadvantaged_pct=parse_numeric(row.get('rwm_expected_disadvantaged_pct')),
rwm_expected_non_disadvantaged_pct=parse_numeric(row.get('rwm_expected_non_disadvantaged_pct')),
disadvantaged_gap=parse_numeric(row.get('disadvantaged_gap')),
# 3-Year
rwm_expected_3yr_pct=parse_numeric(row.get('rwm_expected_3yr_pct')),
reading_avg_3yr=parse_numeric(row.get('reading_avg_3yr')),
maths_avg_3yr=parse_numeric(row.get('maths_avg_3yr')),
)
db.add(result)
results_created += 1
if results_created % 10000 == 0:
print(f" Created {results_created} results...")
db.flush()
print(f" Created {results_created} results")
# Commit all changes
db.commit()
print("\nMigration complete!")
def main():
parser = argparse.ArgumentParser(description='Migrate CSV data to PostgreSQL database')
parser.add_argument('--drop', action='store_true', help='Drop existing tables before migration')
parser.add_argument('--geocode', action='store_true', help='Geocode postcodes')
args = parser.parse_args()
print("=" * 60)
print("School Data Migration: CSV -> PostgreSQL")
print("=" * 60)
print(f"\nDatabase: {settings.database_url.split('@')[-1]}")
print(f"Data directory: {settings.data_dir}")
if args.drop:
print("\n⚠️ Dropping existing tables...")
Base.metadata.drop_all(bind=engine)
print("\nCreating tables...")
Base.metadata.create_all(bind=engine)
print("\nLoading CSV data...")
df = load_csv_data(settings.data_dir)
if df.empty:
print("No data found to migrate!")
return 1
migrate_data(df, geocode=args.geocode)
return 0
if __name__ == '__main__':
sys.exit(main())