Refactoring and bug fixes
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 1m7s
This commit is contained in:
196
backend/data_loader.py
Normal file
196
backend/data_loader.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""
|
||||
Data loading module with optimized pandas operations.
|
||||
Uses vectorized operations instead of .apply() for performance.
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
from functools import lru_cache
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
from .config import settings
|
||||
from .schemas import (
|
||||
COLUMN_MAPPINGS,
|
||||
NUMERIC_COLUMNS,
|
||||
SCHOOL_TYPE_MAP,
|
||||
NULL_VALUES,
|
||||
LA_CODE_TO_NAME,
|
||||
)
|
||||
|
||||
|
||||
def extract_year_from_folder(folder_name: str) -> Optional[int]:
    """Return the end year of an academic-year folder name.

    A name containing 'YYYY-YYYY' (e.g. '2023-2024') yields the second
    year as an int (2024); anything without that pattern yields None.
    """
    found = re.search(r'(\d{4})-(\d{4})', folder_name)
    return int(found.group(2)) if found else None
|
||||
|
||||
|
||||
def parse_numeric_vectorized(series: pd.Series) -> pd.Series:
    """Parse a raw CSV column into numerics without row-wise .apply().

    Sentinel strings from NULL_VALUES (SUPP, NE, NA, NP, ...) become NaN,
    trailing '%' signs are removed, and anything left unparseable is
    coerced to NaN by pd.to_numeric.

    Args:
        series: Raw column values of any dtype.

    Returns:
        Numeric series aligned with the input index; unparseable -> NaN.
    """
    str_series = series.astype(str)

    # One list-form replace is a single pass over the column; the original
    # looped and re-scanned the whole Series once per sentinel value.
    str_series = str_series.replace(list(NULL_VALUES), np.nan)

    # Percentage columns store values like '12.3%'.
    str_series = str_series.str.rstrip('%')

    return pd.to_numeric(str_series, errors='coerce')
|
||||
|
||||
|
||||
def create_address_vectorized(df: pd.DataFrame) -> pd.Series:
    """Build a display address from address1/town/postcode columns.

    Each row's non-blank components (after NaN-filling; whitespace-only
    values are dropped) are joined with ', '. Columns absent from *df*
    are skipped entirely.

    Args:
        df: Frame that may contain 'address1', 'town' and 'postcode'.

    Returns:
        String series aligned with df.index; '' when no component exists.
    """
    components = [
        df[col].fillna('').astype(str)
        for col in ('address1', 'town', 'postcode')
        if col in df.columns
    ]

    if not components:
        return pd.Series([''] * len(df), index=df.index)

    # Join over plain Python lists at C-iteration speed. The previous
    # implementation looped row-by-row doing per-cell `result.iloc[i] = ...`
    # Series writes, which is far slower and not vectorized despite its
    # docstring's claim.
    joined = [
        ', '.join(part for part in row if part and part.strip())
        for row in zip(*(comp.tolist() for comp in components))
    ]
    return pd.Series(joined, index=df.index)
|
||||
|
||||
|
||||
def create_address_fast(df: pd.DataFrame) -> pd.Series:
    """Fast vectorized address creation using string concatenation.

    Builds 'address1, town, postcode' per row, skipping blank components
    and stripping the leading ', ' left when address1 is blank.

    Args:
        df: Frame that may contain 'address1', 'town' and 'postcode'.

    Returns:
        String series aligned with df.index.
    """
    def _column(name: str) -> pd.Series:
        # Build the fallback with df's own index. The original used
        # df.get(name, pd.Series([''] * len(df))), whose RangeIndex
        # mis-aligned in the .where() calls below for frames with a
        # non-default index, turning every address into NaN.
        if name in df.columns:
            return df[name].fillna('').astype(str)
        return pd.Series([''] * len(df), index=df.index)

    addr1 = _column('address1')
    town = _column('town').str.strip()
    postcode = _column('postcode').str.strip()

    result = addr1.str.strip()

    # Append town where it is non-blank.
    result = result.where(town == '', result + ', ' + town)

    # Append postcode where it is non-blank.
    result = result.where(postcode == '', result + ', ' + postcode)

    # Remove the leading ', ' produced when address1 was blank.
    return result.str.lstrip(', ')
|
||||
|
||||
|
||||
def load_year_data(year_folder: Path, year: int) -> Optional[pd.DataFrame]:
    """Load and process data for a single year.

    Reads england_ks2final.csv from *year_folder*, filters to school-level
    rows, normalises columns to the internal schema, and returns the frame.

    Args:
        year_folder: Folder for one academic year (e.g. '2023-2024').
        year: End year of the academic year; stored in a 'year' column.

    Returns:
        Processed DataFrame, or None when the CSV is absent or fails to load.
    """
    ks2_file = year_folder / "england_ks2final.csv"
    if not ks2_file.exists():
        return None

    try:
        print(f"Loading data from {ks2_file}")
        # low_memory=False: infer one dtype per column over the whole file
        # instead of per-chunk, avoiding mixed-dtype columns.
        df = pd.read_csv(ks2_file, low_memory=False)

        # Handle column types: some years ship LEA/URN as strings; coerce
        # to numeric so the LEA->name mapping below can match on codes.
        if 'LEA' in df.columns and df['LEA'].dtype == 'object':
            df['LEA'] = pd.to_numeric(df['LEA'], errors='coerce')
        if 'URN' in df.columns and df['URN'].dtype == 'object':
            df['URN'] = pd.to_numeric(df['URN'], errors='coerce')

        # Filter to schools only (RECTYPE == 1 means school level data).
        # Other RECTYPE values are presumably LA/national aggregate rows —
        # TODO confirm against the dataset's documentation.
        if 'RECTYPE' in df.columns:
            df = df[df['RECTYPE'] == 1].copy()

        # Add year and local authority name
        df['year'] = year

        # Try different column names for LA name — the header varies
        # between publication years.
        la_name_cols = ['LANAME', 'LA (name)', 'LA_NAME', 'LA NAME']
        la_col_found = None
        for col in la_name_cols:
            if col in df.columns:
                la_col_found = col
                break

        if la_col_found:
            df['local_authority'] = df[la_col_found]
        elif 'LEA' in df.columns:
            # Map LEA codes to names using our mapping; codes missing from
            # the mapping fall back to the raw code rendered as a string.
            df['local_authority'] = df['LEA'].map(LA_CODE_TO_NAME).fillna(df['LEA'].astype(str))

        # Rename columns using mapping. Must run before the address /
        # school-type / numeric steps below, which expect the renamed
        # schema names (e.g. 'school_type_code').
        rename_dict = {k: v for k, v in COLUMN_MAPPINGS.items() if k in df.columns}
        df = df.rename(columns=rename_dict)

        # Create address field (vectorized)
        df['address'] = create_address_fast(df)

        # Map school type codes to names (vectorized); unknown codes
        # become 'Other'.
        if 'school_type_code' in df.columns:
            df['school_type'] = df['school_type_code'].map(SCHOOL_TYPE_MAP).fillna('Other')

        # Parse numeric columns (vectorized - much faster than .apply())
        for col in NUMERIC_COLUMNS:
            if col in df.columns:
                df[col] = parse_numeric_vectorized(df[col])

        print(f" Loaded {len(df)} schools for year {year}")
        return df

    except Exception as e:
        # Best-effort loader: a malformed year file is reported and
        # skipped rather than aborting the whole multi-year load.
        print(f"Error loading {ks2_file}: {e}")
        return None
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
def load_school_data() -> pd.DataFrame:
    """Load and combine all school data from CSV files in year folders.

    Scans settings.data_dir for 'YYYY-YYYY' folders, loads each year and
    concatenates the results. The lru_cache makes this a lazy singleton:
    the combined frame is built once and reused on every later call.
    """
    frames = []
    base = settings.data_dir

    if base.exists():
        for folder in base.iterdir():
            # Only descend into academic-year folders like '2023-2024'.
            if not folder.is_dir() or not re.match(r'\d{4}-\d{4}', folder.name):
                continue
            end_year = extract_year_from_folder(folder.name)
            if end_year is None:
                continue
            frame = load_year_data(folder, end_year)
            if frame is not None:
                frames.append(frame)

    if not frames:
        print("No data files found. Creating empty DataFrame.")
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)
    print(f"\nTotal records loaded: {len(combined)}")
    print(f"Unique schools: {combined['urn'].nunique()}")
    print(f"Years: {sorted(combined['year'].unique())}")
    return combined
|
||||
|
||||
|
||||
def clear_cache() -> None:
    """Clear the data cache to force reload."""
    # Drops the lru_cache entry on load_school_data so the next call
    # re-reads every year's CSV from disk (e.g. after new data lands).
    load_school_data.cache_clear()
|
||||
|
||||
Reference in New Issue
Block a user