fix(sync): use numeric default_sorting_field, dynamic KS2/KS4 joins, populate geopoints
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m5s
Build and Push Docker Images / Build Integrator (push) Successful in 55s
Build and Push Docker Images / Build Kestra Init (push) Successful in 31s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m28s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 32s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m5s
Build and Push Docker Images / Build Integrator (push) Successful in 55s
Build and Push Docker Images / Build Kestra Init (push) Successful in 31s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m28s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s
- Typesense requires numeric default_sorting_field — use total_pupils - Dynamically include KS2/KS4 joins only if those tables exist - Extract lat/lng from PostGIS geom and populate Typesense geopoint field Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -35,12 +35,12 @@ COLLECTION_SCHEMA = {
|
|||||||
{"name": "progress_8_score", "type": "float", "optional": True},
|
{"name": "progress_8_score", "type": "float", "optional": True},
|
||||||
{"name": "total_pupils", "type": "int32", "optional": True},
|
{"name": "total_pupils", "type": "int32", "optional": True},
|
||||||
],
|
],
|
||||||
"default_sorting_field": "school_name",
|
"default_sorting_field": "total_pupils",
|
||||||
}
|
}
|
||||||
|
|
||||||
OFSTED_LABELS = {1: "Outstanding", 2: "Good", 3: "Requires Improvement", 4: "Inadequate"}
|
OFSTED_LABELS = {1: "Outstanding", 2: "Good", 3: "Requires Improvement", 4: "Inadequate"}
|
||||||
|
|
||||||
QUERY = """
|
QUERY_BASE = """
|
||||||
SELECT
|
SELECT
|
||||||
s.urn,
|
s.urn,
|
||||||
s.school_name,
|
s.school_name,
|
||||||
@@ -52,12 +52,13 @@ QUERY = """
|
|||||||
l.postcode,
|
l.postcode,
|
||||||
s.headteacher_name,
|
s.headteacher_name,
|
||||||
s.total_pupils,
|
s.total_pupils,
|
||||||
-- Latest KS2
|
ST_Y(l.geom) as lat,
|
||||||
ks2.rwm_expected_pct,
|
ST_X(l.geom) as lng
|
||||||
-- Latest KS4
|
|
||||||
ks4.progress_8_score
|
|
||||||
FROM marts.dim_school s
|
FROM marts.dim_school s
|
||||||
LEFT JOIN marts.dim_location l ON s.urn = l.urn
|
LEFT JOIN marts.dim_location l ON s.urn = l.urn
|
||||||
|
"""
|
||||||
|
|
||||||
|
QUERY_KS2_JOIN = """
|
||||||
LEFT JOIN LATERAL (
|
LEFT JOIN LATERAL (
|
||||||
SELECT rwm_expected_pct
|
SELECT rwm_expected_pct
|
||||||
FROM marts.fact_ks2_performance
|
FROM marts.fact_ks2_performance
|
||||||
@@ -65,6 +66,9 @@ QUERY = """
|
|||||||
ORDER BY year DESC
|
ORDER BY year DESC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
) ks2 ON true
|
) ks2 ON true
|
||||||
|
"""
|
||||||
|
|
||||||
|
QUERY_KS4_JOIN = """
|
||||||
LEFT JOIN LATERAL (
|
LEFT JOIN LATERAL (
|
||||||
SELECT progress_8_score
|
SELECT progress_8_score
|
||||||
FROM marts.fact_ks4_performance
|
FROM marts.fact_ks4_performance
|
||||||
@@ -103,15 +107,14 @@ def build_document(row: dict) -> dict:
|
|||||||
doc["ofsted_rating"] = OFSTED_LABELS.get(row["ofsted_grade"], "")
|
doc["ofsted_rating"] = OFSTED_LABELS.get(row["ofsted_grade"], "")
|
||||||
if row.get("headteacher_name"):
|
if row.get("headteacher_name"):
|
||||||
doc["headteacher_name"] = row["headteacher_name"]
|
doc["headteacher_name"] = row["headteacher_name"]
|
||||||
if row.get("total_pupils"):
|
doc["total_pupils"] = row["total_pupils"] or 0
|
||||||
doc["total_pupils"] = row["total_pupils"]
|
|
||||||
if row.get("rwm_expected_pct") is not None:
|
if row.get("rwm_expected_pct") is not None:
|
||||||
doc["rwm_expected_pct"] = float(row["rwm_expected_pct"])
|
doc["rwm_expected_pct"] = float(row["rwm_expected_pct"])
|
||||||
if row.get("progress_8_score") is not None:
|
if row.get("progress_8_score") is not None:
|
||||||
doc["progress_8_score"] = float(row["progress_8_score"])
|
doc["progress_8_score"] = float(row["progress_8_score"])
|
||||||
|
|
||||||
# Geo: location field expects [lat, lng] — will be populated once
|
if row.get("lat") is not None and row.get("lng") is not None:
|
||||||
# dim_location has lat/lng from PostGIS geocoding
|
doc["location"] = [float(row["lat"]), float(row["lng"])]
|
||||||
|
|
||||||
return doc
|
return doc
|
||||||
|
|
||||||
@@ -133,10 +136,32 @@ def sync(typesense_url: str, api_key: str):
|
|||||||
schema = {**COLLECTION_SCHEMA, "name": collection_name}
|
schema = {**COLLECTION_SCHEMA, "name": collection_name}
|
||||||
client.collections.create(schema)
|
client.collections.create(schema)
|
||||||
|
|
||||||
# Fetch data from marts
|
# Fetch data from marts — dynamically include KS2/KS4 joins if tables exist
|
||||||
conn = get_db_connection()
|
conn = get_db_connection()
|
||||||
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||||||
cur.execute(QUERY)
|
# Check which fact tables exist
|
||||||
|
cur.execute("""
|
||||||
|
SELECT table_name FROM information_schema.tables
|
||||||
|
WHERE table_schema = 'marts' AND table_name IN ('fact_ks2_performance', 'fact_ks4_performance')
|
||||||
|
""")
|
||||||
|
existing_tables = {r["table_name"] for r in cur.fetchall()}
|
||||||
|
|
||||||
|
select_extra = []
|
||||||
|
joins = ""
|
||||||
|
if "fact_ks2_performance" in existing_tables:
|
||||||
|
select_extra.append("ks2.rwm_expected_pct")
|
||||||
|
joins += QUERY_KS2_JOIN
|
||||||
|
if "fact_ks4_performance" in existing_tables:
|
||||||
|
select_extra.append("ks4.progress_8_score")
|
||||||
|
joins += QUERY_KS4_JOIN
|
||||||
|
|
||||||
|
query = QUERY_BASE
|
||||||
|
if select_extra:
|
||||||
|
# Insert extra select columns before FROM
|
||||||
|
query = query.replace("ST_X(l.geom) as lng", "ST_X(l.geom) as lng,\n " + ",\n ".join(select_extra))
|
||||||
|
query += joins
|
||||||
|
|
||||||
|
cur.execute(query)
|
||||||
rows = cur.fetchall()
|
rows = cur.fetchall()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user