"""
Data loading module with optimized pandas operations.

Uses vectorized operations instead of .apply() for performance.
"""
import pandas as pd
|
||
|
|
import numpy as np
|
||
|
|
from pathlib import Path
|
||
|
|
from functools import lru_cache
|
||
|
|
import re
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
from .config import settings
|
||
|
|
from .schemas import (
|
||
|
|
COLUMN_MAPPINGS,
|
||
|
|
NUMERIC_COLUMNS,
|
||
|
|
SCHOOL_TYPE_MAP,
|
||
|
|
NULL_VALUES,
|
||
|
|
LA_CODE_TO_NAME,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def extract_year_from_folder(folder_name: str) -> Optional[int]:
    """Return the academic end year encoded in a folder name.

    A name such as '2023-2024' yields 2024. Names that do not
    contain a 'YYYY-YYYY' pattern yield None.
    """
    if m := re.search(r'(\d{4})-(\d{4})', folder_name):
        return int(m.group(2))
    return None
def parse_numeric_vectorized(series: pd.Series) -> pd.Series:
    """Parse a raw data column into floats using vectorized operations.

    Suppression/placeholder markers (SUPP, NE, NA, NP, ... — whatever
    is listed in NULL_VALUES) become NaN, trailing '%' signs are
    stripped so percentages parse as plain numbers, and everything
    else is coerced with pd.to_numeric.

    Args:
        series: Raw input column of any dtype.

    Returns:
        Float series aligned to the input; unparseable entries are NaN.
    """
    # Work on strings so .str ops and marker comparison are uniform.
    str_series = series.astype(str)

    # Blank out every known null marker in ONE vectorized pass.
    # The previous per-marker replace() loop scanned the whole series
    # once per entry of NULL_VALUES.
    str_series = str_series.where(~str_series.isin(NULL_VALUES), np.nan)

    # Drop trailing % signs (NaN entries pass through .str unharmed).
    str_series = str_series.str.rstrip('%')

    # Remaining non-numeric residue (e.g. 'nan' from astype) coerces to NaN.
    return pd.to_numeric(str_series, errors='coerce')
def create_address_vectorized(df: pd.DataFrame) -> pd.Series:
    """Build a combined address string per row from address1/town/postcode.

    Components that are present, non-empty and not pure whitespace are
    joined with ', ' (original whitespace inside kept components is
    preserved). Columns absent from df are skipped entirely; if none of
    the three columns exist an all-empty series is returned.

    Args:
        df: Frame that may contain 'address1', 'town' and/or 'postcode'.

    Returns:
        String series aligned to df.index.
    """
    parts = [
        df[col].fillna('').astype(str)
        for col in ('address1', 'town', 'postcode')
        if col in df.columns
    ]

    if not parts:
        return pd.Series([''] * len(df), index=df.index)

    # Single pass over plain Python lists. The previous version did a
    # per-row loop with .iloc positional indexing, which is far slower
    # and contradicted the "vectorized" claim in its docstring.
    rows = zip(*(p.tolist() for p in parts))
    joined = [', '.join(v for v in row if v and v.strip()) for row in rows]
    return pd.Series(joined, index=df.index)
def create_address_fast(df: pd.DataFrame) -> pd.Series:
    """Fast vectorized address creation using string concatenation.

    Joins stripped address1/town/postcode with ', ', skipping empty
    components, entirely with pandas string ops (no per-row loop).

    Args:
        df: Frame that may contain 'address1', 'town' and/or 'postcode'.

    Returns:
        String series aligned to df.index.
    """
    def _col(name: str) -> pd.Series:
        # The default MUST carry df's index: a bare pd.Series([''] * len(df))
        # has a fresh RangeIndex, and the .where()/concat operations below
        # would misalign against df-indexed masks and yield NaN garbage
        # whenever df has a non-default index and a column is missing.
        if name in df.columns:
            return df[name].fillna('').astype(str)
        return pd.Series([''] * len(df), index=df.index)

    addr1 = _col('address1')
    town = _col('town')
    postcode = _col('postcode')

    result = addr1.str.strip()

    # Append town only on rows where it is non-empty after stripping.
    town_mask = town.str.strip() != ''
    result = result.where(~town_mask, result + ', ' + town.str.strip())

    # Same for postcode.
    postcode_mask = postcode.str.strip() != ''
    result = result.where(~postcode_mask, result + ', ' + postcode.str.strip())

    # Rows whose leading component was empty start with ', ' — trim it.
    result = result.str.lstrip(', ')

    return result
def load_year_data(year_folder: Path, year: int) -> Optional[pd.DataFrame]:
    """Load, filter and normalise one academic year's KS2 results file.

    Reads england_ks2final.csv from the given folder, keeps only
    school-level rows, standardises column names and derives the
    'local_authority', 'address' and 'school_type' fields.

    Returns:
        Processed DataFrame, or None when the file is absent or unreadable.
    """
    ks2_file = year_folder / "england_ks2final.csv"
    if not ks2_file.exists():
        return None

    try:
        print(f"Loading data from {ks2_file}")
        df = pd.read_csv(ks2_file, low_memory=False)

        # Some exports store these code columns as strings; coerce to numbers.
        for code_col in ('LEA', 'URN'):
            if code_col in df.columns and df[code_col].dtype == 'object':
                df[code_col] = pd.to_numeric(df[code_col], errors='coerce')

        # RECTYPE == 1 marks school-level rows; drop LA/national summaries.
        if 'RECTYPE' in df.columns:
            df = df[df['RECTYPE'] == 1].copy()

        df['year'] = year

        # The LA-name column header varies across publication years.
        la_col = next(
            (c for c in ('LANAME', 'LA (name)', 'LA_NAME', 'LA NAME') if c in df.columns),
            None,
        )
        if la_col is not None:
            df['local_authority'] = df[la_col]
        elif 'LEA' in df.columns:
            # Fall back to mapping LEA codes; unmapped codes keep the raw code.
            df['local_authority'] = df['LEA'].map(LA_CODE_TO_NAME).fillna(df['LEA'].astype(str))

        # Standardise column names, then derive the combined address field.
        df = df.rename(columns={k: v for k, v in COLUMN_MAPPINGS.items() if k in df.columns})
        df['address'] = create_address_fast(df)

        # Map school type codes to display names (vectorized).
        if 'school_type_code' in df.columns:
            df['school_type'] = df['school_type_code'].map(SCHOOL_TYPE_MAP).fillna('Other')

        # Vectorized numeric parsing of every known numeric column.
        for col in NUMERIC_COLUMNS:
            if col in df.columns:
                df[col] = parse_numeric_vectorized(df[col])

        print(f" Loaded {len(df)} schools for year {year}")
        return df

    except Exception as e:
        # Best effort: one malformed year file should not abort the whole load.
        print(f"Error loading {ks2_file}: {e}")
        return None
@lru_cache(maxsize=1)
def load_school_data() -> pd.DataFrame:
    """
    Load and combine all school data from CSV files in year folders.

    Scans settings.data_dir for sub-folders named like 'YYYY-YYYY',
    loads each year via load_year_data and concatenates the results.
    Cached with lru_cache for singleton-like behavior; use clear_cache()
    to force a reload.

    Returns:
        Combined DataFrame, or an empty DataFrame when no data is found.
    """
    all_data = []

    data_dir = settings.data_dir
    if data_dir.exists():
        # sorted() gives a stable, reproducible row order in the final
        # concat; bare iterdir() order is filesystem-dependent.
        for year_folder in sorted(data_dir.iterdir()):
            if not (year_folder.is_dir() and re.match(r'\d{4}-\d{4}', year_folder.name)):
                continue
            year = extract_year_from_folder(year_folder.name)
            if year is None:
                continue
            df = load_year_data(year_folder, year)
            if df is not None:
                all_data.append(df)

    if not all_data:
        print("No data files found. Creating empty DataFrame.")
        return pd.DataFrame()

    result = pd.concat(all_data, ignore_index=True)
    print(f"\nTotal records loaded: {len(result)}")
    # Guard: the 'urn' column only exists if COLUMN_MAPPINGS produced it;
    # an unguarded access raised KeyError on unexpected source layouts.
    if 'urn' in result.columns:
        print(f"Unique schools: {result['urn'].nunique()}")
    print(f"Years: {sorted(result['year'].unique())}")
    return result
def clear_cache() -> None:
    """Invalidate the cached DataFrame held by load_school_data().

    The next call to load_school_data() will re-read everything from disk.
    """
    load_school_data.cache_clear()