Review notes (text before the first "diff --git" line is ignored by git apply / patch):

- This patch arrived with its line breaks and indentation collapsed. The hunks are
  re-flowed below so that each one matches the line counts in its @@ header.
  Leading whitespace and blank-line placement are reconstructed -- TODO confirm
  against the target file before applying.
- Change added in review: `total_pupils` becomes the collection's
  default_sorting_field, so its schema entry drops `"optional": True`.
  Typesense does not accept an optional field as the default sorting field, and
  build_document() now always writes the field (0 when the row value is falsy).
  The post-image hash on the "index" line predates this change.
- NOTE(review): ST_Y/ST_X are emitted as lat/lng unchanged; this presumes
  dim_location.geom is stored in a lat/lng SRID (e.g. 4326) -- confirm.

diff --git a/pipeline/scripts/sync_typesense.py b/pipeline/scripts/sync_typesense.py
index 3c11d75..cce0660 100644
--- a/pipeline/scripts/sync_typesense.py
+++ b/pipeline/scripts/sync_typesense.py
@@ -35,12 +35,12 @@ COLLECTION_SCHEMA = {
         {"name": "progress_8_score", "type": "float", "optional": True},
-        {"name": "total_pupils", "type": "int32", "optional": True},
+        {"name": "total_pupils", "type": "int32"},
     ],
-    "default_sorting_field": "school_name",
+    "default_sorting_field": "total_pupils",
 }
 
 OFSTED_LABELS = {1: "Outstanding", 2: "Good", 3: "Requires Improvement", 4: "Inadequate"}
 
-QUERY = """
+QUERY_BASE = """
 SELECT
     s.urn,
     s.school_name,
@@ -52,12 +52,13 @@ QUERY = """
     l.postcode,
     s.headteacher_name,
     s.total_pupils,
-    -- Latest KS2
-    ks2.rwm_expected_pct,
-    -- Latest KS4
-    ks4.progress_8_score
+    ST_Y(l.geom) as lat,
+    ST_X(l.geom) as lng
 FROM marts.dim_school s
 LEFT JOIN marts.dim_location l ON s.urn = l.urn
+"""
+
+QUERY_KS2_JOIN = """
 LEFT JOIN LATERAL (
     SELECT rwm_expected_pct
     FROM marts.fact_ks2_performance
@@ -65,6 +66,9 @@ QUERY = """
     ORDER BY year DESC
     LIMIT 1
 ) ks2 ON true
+"""
+
+QUERY_KS4_JOIN = """
 LEFT JOIN LATERAL (
     SELECT progress_8_score
     FROM marts.fact_ks4_performance
@@ -103,15 +107,14 @@ def build_document(row: dict) -> dict:
         doc["ofsted_rating"] = OFSTED_LABELS.get(row["ofsted_grade"], "")
     if row.get("headteacher_name"):
         doc["headteacher_name"] = row["headteacher_name"]
-    if row.get("total_pupils"):
-        doc["total_pupils"] = row["total_pupils"]
+    doc["total_pupils"] = row["total_pupils"] or 0
     if row.get("rwm_expected_pct") is not None:
         doc["rwm_expected_pct"] = float(row["rwm_expected_pct"])
     if row.get("progress_8_score") is not None:
         doc["progress_8_score"] = float(row["progress_8_score"])
-    # Geo: location field expects [lat, lng] — will be populated once
-    # dim_location has lat/lng from PostGIS geocoding
+    if row.get("lat") is not None and row.get("lng") is not None:
+        doc["location"] = [float(row["lat"]), float(row["lng"])]
 
     return doc
 
 
@@ -133,10 +136,32 @@ def sync(typesense_url: str, api_key: str):
         schema = {**COLLECTION_SCHEMA, "name": collection_name}
         client.collections.create(schema)
 
-    # Fetch data from marts
+    # Fetch data from marts — dynamically include KS2/KS4 joins if tables exist
     conn = get_db_connection()
     with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
-        cur.execute(QUERY)
+        # Check which fact tables exist
+        cur.execute("""
+            SELECT table_name FROM information_schema.tables
+            WHERE table_schema = 'marts' AND table_name IN ('fact_ks2_performance', 'fact_ks4_performance')
+        """)
+        existing_tables = {r["table_name"] for r in cur.fetchall()}
+
+        select_extra = []
+        joins = ""
+        if "fact_ks2_performance" in existing_tables:
+            select_extra.append("ks2.rwm_expected_pct")
+            joins += QUERY_KS2_JOIN
+        if "fact_ks4_performance" in existing_tables:
+            select_extra.append("ks4.progress_8_score")
+            joins += QUERY_KS4_JOIN
+
+        query = QUERY_BASE
+        if select_extra:
+            # Insert extra select columns before FROM
+            query = query.replace("ST_X(l.geom) as lng", "ST_X(l.geom) as lng,\n " + ",\n ".join(select_extra))
+        query += joins
+
+        cur.execute(query)
         rows = cur.fetchall()
     conn.close()
 