""" Batch geocode postcodes via Postcodes.io and update dim_location with lat/lng + PostGIS geometry. Usage: python geocode_postcodes.py [--batch-size 100] """ from __future__ import annotations import argparse import os import time import psycopg2 import psycopg2.extras import requests POSTCODES_IO_BULK = "https://api.postcodes.io/postcodes" BATCH_SIZE = 100 # Postcodes.io max per request def get_db_connection(): return psycopg2.connect( host=os.environ.get("PG_HOST", "localhost"), port=os.environ.get("PG_PORT", "5432"), user=os.environ.get("PG_USER", "postgres"), password=os.environ.get("PG_PASSWORD", "postgres"), dbname=os.environ.get("PG_DATABASE", "school_compare"), ) def fetch_ungeooded_postcodes(conn, limit: int = 5000) -> list[dict]: """Get postcodes from dim_location that don't have lat/lng yet.""" with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT urn, postcode FROM marts.dim_location WHERE geom IS NULL AND postcode IS NOT NULL LIMIT %s """, (limit,)) return cur.fetchall() def bulk_geocode(postcodes: list[str]) -> dict[str, tuple[float, float]]: """Geocode a batch of postcodes via Postcodes.io bulk API.""" resp = requests.post( POSTCODES_IO_BULK, json={"postcodes": postcodes}, timeout=30, ) resp.raise_for_status() results = {} for item in resp.json().get("result", []): if item["result"]: pc = item["query"].upper().replace(" ", "") results[pc] = (item["result"]["latitude"], item["result"]["longitude"]) return results def update_locations(conn, updates: list[tuple[float, float, int]]): """Update dim_location with lat/lng and PostGIS geometry.""" with conn.cursor() as cur: psycopg2.extras.execute_batch(cur, """ UPDATE marts.dim_location SET geom = ST_SetSRID(ST_MakePoint(%s, %s), 4326) WHERE urn = %s """, [(lng, lat, urn) for lat, lng, urn in updates]) conn.commit() def main(): parser = argparse.ArgumentParser(description="Batch geocode school postcodes") parser.add_argument("--batch-size", type=int, default=BATCH_SIZE) args = parser.parse_args() conn = get_db_connection() rows = fetch_ungeooded_postcodes(conn) if not rows: print("All postcodes already geocoded.") return print(f"Geocoding {len(rows)} postcodes...") total_updated = 0 for i in range(0, len(rows), args.batch_size): batch = rows[i : i + args.batch_size] postcodes = [r["postcode"] for r in batch if r["postcode"]] urn_by_pc = {} for r in batch: if r["postcode"]: pc_key = r["postcode"].upper().replace(" ", "") urn_by_pc.setdefault(pc_key, []).append(r["urn"]) results = bulk_geocode(postcodes) updates = [] for pc, (lat, lng) in results.items(): for urn in urn_by_pc.get(pc, []): updates.append((lat, lng, urn)) if updates: update_locations(conn, updates) total_updated += len(updates) print(f" Batch {i // args.batch_size + 1}: geocoded {len(results)}/{len(postcodes)} postcodes") # Rate limit: Postcodes.io is generous but be polite time.sleep(0.2) conn.close() print(f"Done. Updated {total_updated} locations.") if __name__ == "__main__": main()