Files
school_compare/pipeline/scripts/geocode_postcodes.py

119 lines
3.5 KiB
Python
Raw Normal View History

"""
Batch geocode postcodes via Postcodes.io and store each location as a PostGIS point geometry (built from the returned lat/lng) in marts.dim_location.
Usage:
python geocode_postcodes.py [--batch-size 100]
"""
from __future__ import annotations
import argparse
import os
import time
import psycopg2
import psycopg2.extras
import requests
# Postcodes.io bulk lookup endpoint; bulk_geocode() POSTs a JSON list of postcodes to it.
POSTCODES_IO_BULK = "https://api.postcodes.io/postcodes"
# Also used as the default value of the --batch-size command-line option.
BATCH_SIZE = 100 # Postcodes.io max per request
def get_db_connection():
    """Open a new psycopg2 connection configured from PG_* environment variables.

    Every setting falls back to a local default when its variable is unset:
    PG_HOST, PG_PORT, PG_USER, PG_PASSWORD and PG_DATABASE.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller is
        responsible for closing it.
    """
    env = os.environ
    settings = {
        "host": env.get("PG_HOST", "localhost"),
        "port": env.get("PG_PORT", "5432"),
        "user": env.get("PG_USER", "postgres"),
        "password": env.get("PG_PASSWORD", "postgres"),
        "dbname": env.get("PG_DATABASE", "school_compare"),
    }
    return psycopg2.connect(**settings)
def fetch_ungeooded_postcodes(conn, limit: int = 5000) -> list[dict]:
    """Fetch rows of marts.dim_location that still need geocoding.

    A row qualifies when its ``geom`` column is NULL and its ``postcode``
    column is not NULL.

    Args:
        conn: Open database connection used to create the cursor.
        limit: Maximum number of rows to return.

    Returns:
        Rows with ``urn`` and ``postcode`` keys, as produced by a
        ``RealDictCursor``.
    """
    query = """
        SELECT urn, postcode
        FROM marts.dim_location
        WHERE geom IS NULL
        AND postcode IS NOT NULL
        LIMIT %s
    """
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    with cursor:
        cursor.execute(query, (limit,))
        pending_rows = cursor.fetchall()
    return pending_rows
def bulk_geocode(postcodes: list[str]) -> dict[str, tuple[float, float]]:
    """Geocode a batch of postcodes via the Postcodes.io bulk API.

    Args:
        postcodes: Postcodes to look up in a single request.

    Returns:
        Mapping from the normalised query postcode (upper-cased, spaces
        removed) to a ``(latitude, longitude)`` tuple. Postcodes the API
        could not match, or matched without coordinates, are omitted.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    # Nothing to look up: skip the round-trip (an empty query may be rejected
    # by the API anyway).
    if not postcodes:
        return {}

    resp = requests.post(
        POSTCODES_IO_BULK,
        json={"postcodes": postcodes},
        timeout=30,
    )
    resp.raise_for_status()

    results = {}
    # `or []` also covers a payload whose "result" key is present but null.
    for item in resp.json().get("result") or []:
        match = item["result"]
        if not match:
            # Postcode not recognised by the API.
            continue
        lat = match.get("latitude")
        lng = match.get("longitude")
        # A matched postcode can still lack coordinates; a (None, None) entry
        # would later be written as a NULL geometry and miscounted as geocoded.
        if lat is None or lng is None:
            continue
        pc = item["query"].upper().replace(" ", "")
        results[pc] = (lat, lng)
    return results
def update_locations(conn, updates: list[tuple[float, float, int]]):
    """Write point geometries for the given schools into marts.dim_location.

    Args:
        conn: Open database connection; the changes are committed on it
            before returning.
        updates: ``(lat, lng, urn)`` tuples, one per row to update.
    """
    statement = """
        UPDATE marts.dim_location
        SET geom = ST_SetSRID(ST_MakePoint(%s, %s), 4326)
        WHERE urn = %s
    """
    with conn.cursor() as cursor:
        # The placeholders are filled longitude-first, so each incoming
        # (lat, lng, urn) tuple is reordered to (lng, lat, urn).
        params = []
        for lat, lng, urn in updates:
            params.append((lng, lat, urn))
        psycopg2.extras.execute_batch(cursor, statement, params)
    conn.commit()
def main():
    """Command-line entry point: geocode pending rows of marts.dim_location.

    Fetches the rows that still lack a geometry, sends their postcodes to
    Postcodes.io in batches of ``--batch-size``, and writes the resulting
    points back to the database, printing progress per batch.

    The database connection is always closed, including on the early exit
    when nothing needs geocoding and when a batch raises.
    """
    parser = argparse.ArgumentParser(description="Batch geocode school postcodes")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE)
    args = parser.parse_args()
    # A zero step makes range() raise, a negative one silently processes
    # nothing, and anything above BATCH_SIZE exceeds the bulk API's limit.
    if not 1 <= args.batch_size <= BATCH_SIZE:
        parser.error(f"--batch-size must be between 1 and {BATCH_SIZE}")

    conn = get_db_connection()
    try:
        rows = fetch_ungeooded_postcodes(conn)
        if not rows:
            print("All postcodes already geocoded.")
            return
        print(f"Geocoding {len(rows)} postcodes...")

        total_updated = 0
        for i in range(0, len(rows), args.batch_size):
            batch = rows[i : i + args.batch_size]

            # Several schools can share a postcode, so map each normalised
            # postcode to every URN that uses it.
            postcodes = []
            urn_by_pc = {}
            for r in batch:
                if r["postcode"]:
                    postcodes.append(r["postcode"])
                    pc_key = r["postcode"].upper().replace(" ", "")
                    urn_by_pc.setdefault(pc_key, []).append(r["urn"])

            # Rows with empty-string postcodes pass the SQL NOT NULL filter;
            # a batch made only of those has nothing to send to the API.
            if not postcodes:
                continue

            results = bulk_geocode(postcodes)

            updates = []
            for pc, (lat, lng) in results.items():
                # Skip matches without coordinates so they are neither
                # written as NULL geometries nor counted as updated.
                if lat is None or lng is None:
                    continue
                for urn in urn_by_pc.get(pc, []):
                    updates.append((lat, lng, urn))

            if updates:
                update_locations(conn, updates)
                total_updated += len(updates)
            print(f" Batch {i // args.batch_size + 1}: geocoded {len(results)}/{len(postcodes)} postcodes")
            # Rate limit: Postcodes.io is generous but be polite
            time.sleep(0.2)
    finally:
        conn.close()
    print(f"Done. Updated {total_updated} locations.")
# Run the geocoding job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()