"""
Data loading module with optimized pandas operations.
Uses vectorized operations instead of .apply() for performance.
"""

import re
from functools import lru_cache
from math import asin, cos, radians, sin, sqrt
from pathlib import Path
from typing import Dict, Optional, Tuple
from urllib.parse import quote

import numpy as np
import pandas as pd
import requests

from .config import settings
from .schemas import (
    COLUMN_MAPPINGS,
    NUMERIC_COLUMNS,
    SCHOOL_TYPE_MAP,
    NULL_VALUES,
    LA_CODE_TO_NAME,
)

# Cache for postcode geocoding (normalized postcode -> (latitude, longitude)).
# Only geocode_single_postcode reads and writes it.
_postcode_cache: Dict[str, Tuple[float, float]] = {}


def geocode_postcodes_bulk(postcodes: list) -> Dict[str, Tuple[float, float]]:
    """
    Geocode postcodes in bulk using the postcodes.io API.

    Args:
        postcodes: Candidate postcodes. Non-string entries and entries
            shorter than 5 characters after stripping are ignored.

    Returns:
        Dict of upper-cased postcode -> (latitude, longitude). Postcodes the
        API could not resolve, and whole batches whose request failed, are
        simply absent from the result.
    """
    results: Dict[str, Tuple[float, float]] = {}

    # Normalize, drop invalid entries and de-duplicate. The isinstance check
    # comes first so values such as pd.NA are never evaluated for truthiness,
    # and dict.fromkeys keeps the input order so batches are deterministic.
    valid_postcodes = list(dict.fromkeys(
        p.strip().upper()
        for p in postcodes
        if isinstance(p, str) and len(p.strip()) >= 5
    ))

    if not valid_postcodes:
        return results

    # postcodes.io allows max 100 postcodes per request
    batch_size = 100

    for start in range(0, len(valid_postcodes), batch_size):
        batch = valid_postcodes[start:start + batch_size]
        try:
            response = requests.post(
                'https://api.postcodes.io/postcodes',
                json={'postcodes': batch},
                timeout=30
            )
            if response.status_code != 200:
                continue

            data = response.json()
            # 'result' may be present but null, hence `or []`.
            for item in data.get('result') or []:
                if not item or not item.get('result'):
                    continue
                pc = item['query'].upper()
                lat = item['result'].get('latitude')
                lon = item['result'].get('longitude')
                # Compare against None: 0.0 is a valid coordinate
                # (e.g. a longitude on the prime meridian).
                if lat is not None and lon is not None:
                    results[pc] = (lat, lon)
        except Exception as e:
            # Best effort: one failed batch must not abort the others.
            print(f" Warning: Geocoding batch failed: {e}")

    return results


def geocode_single_postcode(postcode: str) -> Optional[Tuple[float, float]]:
    """
    Geocode a single postcode using the postcodes.io API.

    Successful lookups are stored in the module-level cache, keyed by the
    stripped, upper-cased postcode.

    Args:
        postcode: Postcode to look up.

    Returns:
        (latitude, longitude), or None if the input is empty or not a string,
        the API does not return coordinates, or the request fails.
    """
    if not postcode or not isinstance(postcode, str):
        return None

    postcode = postcode.strip().upper()
    if not postcode:
        return None

    # Check cache first
    if postcode in _postcode_cache:
        return _postcode_cache[postcode]

    try:
        # Percent-encode the postcode so characters such as '/' or '?' in
        # caller-supplied input cannot alter the request path.
        response = requests.get(
            f"https://api.postcodes.io/postcodes/{quote(postcode, safe='')}",
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            if data.get('result'):
                lat = data['result'].get('latitude')
                lon = data['result'].get('longitude')
                # Compare against None: 0.0 is a valid coordinate.
                if lat is not None and lon is not None:
                    _postcode_cache[postcode] = (lat, lon)
                    return (lat, lon)
    except Exception as e:
        # Best effort: report the failure instead of hiding it, but never
        # raise to the caller.
        print(f" Warning: Geocoding failed for {postcode}: {e}")

    return None


def extract_year_from_folder(folder_name: str) -> Optional[int]:
    """
    Extract the end year from a folder name like '2023-2024' -> 2024.

    Returns None when the name contains no 'YYYY-YYYY' pattern.
    """
    match = re.search(r'(\d{4})-(\d{4})', folder_name)
    if match:
        return int(match.group(2))
    return None


def parse_numeric_vectorized(series: pd.Series) -> pd.Series:
    """
    Vectorized numeric parsing - much faster than .apply().

    Values listed in NULL_VALUES become NaN, a trailing '%' is removed, and
    anything that still cannot be parsed as a number is coerced to NaN.

    Args:
        series: Raw column values of any dtype.

    Returns:
        Numeric Series with the same index as the input.
    """
    # Work on trimmed strings so padded markers and values like '85% '
    # are still recognised.
    str_series = series.astype(str).str.strip()

    # Blank out all null markers in a single pass
    str_series = str_series.mask(str_series.isin(list(NULL_VALUES)), np.nan)

    # Remove % signs (and any space that preceded them)
    str_series = str_series.str.rstrip('%').str.strip()

    # Convert to numeric
    return pd.to_numeric(str_series, errors='coerce')


def create_address_vectorized(df: pd.DataFrame) -> pd.Series:
    """
    Build a comma-separated address from whichever of the 'address1',
    'town' and 'postcode' columns exist.

    Blank or whitespace-only parts are skipped; the remaining parts are
    joined as-is (they are not stripped).

    Returns:
        Series of address strings aligned to df.index (empty strings if none
        of the columns exist).
    """
    parts = [
        df[name].fillna('').astype(str)
        for name in ('address1', 'town', 'postcode')
        if name in df.columns
    ]

    if not parts:
        return pd.Series([''] * len(df), index=df.index)

    # One pass over the rows with zip instead of per-row .iloc assignment.
    addresses = [
        ', '.join(part for part in row if part and part.strip())
        for row in zip(*parts)
    ]
    return pd.Series(addresses, index=df.index, dtype=object)


def _stripped_text_column(df: pd.DataFrame, name: str) -> pd.Series:
    """Return df[name] as stripped strings, or empty strings if it is missing."""
    if name in df.columns:
        column = df[name]
    else:
        # The fallback must share df's index; a fresh RangeIndex would
        # misalign with a filtered frame when the columns are combined.
        column = pd.Series('', index=df.index, dtype=object)
    return column.fillna('').astype(str).str.strip()


def create_address_fast(df: pd.DataFrame) -> pd.Series:
    """
    Fast vectorized address creation using string concatenation.

    Combines the stripped 'address1', 'town' and 'postcode' values with
    ', ' separators, skipping empty parts. Missing columns are treated as
    empty.

    Returns:
        Series of address strings aligned to df.index.
    """
    addr1 = _stripped_text_column(df, 'address1')
    town = _stripped_text_column(df, 'town')
    postcode = _stripped_text_column(df, 'postcode')

    # Build address with proper separators
    result = addr1

    # Add town if not empty
    town_mask = town != ''
    result = result.where(~town_mask, result + ', ' + town)

    # Add postcode if not empty
    postcode_mask = postcode != ''
    result = result.where(~postcode_mask, result + ', ' + postcode)

    # Clean up leading commas (left behind when address1 is empty)
    result = result.str.lstrip(', ')

    return result


def load_year_data(year_folder: Path, year: int) -> Optional[pd.DataFrame]:
    """
    Load and process data for a single year.

    Reads 'england_ks2final.csv' from the folder, keeps school-level rows,
    adds 'year', 'local_authority', 'address', 'school_type' and empty
    'latitude'/'longitude' columns, renames columns via COLUMN_MAPPINGS and
    parses NUMERIC_COLUMNS.

    Args:
        year_folder: Folder expected to contain the CSV file.
        year: Year value stored in the 'year' column.

    Returns:
        The processed DataFrame, or None if the file is missing or loading
        fails (the error is printed).
    """
    ks2_file = year_folder / "england_ks2final.csv"
    if not ks2_file.exists():
        return None

    try:
        print(f"Loading data from {ks2_file}")
        df = pd.read_csv(ks2_file, low_memory=False)

        # Handle column types: coerce code columns that were read as text.
        # Checking "not numeric" also covers non-object string dtypes.
        for code_col in ('LEA', 'URN'):
            if code_col in df.columns and not pd.api.types.is_numeric_dtype(df[code_col]):
                df[code_col] = pd.to_numeric(df[code_col], errors='coerce')

        # Filter to schools only (RECTYPE == 1 means school level data).
        # Compare numerically so the filter still works if the column was
        # read as text.
        if 'RECTYPE' in df.columns:
            rectype = pd.to_numeric(df['RECTYPE'], errors='coerce')
            df = df[rectype == 1].copy()

        # Add year and local authority name
        df['year'] = year

        # Try different column names for LA name
        la_name_cols = ['LANAME', 'LA (name)', 'LA_NAME', 'LA NAME']
        la_col_found = next((col for col in la_name_cols if col in df.columns), None)

        if la_col_found:
            df['local_authority'] = df[la_col_found]
        elif 'LEA' in df.columns:
            # Map LEA codes to names using our mapping; fall back to the
            # code itself (as text) when it is not in the mapping.
            df['local_authority'] = df['LEA'].map(LA_CODE_TO_NAME).fillna(df['LEA'].astype(str))

        # Rename columns using mapping
        rename_dict = {k: v for k, v in COLUMN_MAPPINGS.items() if k in df.columns}
        df = df.rename(columns=rename_dict)

        # Create address field (vectorized)
        df['address'] = create_address_fast(df)

        # Map school type codes to names (vectorized)
        if 'school_type_code' in df.columns:
            df['school_type'] = df['school_type_code'].map(SCHOOL_TYPE_MAP).fillna('Other')

        # Parse numeric columns (vectorized - much faster than .apply())
        for col in NUMERIC_COLUMNS:
            if col in df.columns:
                df[col] = parse_numeric_vectorized(df[col])

        # Initialize lat/long columns
        df['latitude'] = None
        df['longitude'] = None

        print(f" Loaded {len(df)} schools for year {year}")
        return df

    except Exception as e:
        # Best effort: one unreadable year must not stop the other years.
        print(f"Error loading {ks2_file}: {e}")
        return None


@lru_cache(maxsize=1)
def load_school_data() -> pd.DataFrame:
    """
    Load and combine all school data from CSV files in year folders.

    Scans settings.data_dir for folders named like 'YYYY-YYYY' and
    concatenates the per-year frames. Uses lru_cache for singleton-like
    behavior: the result (including an empty one) is reused until
    clear_cache() is called, and every caller receives the same DataFrame
    object.

    Returns:
        The combined DataFrame, or an empty DataFrame if nothing was loaded.
    """
    all_data = []

    data_dir = settings.data_dir
    # is_dir() rather than exists(): iterdir() raises on a plain file.
    if data_dir.is_dir():
        # Sorted so the row order of the combined frame is deterministic.
        for year_folder in sorted(data_dir.iterdir()):
            if not (year_folder.is_dir() and re.match(r'\d{4}-\d{4}', year_folder.name)):
                continue
            year = extract_year_from_folder(year_folder.name)
            if year is None:
                continue

            df = load_year_data(year_folder, year)
            if df is not None:
                all_data.append(df)

    if not all_data:
        print("No data files found. Creating empty DataFrame.")
        return pd.DataFrame()

    result = pd.concat(all_data, ignore_index=True)
    print(f"\nTotal records loaded: {len(result)}")
    # 'urn' only exists if the column mapping produced it; do not fail the
    # whole load over a summary line.
    if 'urn' in result.columns:
        print(f"Unique schools: {result['urn'].nunique()}")
    print(f"Years: {sorted(result['year'].unique())}")

    # Note: Geocoding is done lazily when location search is used
    # This keeps startup fast

    return result


def clear_cache():
    """Clear the data cache to force reload."""
    load_school_data.cache_clear()


def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Calculate the great circle distance between two points on Earth (in miles).

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        Distance in miles, using an Earth radius of 3956 miles.
    """
    # Convert to radians
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Clamp: rounding can push sqrt(a) slightly above 1 for near-antipodal
    # points, which would make asin raise ValueError.
    c = 2 * asin(min(1.0, sqrt(a)))

    # Earth's radius in miles
    r = 3956

    return c * r