diff --git a/backend/app.py b/backend/app.py index 6c932c6..33ea6db 100644 --- a/backend/app.py +++ b/backend/app.py @@ -26,7 +26,7 @@ from .data_loader import ( geocode_single_postcode, ) from .data_loader import get_data_info as get_db_info -from .database import init_db +from .database import check_and_migrate_if_needed from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS from .utils import clean_for_json @@ -135,16 +135,16 @@ def validate_postcode(postcode: Optional[str]) -> Optional[str]: @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan - startup and shutdown events.""" - # Startup: initialize database and pre-load data - print("Starting up: Initializing database...") - init_db() # Ensure tables exist + # Startup: check schema version and migrate if needed + print("Starting up: Checking database schema...") + check_and_migrate_if_needed() print("Loading school data from database...") df = load_school_data() if df.empty: - print("Warning: No data in database. Run the migration script to import data.") + print("Warning: No data in database. Check CSV files in data/ folder.") else: - print("Data loaded successfully.") + print(f"Data loaded successfully: {len(df)} records.") yield # Application runs here diff --git a/backend/database.py b/backend/database.py index 40d3e7e..75b86a5 100644 --- a/backend/database.py +++ b/backend/database.py @@ -2,7 +2,10 @@ Database connection setup using SQLAlchemy. """ -from sqlalchemy import create_engine +from datetime import datetime +from typing import Optional + +from sqlalchemy import create_engine, inspect from sqlalchemy.orm import sessionmaker, declarative_base from contextlib import contextmanager @@ -65,3 +68,80 @@ def drop_db(): """ Base.metadata.drop_all(bind=engine) + +def get_db_schema_version() -> Optional[int]: + """ + Get the current schema version from the database. + Returns None if table doesn't exist or no version is set. + """ + from .models import SchemaVersion # Import here to avoid circular imports + + # Check if schema_version table exists + inspector = inspect(engine) + if "schema_version" not in inspector.get_table_names(): + return None + + try: + with get_db_session() as db: + row = db.query(SchemaVersion).first() + return row.version if row else None + except Exception: + return None + + +def set_db_schema_version(version: int): + """ + Set/update the schema version in the database. + Creates the row if it doesn't exist. + """ + from .models import SchemaVersion + + with get_db_session() as db: + row = db.query(SchemaVersion).first() + if row: + row.version = version + row.migrated_at = datetime.utcnow() + else: + db.add(SchemaVersion(id=1, version=version, migrated_at=datetime.utcnow())) + + +def check_and_migrate_if_needed(): + """ + Check schema version and run migration if needed. + Called during application startup. + """ + from .version import SCHEMA_VERSION + from .migration import run_full_migration + + db_version = get_db_schema_version() + + if db_version == SCHEMA_VERSION: + print(f"Schema version {SCHEMA_VERSION} matches. Fast startup.") + # Still ensure tables exist (they should if version matches) + init_db() + return + + if db_version is None: + print(f"No schema version found. Running initial migration (v{SCHEMA_VERSION})...") + else: + print(f"Schema mismatch: DB has v{db_version}, code expects v{SCHEMA_VERSION}") + print("Running full migration...") + + try: + success = run_full_migration(geocode=False) + + if success: + # Ensure schema_version table exists before writing + init_db() + set_db_schema_version(SCHEMA_VERSION) + print(f"Migration complete. Schema version set to {SCHEMA_VERSION}") + else: + print("Warning: Migration completed but no data was imported.") + init_db() + set_db_schema_version(SCHEMA_VERSION) + + except Exception as e: + print(f"FATAL: Migration failed: {e}") + print("Application cannot start. Please check database and CSV files.") + raise + diff --git a/backend/migration.py b/backend/migration.py new file mode 100644 index 0000000..f92ac22 --- /dev/null +++ b/backend/migration.py @@ -0,0 +1,413 @@ +""" +Database migration logic for importing CSV data. +Used by both CLI script and automatic startup migration. +""" + +import re +from pathlib import Path +from typing import Dict, Optional + +import numpy as np +import pandas as pd +import requests + +from .config import settings +from .database import Base, engine, get_db_session +from .models import School, SchoolResult +from .schemas import ( + COLUMN_MAPPINGS, + LA_CODE_TO_NAME, + NULL_VALUES, + SCHOOL_TYPE_MAP, +) + + +def parse_numeric(value) -> Optional[float]: + """Parse a numeric value, handling special cases.""" + if pd.isna(value): + return None + if isinstance(value, (int, float)): + return float(value) if not np.isnan(value) else None + str_val = str(value).strip().upper() + if str_val in NULL_VALUES or str_val == "": + return None + # Remove percentage signs if present + str_val = str_val.replace("%", "") + try: + return float(str_val) + except ValueError: + return None + + +def extract_year_from_folder(folder_name: str) -> Optional[int]: + """Extract year from folder name like '2023-2024'.""" + match = re.search(r"(\d{4})-(\d{4})", folder_name) + if match: + return int(match.group(2)) + match = re.search(r"(\d{4})", folder_name) + if match: + return int(match.group(1)) + return None + + +def geocode_postcodes_bulk(postcodes: list) -> Dict[str, tuple]: + """ + Geocode postcodes in bulk using postcodes.io API. + Returns dict of postcode -> (latitude, longitude). + """ + results = {} + valid_postcodes = [ + p.strip().upper() + for p in postcodes + if p and isinstance(p, str) and len(p.strip()) >= 5 + ] + valid_postcodes = list(set(valid_postcodes)) + + if not valid_postcodes: + return results + + batch_size = 100 + total_batches = (len(valid_postcodes) + batch_size - 1) // batch_size + + for i, batch_start in enumerate(range(0, len(valid_postcodes), batch_size)): + batch = valid_postcodes[batch_start : batch_start + batch_size] + print( + f" Geocoding batch {i + 1}/{total_batches} ({len(batch)} postcodes)..." + ) + + try: + response = requests.post( + "https://api.postcodes.io/postcodes", + json={"postcodes": batch}, + timeout=30, + ) + if response.status_code == 200: + data = response.json() + for item in data.get("result", []): + if item and item.get("result"): + pc = item["query"].upper() + lat = item["result"].get("latitude") + lon = item["result"].get("longitude") + if lat and lon: + results[pc] = (lat, lon) + except Exception as e: + print(f" Warning: Geocoding batch failed: {e}") + + return results + + +def load_csv_data(data_dir: Path) -> pd.DataFrame: + """Load all CSV data from data directory.""" + all_data = [] + + for folder in sorted(data_dir.iterdir()): + if not folder.is_dir(): + continue + + year = extract_year_from_folder(folder.name) + if not year: + continue + + # Specifically look for the KS2 results file + ks2_file = folder / "england_ks2final.csv" + if not ks2_file.exists(): + continue + + csv_file = ks2_file + print(f" Loading {csv_file.name} (year {year})...") + + try: + df = pd.read_csv(csv_file, encoding="latin-1", low_memory=False) + except Exception as e: + print(f" Error loading {csv_file}: {e}") + continue + + # Rename columns + df.rename(columns=COLUMN_MAPPINGS, inplace=True) + df["year"] = year + + # Handle local authority name + la_name_cols = ["LANAME", "LA (name)", "LA_NAME", "LA NAME"] + la_name_col = next((c for c in la_name_cols if c in df.columns), None) + + if la_name_col and la_name_col != "local_authority": + df["local_authority"] = df[la_name_col] + elif "LEA" in df.columns: + df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce") + df["local_authority"] = ( + df["local_authority_code"] + .map(LA_CODE_TO_NAME) + .fillna(df["LEA"].astype(str)) + ) + + # Store LEA code + if "LEA" in df.columns: + df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce") + + # Map school type + if "school_type_code" in df.columns: + df["school_type"] = ( + df["school_type_code"] + .map(SCHOOL_TYPE_MAP) + .fillna(df["school_type_code"]) + ) + + # Create combined address + addr_parts = ["address1", "address2", "town", "postcode"] + for col in addr_parts: + if col not in df.columns: + df[col] = None + + df["address"] = df.apply( + lambda r: ", ".join( + str(v) + for v in [ + r.get("address1"), + r.get("address2"), + r.get("town"), + r.get("postcode"), + ] + if pd.notna(v) and str(v).strip() + ), + axis=1, + ) + + all_data.append(df) + print(f" Loaded {len(df)} records") + + if all_data: + result = pd.concat(all_data, ignore_index=True) + print(f"\nTotal records loaded: {len(result)}") + print(f"Unique schools: {result['urn'].nunique()}") + print(f"Years: {sorted(result['year'].unique())}") + return result + + return pd.DataFrame() + + +def migrate_data(df: pd.DataFrame, geocode: bool = False): + """Migrate DataFrame data to database.""" + + # Clean URN column - convert to integer, drop invalid values + df = df.copy() + df["urn"] = pd.to_numeric(df["urn"], errors="coerce") + df = df.dropna(subset=["urn"]) + df["urn"] = df["urn"].astype(int) + + # Group by URN to get unique schools (use latest year's data) + school_data = ( + df.sort_values("year", ascending=False).groupby("urn").first().reset_index() + ) + print(f"\nMigrating {len(school_data)} unique schools...") + + # Geocode if requested + geocoded = {} + if geocode and "postcode" in df.columns: + print("\nGeocoding postcodes...") + postcodes = df["postcode"].dropna().unique().tolist() + geocoded = geocode_postcodes_bulk(postcodes) + print(f" Successfully geocoded {len(geocoded)} postcodes") + + with get_db_session() as db: + # Create schools + urn_to_school_id = {} + schools_created = 0 + + for _, row in school_data.iterrows(): + # Safely parse URN - handle None, NaN, whitespace, and invalid values + urn_val = row.get("urn") + urn = None + if pd.notna(urn_val): + try: + urn_str = str(urn_val).strip() + if urn_str: + urn = int(float(urn_str)) # Handle "12345.0" format + except (ValueError, TypeError): + pass + if not urn: + continue + + # Skip if we've already added this URN (handles duplicates in source data) + if urn in urn_to_school_id: + continue + + # Get geocoding data + postcode = row.get("postcode") + lat, lon = None, None + if postcode and pd.notna(postcode): + coords = geocoded.get(str(postcode).strip().upper()) + if coords: + lat, lon = coords + + # Safely parse local_authority_code + la_code = None + la_code_val = row.get("local_authority_code") + if pd.notna(la_code_val): + try: + la_code_str = str(la_code_val).strip() + if la_code_str: + la_code = int(float(la_code_str)) + except (ValueError, TypeError): + pass + + school = School( + urn=urn, + school_name=row.get("school_name") + if pd.notna(row.get("school_name")) + else "Unknown", + local_authority=row.get("local_authority") + if pd.notna(row.get("local_authority")) + else None, + local_authority_code=la_code, + school_type=row.get("school_type") + if pd.notna(row.get("school_type")) + else None, + school_type_code=row.get("school_type_code") + if pd.notna(row.get("school_type_code")) + else None, + religious_denomination=row.get("religious_denomination") + if pd.notna(row.get("religious_denomination")) + else None, + age_range=row.get("age_range") + if pd.notna(row.get("age_range")) + else None, + address1=row.get("address1") if pd.notna(row.get("address1")) else None, + address2=row.get("address2") if pd.notna(row.get("address2")) else None, + town=row.get("town") if pd.notna(row.get("town")) else None, + postcode=row.get("postcode") if pd.notna(row.get("postcode")) else None, + latitude=lat, + longitude=lon, + ) + db.add(school) + db.flush() # Get the ID + urn_to_school_id[urn] = school.id + schools_created += 1 + + if schools_created % 1000 == 0: + print(f" Created {schools_created} schools...") + + print(f" Created {schools_created} schools") + + # Create results + print(f"\nMigrating {len(df)} yearly results...") + results_created = 0 + + for _, row in df.iterrows(): + # Safely parse URN + urn_val = row.get("urn") + urn = None + if pd.notna(urn_val): + try: + urn_str = str(urn_val).strip() + if urn_str: + urn = int(float(urn_str)) + except (ValueError, TypeError): + pass + if not urn or urn not in urn_to_school_id: + continue + + school_id = urn_to_school_id[urn] + + # Safely parse year + year_val = row.get("year") + year = None + if pd.notna(year_val): + try: + year = int(float(str(year_val).strip())) + except (ValueError, TypeError): + pass + if not year: + continue + + result = SchoolResult( + school_id=school_id, + year=year, + total_pupils=parse_numeric(row.get("total_pupils")), + eligible_pupils=parse_numeric(row.get("eligible_pupils")), + # Expected Standard + rwm_expected_pct=parse_numeric(row.get("rwm_expected_pct")), + reading_expected_pct=parse_numeric(row.get("reading_expected_pct")), + writing_expected_pct=parse_numeric(row.get("writing_expected_pct")), + maths_expected_pct=parse_numeric(row.get("maths_expected_pct")), + gps_expected_pct=parse_numeric(row.get("gps_expected_pct")), + science_expected_pct=parse_numeric(row.get("science_expected_pct")), + # Higher Standard + rwm_high_pct=parse_numeric(row.get("rwm_high_pct")), + reading_high_pct=parse_numeric(row.get("reading_high_pct")), + writing_high_pct=parse_numeric(row.get("writing_high_pct")), + maths_high_pct=parse_numeric(row.get("maths_high_pct")), + gps_high_pct=parse_numeric(row.get("gps_high_pct")), + # Progress + reading_progress=parse_numeric(row.get("reading_progress")), + writing_progress=parse_numeric(row.get("writing_progress")), + maths_progress=parse_numeric(row.get("maths_progress")), + # Averages + reading_avg_score=parse_numeric(row.get("reading_avg_score")), + maths_avg_score=parse_numeric(row.get("maths_avg_score")), + gps_avg_score=parse_numeric(row.get("gps_avg_score")), + # Context + disadvantaged_pct=parse_numeric(row.get("disadvantaged_pct")), + eal_pct=parse_numeric(row.get("eal_pct")), + sen_support_pct=parse_numeric(row.get("sen_support_pct")), + sen_ehcp_pct=parse_numeric(row.get("sen_ehcp_pct")), + stability_pct=parse_numeric(row.get("stability_pct")), + # Absence + reading_absence_pct=parse_numeric(row.get("reading_absence_pct")), + gps_absence_pct=parse_numeric(row.get("gps_absence_pct")), + maths_absence_pct=parse_numeric(row.get("maths_absence_pct")), + writing_absence_pct=parse_numeric(row.get("writing_absence_pct")), + science_absence_pct=parse_numeric(row.get("science_absence_pct")), + # Gender + rwm_expected_boys_pct=parse_numeric(row.get("rwm_expected_boys_pct")), + rwm_expected_girls_pct=parse_numeric(row.get("rwm_expected_girls_pct")), + rwm_high_boys_pct=parse_numeric(row.get("rwm_high_boys_pct")), + rwm_high_girls_pct=parse_numeric(row.get("rwm_high_girls_pct")), + # Disadvantaged + rwm_expected_disadvantaged_pct=parse_numeric( + row.get("rwm_expected_disadvantaged_pct") + ), + rwm_expected_non_disadvantaged_pct=parse_numeric( + row.get("rwm_expected_non_disadvantaged_pct") + ), + disadvantaged_gap=parse_numeric(row.get("disadvantaged_gap")), + # 3-Year + rwm_expected_3yr_pct=parse_numeric(row.get("rwm_expected_3yr_pct")), + reading_avg_3yr=parse_numeric(row.get("reading_avg_3yr")), + maths_avg_3yr=parse_numeric(row.get("maths_avg_3yr")), + ) + db.add(result) + results_created += 1 + + if results_created % 10000 == 0: + print(f" Created {results_created} results...") + db.flush() + + print(f" Created {results_created} results") + + # Commit all changes + db.commit() + print("\nMigration complete!") + + +def run_full_migration(geocode: bool = False) -> bool: + """ + Run a complete migration: drop all tables and reimport from CSV. + + Returns True if successful, False if no data found. + Raises exception on error. + """ + print("Dropping existing tables...") + Base.metadata.drop_all(bind=engine) + + print("Creating tables...") + Base.metadata.create_all(bind=engine) + + print("\nLoading CSV data...") + df = load_csv_data(settings.data_dir) + + if df.empty: + print("Warning: No CSV data found to migrate!") + return False + + migrate_data(df, geocode=geocode) + return True diff --git a/backend/models.py b/backend/models.py index 790fd03..b280342 100644 --- a/backend/models.py +++ b/backend/models.py @@ -3,9 +3,11 @@ SQLAlchemy database models for school data. Normalized schema with separate tables for schools and yearly results. """ +from datetime import datetime + from sqlalchemy import ( Column, Integer, String, Float, ForeignKey, Index, UniqueConstraint, - Text, Boolean + Text, Boolean, DateTime ) from sqlalchemy.orm import relationship from .database import Base @@ -133,6 +135,21 @@ class SchoolResult(Base): return f"" +class SchemaVersion(Base): + """ + Tracks database schema version for automatic migrations. + Single-row table that stores the current schema version. + """ + __tablename__ = "schema_version" + + id = Column(Integer, primary_key=True, default=1) + version = Column(Integer, nullable=False) + migrated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f"" + + # Mapping from CSV columns to model fields SCHOOL_FIELD_MAPPING = { 'urn': 'urn', diff --git a/backend/version.py b/backend/version.py new file mode 100644 index 0000000..49d8f5b --- /dev/null +++ b/backend/version.py @@ -0,0 +1,22 @@ +""" +Schema versioning for database migrations. + +HOW TO USE: +- Bump SCHEMA_VERSION when making changes to database models +- This triggers an automatic full data reimport on next app startup + +WHEN TO BUMP: +- Adding/removing columns in models.py +- Changing column types or constraints +- Modifying CSV column mappings in schemas.py +- Any change that requires fresh data import +""" + +# Current schema version - increment when models change +SCHEMA_VERSION = 2 + +# Changelog for documentation +SCHEMA_CHANGELOG = { + 1: "Initial schema with School and SchoolResult tables", + 2: "Added pupil absence fields (reading, maths, gps, writing, science)", +} diff --git a/scripts/migrate_csv_to_db.py b/scripts/migrate_csv_to_db.py index 9db1674..c4879d2 100644 --- a/scripts/migrate_csv_to_db.py +++ b/scripts/migrate_csv_to_db.py @@ -1,16 +1,15 @@ #!/usr/bin/env python3 """ -Migration script to import CSV data into PostgreSQL database. +CLI script for manual database migration. Usage: python scripts/migrate_csv_to_db.py [--drop] [--geocode] Options: - --drop Drop existing tables before migration + --drop Drop existing tables before migration (full reimport) --geocode Geocode postcodes (requires network access) """ -import os import sys from pathlib import Path @@ -18,389 +17,11 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) import argparse -import re -from typing import Dict, Optional - -import numpy as np -import pandas as pd -import requests from backend.config import settings -from backend.database import Base, engine, get_db_session -from backend.models import ( - RESULT_FIELD_MAPPING, - SCHOOL_FIELD_MAPPING, - School, - SchoolResult, -) -from backend.schemas import ( - COLUMN_MAPPINGS, - LA_CODE_TO_NAME, - NULL_VALUES, - NUMERIC_COLUMNS, - SCHOOL_TYPE_MAP, -) - - -def parse_numeric(value) -> Optional[float]: - """Parse a numeric value, handling special cases.""" - if pd.isna(value): - return None - if isinstance(value, (int, float)): - return float(value) if not np.isnan(value) else None - str_val = str(value).strip().upper() - if str_val in NULL_VALUES or str_val == "": - return None - # Remove percentage signs if present - str_val = str_val.replace("%", "") - try: - return float(str_val) - except ValueError: - return None - - -def extract_year_from_folder(folder_name: str) -> Optional[int]: - """Extract year from folder name like '2023-2024'.""" - match = re.search(r"(\d{4})-(\d{4})", folder_name) - if match: - return int(match.group(2)) - match = re.search(r"(\d{4})", folder_name) - if match: - return int(match.group(1)) - return None - - -def geocode_postcodes_bulk(postcodes: list) -> Dict[str, tuple]: - """ - Geocode postcodes in bulk using postcodes.io API. - Returns dict of postcode -> (latitude, longitude). - """ - results = {} - valid_postcodes = [ - p.strip().upper() - for p in postcodes - if p and isinstance(p, str) and len(p.strip()) >= 5 - ] - valid_postcodes = list(set(valid_postcodes)) - - if not valid_postcodes: - return results - - batch_size = 100 - total_batches = (len(valid_postcodes) + batch_size - 1) // batch_size - - for i, batch_start in enumerate(range(0, len(valid_postcodes), batch_size)): - batch = valid_postcodes[batch_start : batch_start + batch_size] - print( - f" Geocoding batch {i + 1}/{total_batches} ({len(batch)} postcodes)..." - ) - - try: - response = requests.post( - "https://api.postcodes.io/postcodes", - json={"postcodes": batch}, - timeout=30, - ) - if response.status_code == 200: - data = response.json() - for item in data.get("result", []): - if item and item.get("result"): - pc = item["query"].upper() - lat = item["result"].get("latitude") - lon = item["result"].get("longitude") - if lat and lon: - results[pc] = (lat, lon) - except Exception as e: - print(f" Warning: Geocoding batch failed: {e}") - - return results - - -def load_csv_data(data_dir: Path) -> pd.DataFrame: - """Load all CSV data from data directory.""" - all_data = [] - - for folder in sorted(data_dir.iterdir()): - if not folder.is_dir(): - continue - - year = extract_year_from_folder(folder.name) - if not year: - continue - - # Specifically look for the KS2 results file - ks2_file = folder / "england_ks2final.csv" - if not ks2_file.exists(): - continue - - csv_file = ks2_file - print(f" Loading {csv_file.name} (year {year})...") - - try: - df = pd.read_csv(csv_file, encoding="latin-1", low_memory=False) - except Exception as e: - print(f" Error loading {csv_file}: {e}") - continue - - # Rename columns - df.rename(columns=COLUMN_MAPPINGS, inplace=True) - df["year"] = year - - # Handle local authority name - la_name_cols = ["LANAME", "LA (name)", "LA_NAME", "LA NAME"] - la_name_col = next((c for c in la_name_cols if c in df.columns), None) - - if la_name_col and la_name_col != "local_authority": - df["local_authority"] = df[la_name_col] - elif "LEA" in df.columns: - df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce") - df["local_authority"] = ( - df["local_authority_code"] - .map(LA_CODE_TO_NAME) - .fillna(df["LEA"].astype(str)) - ) - - # Store LEA code - if "LEA" in df.columns: - df["local_authority_code"] = pd.to_numeric(df["LEA"], errors="coerce") - - # Map school type - if "school_type_code" in df.columns: - df["school_type"] = ( - df["school_type_code"] - .map(SCHOOL_TYPE_MAP) - .fillna(df["school_type_code"]) - ) - - # Create combined address - addr_parts = ["address1", "address2", "town", "postcode"] - for col in addr_parts: - if col not in df.columns: - df[col] = None - - df["address"] = df.apply( - lambda r: ", ".join( - str(v) - for v in [ - r.get("address1"), - r.get("address2"), - r.get("town"), - r.get("postcode"), - ] - if pd.notna(v) and str(v).strip() - ), - axis=1, - ) - - all_data.append(df) - print(f" Loaded {len(df)} records") - - if all_data: - result = pd.concat(all_data, ignore_index=True) - print(f"\nTotal records loaded: {len(result)}") - print(f"Unique schools: {result['urn'].nunique()}") - print(f"Years: {sorted(result['year'].unique())}") - return result - - return pd.DataFrame() - - -def migrate_data(df: pd.DataFrame, geocode: bool = False): - """Migrate DataFrame data to database.""" - - # Clean URN column - convert to integer, drop invalid values - df = df.copy() - df["urn"] = pd.to_numeric(df["urn"], errors="coerce") - df = df.dropna(subset=["urn"]) - df["urn"] = df["urn"].astype(int) - - # Group by URN to get unique schools (use latest year's data) - school_data = ( - df.sort_values("year", ascending=False).groupby("urn").first().reset_index() - ) - print(f"\nMigrating {len(school_data)} unique schools...") - - # Geocode if requested - geocoded = {} - if geocode and "postcode" in df.columns: - print("\nGeocoding postcodes...") - postcodes = df["postcode"].dropna().unique().tolist() - geocoded = geocode_postcodes_bulk(postcodes) - print(f" Successfully geocoded {len(geocoded)} postcodes") - - with get_db_session() as db: - # Create schools - urn_to_school_id = {} - schools_created = 0 - - for _, row in school_data.iterrows(): - # Safely parse URN - handle None, NaN, whitespace, and invalid values - urn_val = row.get("urn") - urn = None - if pd.notna(urn_val): - try: - urn_str = str(urn_val).strip() - if urn_str: - urn = int(float(urn_str)) # Handle "12345.0" format - except (ValueError, TypeError): - pass - if not urn: - continue - - # Skip if we've already added this URN (handles duplicates in source data) - if urn in urn_to_school_id: - continue - - # Get geocoding data - postcode = row.get("postcode") - lat, lon = None, None - if postcode and pd.notna(postcode): - coords = geocoded.get(str(postcode).strip().upper()) - if coords: - lat, lon = coords - - # Safely parse local_authority_code - la_code = None - la_code_val = row.get("local_authority_code") - if pd.notna(la_code_val): - try: - la_code_str = str(la_code_val).strip() - if la_code_str: - la_code = int(float(la_code_str)) - except (ValueError, TypeError): - pass - - school = School( - urn=urn, - school_name=row.get("school_name") - if pd.notna(row.get("school_name")) - else "Unknown", - local_authority=row.get("local_authority") - if pd.notna(row.get("local_authority")) - else None, - local_authority_code=la_code, - school_type=row.get("school_type") - if pd.notna(row.get("school_type")) - else None, - school_type_code=row.get("school_type_code") - if pd.notna(row.get("school_type_code")) - else None, - religious_denomination=row.get("religious_denomination") - if pd.notna(row.get("religious_denomination")) - else None, - age_range=row.get("age_range") - if pd.notna(row.get("age_range")) - else None, - address1=row.get("address1") if pd.notna(row.get("address1")) else None, - address2=row.get("address2") if pd.notna(row.get("address2")) else None, - town=row.get("town") if pd.notna(row.get("town")) else None, - postcode=row.get("postcode") if pd.notna(row.get("postcode")) else None, - latitude=lat, - longitude=lon, - ) - db.add(school) - db.flush() # Get the ID - urn_to_school_id[urn] = school.id - schools_created += 1 - - if schools_created % 1000 == 0: - print(f" Created {schools_created} schools...") - - print(f" Created {schools_created} schools") - - # Create results - print(f"\nMigrating {len(df)} yearly results...") - results_created = 0 - - for _, row in df.iterrows(): - # Safely parse URN - urn_val = row.get("urn") - urn = None - if pd.notna(urn_val): - try: - urn_str = str(urn_val).strip() - if urn_str: - urn = int(float(urn_str)) - except (ValueError, TypeError): - pass - if not urn or urn not in urn_to_school_id: - continue - - school_id = urn_to_school_id[urn] - - # Safely parse year - year_val = row.get("year") - year = None - if pd.notna(year_val): - try: - year = int(float(str(year_val).strip())) - except (ValueError, TypeError): - pass - if not year: - continue - - result = SchoolResult( - school_id=school_id, - year=year, - total_pupils=parse_numeric(row.get("total_pupils")), - eligible_pupils=parse_numeric(row.get("eligible_pupils")), - # Expected Standard - rwm_expected_pct=parse_numeric(row.get("rwm_expected_pct")), - reading_expected_pct=parse_numeric(row.get("reading_expected_pct")), - writing_expected_pct=parse_numeric(row.get("writing_expected_pct")), - maths_expected_pct=parse_numeric(row.get("maths_expected_pct")), - gps_expected_pct=parse_numeric(row.get("gps_expected_pct")), - science_expected_pct=parse_numeric(row.get("science_expected_pct")), - # Higher Standard - rwm_high_pct=parse_numeric(row.get("rwm_high_pct")), - reading_high_pct=parse_numeric(row.get("reading_high_pct")), - writing_high_pct=parse_numeric(row.get("writing_high_pct")), - maths_high_pct=parse_numeric(row.get("maths_high_pct")), - gps_high_pct=parse_numeric(row.get("gps_high_pct")), - # Progress - reading_progress=parse_numeric(row.get("reading_progress")), - writing_progress=parse_numeric(row.get("writing_progress")), - maths_progress=parse_numeric(row.get("maths_progress")), - # Averages - reading_avg_score=parse_numeric(row.get("reading_avg_score")), - maths_avg_score=parse_numeric(row.get("maths_avg_score")), - gps_avg_score=parse_numeric(row.get("gps_avg_score")), - # Context - disadvantaged_pct=parse_numeric(row.get("disadvantaged_pct")), - eal_pct=parse_numeric(row.get("eal_pct")), - sen_support_pct=parse_numeric(row.get("sen_support_pct")), - sen_ehcp_pct=parse_numeric(row.get("sen_ehcp_pct")), - stability_pct=parse_numeric(row.get("stability_pct")), - # Gender - rwm_expected_boys_pct=parse_numeric(row.get("rwm_expected_boys_pct")), - rwm_expected_girls_pct=parse_numeric(row.get("rwm_expected_girls_pct")), - rwm_high_boys_pct=parse_numeric(row.get("rwm_high_boys_pct")), - rwm_high_girls_pct=parse_numeric(row.get("rwm_high_girls_pct")), - # Disadvantaged - rwm_expected_disadvantaged_pct=parse_numeric( - row.get("rwm_expected_disadvantaged_pct") - ), - rwm_expected_non_disadvantaged_pct=parse_numeric( - row.get("rwm_expected_non_disadvantaged_pct") - ), - disadvantaged_gap=parse_numeric(row.get("disadvantaged_gap")), - # 3-Year - rwm_expected_3yr_pct=parse_numeric(row.get("rwm_expected_3yr_pct")), - reading_avg_3yr=parse_numeric(row.get("reading_avg_3yr")), - maths_avg_3yr=parse_numeric(row.get("maths_avg_3yr")), - ) - db.add(result) - results_created += 1 - - if results_created % 10000 == 0: - print(f" Created {results_created} results...") - db.flush() - - print(f" Created {results_created} results") - - # Commit all changes - db.commit() - print("\nMigration complete!") +from backend.database import Base, engine, init_db, set_db_schema_version +from backend.migration import load_csv_data, migrate_data, run_full_migration +from backend.version import SCHEMA_VERSION def main(): @@ -418,24 +39,29 @@ def main(): print("=" * 60) print(f"\nDatabase: {settings.database_url.split('@')[-1]}") print(f"Data directory: {settings.data_dir}") + print(f"Target schema version: {SCHEMA_VERSION}") if args.drop: - print("\n⚠️ Dropping existing tables...") - Base.metadata.drop_all(bind=engine) + print("\nRunning full migration (drop + reimport)...") + success = run_full_migration(geocode=args.geocode) + else: + print("\nCreating tables (preserving existing data)...") + init_db() + print("\nLoading CSV data...") + df = load_csv_data(settings.data_dir) + if df.empty: + print("No data found to migrate!") + return 1 + migrate_data(df, geocode=args.geocode) + success = True - print("\nCreating tables...") - Base.metadata.create_all(bind=engine) + if success: + # Ensure schema_version table exists + init_db() + set_db_schema_version(SCHEMA_VERSION) + print(f"\nSchema version set to {SCHEMA_VERSION}") - print("\nLoading CSV data...") - df = load_csv_data(settings.data_dir) - - if df.empty: - print("No data found to migrate!") - return 1 - - migrate_data(df, geocode=args.geocode) - - return 0 + return 0 if success else 1 if __name__ == "__main__":