feat(pipeline): add Meltano + dbt + Airflow ELT pipeline scaffold
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 35s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m9s
Build and Push Docker Images / Build Integrator (push) Successful in 56s
Build and Push Docker Images / Build Kestra Init (push) Successful in 32s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s

Replaces the hand-rolled integrator with a production-grade ELT pipeline
using Meltano (Singer taps), dbt Core (medallion architecture), and
Apache Airflow (orchestration). Adds Typesense for search and PostGIS
for geospatial queries.

- 6 custom Singer taps (GIAS, EES, Ofsted, Parent View, FBIT, IDACI)
- dbt project: 12 staging, 5 intermediate, 12 mart models
- 3 Airflow DAGs (daily/monthly/annual schedules)
- Typesense sync + batch geocoding scripts
- docker-compose: add Airflow, Typesense; upgrade to PostGIS
- Portainer stack definition matching live deployment topology

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-26 08:37:53 +00:00
parent 8aca0a7a53
commit 8f02b5125e
65 changed files with 2822 additions and 72 deletions

View File

@@ -0,0 +1,118 @@
"""
Batch geocode postcodes via Postcodes.io and update dim_location with lat/lng + PostGIS geometry.
Usage:
python geocode_postcodes.py [--batch-size 100]
"""
from __future__ import annotations
import argparse
import os
import time
import psycopg2
import psycopg2.extras
import requests
POSTCODES_IO_BULK = "https://api.postcodes.io/postcodes"
BATCH_SIZE = 100 # Postcodes.io max per request
def get_db_connection():
return psycopg2.connect(
host=os.environ.get("PG_HOST", "localhost"),
port=os.environ.get("PG_PORT", "5432"),
user=os.environ.get("PG_USER", "postgres"),
password=os.environ.get("PG_PASSWORD", "postgres"),
dbname=os.environ.get("PG_DATABASE", "school_compare"),
)
def fetch_ungeooded_postcodes(conn, limit: int = 5000) -> list[dict]:
"""Get postcodes from dim_location that don't have lat/lng yet."""
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("""
SELECT urn, postcode
FROM marts.dim_location
WHERE geom IS NULL
AND postcode IS NOT NULL
LIMIT %s
""", (limit,))
return cur.fetchall()
def bulk_geocode(postcodes: list[str]) -> dict[str, tuple[float, float]]:
"""Geocode a batch of postcodes via Postcodes.io bulk API."""
resp = requests.post(
POSTCODES_IO_BULK,
json={"postcodes": postcodes},
timeout=30,
)
resp.raise_for_status()
results = {}
for item in resp.json().get("result", []):
if item["result"]:
pc = item["query"].upper().replace(" ", "")
results[pc] = (item["result"]["latitude"], item["result"]["longitude"])
return results
def update_locations(conn, updates: list[tuple[float, float, int]]):
"""Update dim_location with lat/lng and PostGIS geometry."""
with conn.cursor() as cur:
psycopg2.extras.execute_batch(cur, """
UPDATE marts.dim_location
SET geom = ST_SetSRID(ST_MakePoint(%s, %s), 4326)
WHERE urn = %s
""", [(lng, lat, urn) for lat, lng, urn in updates])
conn.commit()
def main():
parser = argparse.ArgumentParser(description="Batch geocode school postcodes")
parser.add_argument("--batch-size", type=int, default=BATCH_SIZE)
args = parser.parse_args()
conn = get_db_connection()
rows = fetch_ungeooded_postcodes(conn)
if not rows:
print("All postcodes already geocoded.")
return
print(f"Geocoding {len(rows)} postcodes...")
total_updated = 0
for i in range(0, len(rows), args.batch_size):
batch = rows[i : i + args.batch_size]
postcodes = [r["postcode"] for r in batch if r["postcode"]]
urn_by_pc = {}
for r in batch:
if r["postcode"]:
pc_key = r["postcode"].upper().replace(" ", "")
urn_by_pc.setdefault(pc_key, []).append(r["urn"])
results = bulk_geocode(postcodes)
updates = []
for pc, (lat, lng) in results.items():
for urn in urn_by_pc.get(pc, []):
updates.append((lat, lng, urn))
if updates:
update_locations(conn, updates)
total_updated += len(updates)
print(f" Batch {i // args.batch_size + 1}: geocoded {len(results)}/{len(postcodes)} postcodes")
# Rate limit: Postcodes.io is generous but be polite
time.sleep(0.2)
conn.close()
print(f"Done. Updated {total_updated} locations.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,177 @@
"""
Sync dbt marts → Typesense search index.
Reads dim_school + dim_location + latest fact data from PostgreSQL marts,
then upserts into a Typesense collection with zero-downtime alias swapping.
Usage:
python sync_typesense.py [--typesense-url http://localhost:8108] [--api-key xyz]
"""
from __future__ import annotations
import argparse
import os
import sys
import time
import psycopg2
import psycopg2.extras
import typesense
COLLECTION_SCHEMA = {
"fields": [
{"name": "urn", "type": "int32"},
{"name": "school_name", "type": "string"},
{"name": "phase", "type": "string", "facet": True},
{"name": "school_type", "type": "string", "facet": True},
{"name": "local_authority", "type": "string", "facet": True},
{"name": "religious_character", "type": "string", "facet": True, "optional": True},
{"name": "ofsted_rating", "type": "string", "facet": True, "optional": True},
{"name": "postcode", "type": "string"},
{"name": "location", "type": "geopoint", "optional": True},
{"name": "headteacher_name", "type": "string", "optional": True},
{"name": "rwm_expected_pct", "type": "float", "optional": True},
{"name": "progress_8_score", "type": "float", "optional": True},
{"name": "total_pupils", "type": "int32", "optional": True},
],
"default_sorting_field": "school_name",
}
OFSTED_LABELS = {1: "Outstanding", 2: "Good", 3: "Requires Improvement", 4: "Inadequate"}
QUERY = """
SELECT
s.urn,
s.school_name,
s.phase,
s.school_type,
l.local_authority_name as local_authority,
s.religious_character,
s.ofsted_grade,
l.postcode,
s.headteacher_name,
s.total_pupils,
-- Latest KS2
ks2.rwm_expected_pct,
-- Latest KS4
ks4.progress_8_score
FROM marts.dim_school s
LEFT JOIN marts.dim_location l ON s.urn = l.urn
LEFT JOIN LATERAL (
SELECT rwm_expected_pct
FROM marts.fact_ks2_performance
WHERE urn = s.urn
ORDER BY year DESC
LIMIT 1
) ks2 ON true
LEFT JOIN LATERAL (
SELECT progress_8_score
FROM marts.fact_ks4_performance
WHERE urn = s.urn
ORDER BY year DESC
LIMIT 1
) ks4 ON true
"""
def get_db_connection():
return psycopg2.connect(
host=os.environ.get("PG_HOST", "localhost"),
port=os.environ.get("PG_PORT", "5432"),
user=os.environ.get("PG_USER", "postgres"),
password=os.environ.get("PG_PASSWORD", "postgres"),
dbname=os.environ.get("PG_DATABASE", "school_compare"),
)
def build_document(row: dict) -> dict:
"""Convert a DB row to a Typesense document."""
doc = {
"id": str(row["urn"]),
"urn": row["urn"],
"school_name": row["school_name"] or "",
"phase": row["phase"] or "",
"school_type": row["school_type"] or "",
"local_authority": row["local_authority"] or "",
"postcode": row["postcode"] or "",
}
if row.get("religious_character"):
doc["religious_character"] = row["religious_character"]
if row.get("ofsted_grade"):
doc["ofsted_rating"] = OFSTED_LABELS.get(row["ofsted_grade"], "")
if row.get("headteacher_name"):
doc["headteacher_name"] = row["headteacher_name"]
if row.get("total_pupils"):
doc["total_pupils"] = row["total_pupils"]
if row.get("rwm_expected_pct") is not None:
doc["rwm_expected_pct"] = float(row["rwm_expected_pct"])
if row.get("progress_8_score") is not None:
doc["progress_8_score"] = float(row["progress_8_score"])
# Geo: location field expects [lat, lng] — will be populated once
# dim_location has lat/lng from PostGIS geocoding
return doc
def sync(typesense_url: str, api_key: str):
client = typesense.Client({
"nodes": [{"host": typesense_url.split("//")[-1].split(":")[0],
"port": typesense_url.split(":")[-1],
"protocol": "http"}],
"api_key": api_key,
"connection_timeout_seconds": 10,
})
# Create timestamped collection for zero-downtime swap
ts = int(time.time())
collection_name = f"schools_{ts}"
print(f"Creating collection: {collection_name}")
schema = {**COLLECTION_SCHEMA, "name": collection_name}
client.collections.create(schema)
# Fetch data from marts
conn = get_db_connection()
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(QUERY)
rows = cur.fetchall()
conn.close()
print(f"Indexing {len(rows)} schools...")
# Batch import
batch_size = 500
for i in range(0, len(rows), batch_size):
batch = [build_document(r) for r in rows[i : i + batch_size]]
client.collections[collection_name].documents.import_(batch, {"action": "upsert"})
print(f" Indexed {min(i + batch_size, len(rows))}/{len(rows)}")
# Swap alias
print("Swapping alias 'schools' → new collection")
try:
client.aliases.upsert("schools", {"collection_name": collection_name})
except Exception:
# If alias doesn't exist yet, create it
client.aliases.upsert("schools", {"collection_name": collection_name})
print("Done.")
def main():
parser = argparse.ArgumentParser(description="Sync marts to Typesense")
parser.add_argument("--typesense-url", default=os.environ.get("TYPESENSE_URL", "http://localhost:8108"))
parser.add_argument("--api-key", default=os.environ.get("TYPESENSE_API_KEY", ""))
args = parser.parse_args()
if not args.api_key:
print("Error: --api-key or TYPESENSE_API_KEY required", file=sys.stderr)
sys.exit(1)
sync(args.typesense_url, args.api_key)
if __name__ == "__main__":
main()