"""
IDACI (Income Deprivation Affecting Children Index) loader.

Source: English Indices of Deprivation 2019
https://www.gov.uk/government/statistics/english-indices-of-deprivation-2019

This is a one-time download (5-yearly release). We join school postcodes to LSOAs
via postcodes.io, then look up IDACI scores from the IoD2019 file.

Update: ~5-yearly (next release expected 2025/26)
"""
import argparse
import sys
from pathlib import Path

import pandas as pd
import requests

sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SUPPLEMENTARY_DIR
from db import get_session

DEST_DIR = SUPPLEMENTARY_DIR / "idaci"

# IoD 2019 supplementary data — "Income Deprivation Affecting Children Index (IDACI)"
# NOTE(review): the file name in this URL is "File 1 - IMD2019 Index of Multiple
# Deprivation", whereas load() requires a sheet with an LSOA-code column and an
# IDACI-score column. Confirm this workbook actually carries IDACI scores (they may
# be published in a different IoD2019 file) before relying on it.
IOD_2019_URL = (
    "https://assets.publishing.service.gov.uk/government/uploads/system/uploads/"
    "attachment_data/file/833970/File_1_-_IMD2019_Index_of_Multiple_Deprivation.xlsx"
)

POSTCODES_IO_BATCH = "https://api.postcodes.io/postcodes"
BATCH_SIZE = 100


def download(data_dir: Path | None = None) -> Path:
    """Download the IoD2019 workbook unless it is already on disk.

    Args:
        data_dir: Optional base data directory; the file is stored under
            ``data_dir / "supplementary" / "idaci"``. Defaults to ``DEST_DIR``.

    Returns:
        Path of the local ``iod2019_idaci.xlsx`` file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On any other transport failure.
    """
    dest = (data_dir / "supplementary" / "idaci") if data_dir else DEST_DIR
    dest.mkdir(parents=True, exist_ok=True)

    filename = "iod2019_idaci.xlsx"
    dest_file = dest / filename

    if dest_file.exists():
        print(f" IDACI: {filename} already exists, skipping download.")
        return dest_file

    print(" IDACI: downloading IoD2019 file ...")
    # Stream into a temporary ".part" file and rename only on success, so an
    # interrupted download can never be mistaken for a complete file by the
    # "already exists" check above (or by the "*.xlsx" glob in load()).
    tmp_file = dest_file.with_name(dest_file.name + ".part")
    try:
        with requests.get(IOD_2019_URL, timeout=300, stream=True) as resp:
            resp.raise_for_status()
            with open(tmp_file, "wb") as f:
                for chunk in resp.iter_content(chunk_size=65536):
                    f.write(chunk)
        tmp_file.replace(dest_file)
    finally:
        # No-op after a successful rename; removes the partial file on failure.
        tmp_file.unlink(missing_ok=True)

    print(f" IDACI: saved {dest_file}")
    return dest_file


def _lsoa_code_from_result(result: dict) -> str | None:
    """Extract the LSOA code from one postcodes.io ``result`` object.

    The code is read from ``result["codes"]["lsoa"]`` when present; the
    top-level ``result["lsoa"]`` value is used only as a fallback.
    """
    codes = result.get("codes")
    if isinstance(codes, dict) and codes.get("lsoa"):
        return codes["lsoa"]
    # NOTE(review): in postcodes.io responses the top-level "lsoa" appears to be
    # the LSOA *name* rather than the GSS code; kept as a fallback only so that
    # behaviour is unchanged if "codes" is absent — confirm against the API docs.
    return result.get("lsoa") or None


def _postcode_to_lsoa(postcodes: list[str]) -> dict[str, str]:
    """Batch-resolve postcodes to LSOA codes via postcodes.io.

    Postcodes are stripped, upper-cased and de-duplicated; values shorter than
    five characters are ignored. Requests are sent in batches of ``BATCH_SIZE``.
    A failed batch is reported with a warning and skipped (best effort), so the
    returned mapping may be incomplete.

    Args:
        postcodes: Raw postcode values (``None``/empty entries are ignored).

    Returns:
        Mapping of normalised (stripped, upper-cased) postcode to LSOA code.
    """
    result: dict[str, str] = {}
    normalised = {str(p).strip().upper() for p in postcodes if p}
    valid = sorted(pc for pc in normalised if len(pc) >= 5)

    for start in range(0, len(valid), BATCH_SIZE):
        batch = valid[start:start + BATCH_SIZE]
        try:
            resp = requests.post(POSTCODES_IO_BATCH, json={"postcodes": batch}, timeout=30)
            if resp.status_code != 200:
                print(f" Warning: postcodes.io batch failed: HTTP {resp.status_code}")
                continue
            payload = resp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older requests versions.
            print(f" Warning: postcodes.io batch failed: {e}")
            continue

        items = payload.get("result") if isinstance(payload, dict) else None
        for item in items or []:
            # Unmatched postcodes come back with a null "result".
            if not isinstance(item, dict) or not isinstance(item.get("result"), dict):
                continue
            lsoa = _lsoa_code_from_result(item["result"])
            query = item.get("query")
            if lsoa and query:
                result[str(query).upper()] = lsoa

    return result


def _read_lsoa_lookup(path: Path) -> dict:
    """Read the IoD2019 workbook and build an LSOA -> IDACI lookup.

    The first sheet whose name or column headers mention "IDACI" is used,
    falling back to the first sheet of the workbook.

    Returns:
        ``{lsoa_code: {"idaci_score": ..., "idaci_decile": ...}}`` where decile 1
        holds the highest IDACI scores (most deprived).

    Raises:
        RuntimeError: If the workbook cannot be read.
        ValueError: If no LSOA-code or IDACI-score column can be identified.
    """
    # IoD2019 File 1 — sheet "IoD2019 IDACI" or similar
    try:
        sheets = pd.read_excel(path, sheet_name=None)
        # Find sheet with IDACI data
        idaci_sheet = next(
            (
                name
                for name, frame in sheets.items()
                if "IDACI" in name.upper() or "IDACI" in str(frame.columns.tolist()).upper()
            ),
            None,
        )
        if idaci_sheet is None:
            idaci_sheet = list(sheets.keys())[0]
        df_iod = sheets[idaci_sheet]
    except Exception as e:
        raise RuntimeError(f"Could not read IoD2019 file: {e}") from e

    # Normalise column names — IoD2019 uses specific headers
    columns = list(df_iod.columns)
    col_lsoa = next(
        (c for c in columns if "LSOA" in str(c).upper() and "code" in str(c).lower()),
        None,
    )
    col_score = next(
        (c for c in columns if "IDACI" in str(c).upper() and "score" in str(c).lower()),
        None,
    )

    if col_lsoa is None or col_score is None:
        print(f" IDACI columns available: {list(df_iod.columns)[:20]}")
        raise ValueError("Could not find LSOA code or IDACI score columns")

    df_iod = df_iod[[col_lsoa, col_score]].copy()
    df_iod.columns = ["lsoa_code", "idaci_score"]
    df_iod = df_iod.dropna()
    # Guard against stray whitespace so codes match those returned by the API.
    df_iod["lsoa_code"] = df_iod["lsoa_code"].astype(str).str.strip()

    # Compute decile from the score distribution: qcut bin 0 holds the lowest
    # scores, so "+ 1" gives 1..10 ascending by score ...
    ascending_decile = (pd.qcut(df_iod["idaci_score"], 10, labels=False) + 1).astype(int)
    # ... and inverting makes decile 1 = most deprived (highest IDACI score).
    df_iod["idaci_decile"] = 11 - ascending_decile

    return df_iod.set_index("lsoa_code")[["idaci_score", "idaci_decile"]].to_dict("index")


def load(path: Path | None = None, data_dir: Path | None = None) -> dict:
    """Load IDACI scores and upsert one deprivation row per school.

    Steps: read the IoD2019 workbook, fetch ``(urn, postcode)`` for every
    school with a postcode, resolve postcodes to LSOAs via postcodes.io, then
    upsert into ``school_deprivation`` keyed on ``urn``. Schools whose postcode
    cannot be resolved, or whose LSOA is not in the workbook, are skipped.

    Args:
        path: Workbook to read. When omitted, the last ``*.xlsx`` (sorted by
            name) in the IDACI directory is used.
        data_dir: Optional base data directory (see ``download``).

    Returns:
        ``{"inserted": n, "updated": 0, "skipped": m}``; ``inserted`` counts
        every upsert, whether it created or updated a row.

    Raises:
        FileNotFoundError: If no workbook is found.
        RuntimeError: If the workbook cannot be read.
        ValueError: If the required columns cannot be identified.
    """
    from sqlalchemy import text

    dest = (data_dir / "supplementary" / "idaci") if data_dir else DEST_DIR
    if path is None:
        files = sorted(dest.glob("*.xlsx"))
        if not files:
            raise FileNotFoundError(f"No IDACI file found in {dest}")
        path = files[-1]

    print(f" IDACI: loading IoD2019 from {path} ...")
    lsoa_lookup = _read_lsoa_lookup(path)
    print(f" IDACI: loaded {len(lsoa_lookup)} LSOA records")

    # Fetch all school postcodes from the database
    with get_session() as session:
        rows = session.execute(
            text("SELECT urn, postcode FROM schools WHERE postcode IS NOT NULL")
        ).fetchall()

    # The HTTP lookups run outside any session, between the read and the write.
    postcodes = [r[1] for r in rows]
    print(f" IDACI: resolving {len(postcodes)} postcodes via postcodes.io ...")
    pc_to_lsoa = _postcode_to_lsoa(postcodes)
    print(f" IDACI: resolved {len(pc_to_lsoa)} postcodes to LSOAs")

    # Built once; the statement is identical for every row.
    upsert = text("""
        INSERT INTO school_deprivation (urn, lsoa_code, idaci_score, idaci_decile)
        VALUES (:urn, :lsoa, :score, :decile)
        ON CONFLICT (urn) DO UPDATE SET
            lsoa_code = EXCLUDED.lsoa_code,
            idaci_score = EXCLUDED.idaci_score,
            idaci_decile = EXCLUDED.idaci_decile
    """)

    inserted = skipped = 0
    with get_session() as session:
        for urn, postcode in rows:
            # Same normalisation as the keys produced by _postcode_to_lsoa().
            lsoa = pc_to_lsoa.get(str(postcode).strip().upper())
            if not lsoa:
                skipped += 1
                continue

            iod = lsoa_lookup.get(lsoa)
            if not iod:
                skipped += 1
                continue

            session.execute(
                upsert,
                {
                    "urn": urn,
                    "lsoa": lsoa,
                    "score": float(iod["idaci_score"]),
                    "decile": int(iod["idaci_decile"]),
                },
            )
            inserted += 1

            # Push pending statements to the database in batches of 2000.
            if inserted % 2000 == 0:
                session.flush()

    print(f" IDACI: upserted {inserted}, skipped {skipped}")
    return {"inserted": inserted, "updated": 0, "skipped": skipped}


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--action", choices=["download", "load", "all"], default="all")
    parser.add_argument("--data-dir", type=Path, default=None)
    args = parser.parse_args()
    if args.action in ("download", "all"):
        download(args.data_dir)
    if args.action in ("load", "all"):
        load(data_dir=args.data_dir)