"""
Data integrator HTTP server.

Kestra calls this server via HTTP tasks to trigger download/load operations.
"""

import importlib
import sys
import traceback
from pathlib import Path

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse

# Make the ``sources`` package under /app/scripts importable via
# ``importlib.import_module("sources.<name>")``.
sys.path.insert(0, "/app/scripts")

app = FastAPI(title="SchoolCompare Data Integrator", version="1.0.0")

# Module names under ``sources`` that may be triggered. Each module is called
# with ``download()`` and/or ``load()`` below (presumably every module defines
# both — verify against the source modules).
SOURCES = {
    "ofsted",
    "gias",
    "parent_view",
    "census",
    "admissions",
    "sen_detail",
    "phonics",
    "idaci",
    "finance",
    "ks2",
}

# Accepted values of the ``action`` query parameter.
ACTIONS = ("download", "load", "all")


def _validate_action(action: str) -> None:
    """Raise HTTP 400 unless *action* is one of ``ACTIONS``."""
    if action not in ACTIONS:
        raise HTTPException(
            status_code=400,
            detail="action must be 'download', 'load', or 'all'",
        )


def _execute(source: str, action: str):
    """Import ``sources.<source>`` and run the steps selected by *action*.

    ``download()`` runs for "download"/"all" and ``load()`` for "load"/"all",
    always in that order. Returns whatever ``load()`` returned, or ``{}``
    when ``load()` was not run. Exceptions from the import or either step
    propagate to the caller.
    """
    mod = importlib.import_module(f"sources.{source}")
    result = {}
    if action in ("download", "all"):
        mod.download()
    if action in ("load", "all"):
        result = mod.load()
    return result


@app.get("/health")
def health():
    """Liveness probe; always returns ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.post("/run/{source}")
def run_source(source: str, action: str = "all"):
    """
    Trigger a data source download and/or load.

    action: "download" | "load" | "all"

    Raises HTTPException 404 for an unknown source, 400 for an invalid
    action, and 500 (with the error message and traceback in ``detail``)
    if the source module fails.
    """
    if source not in SOURCES:
        raise HTTPException(
            status_code=404,
            detail=f"Unknown source '{source}'. Available: {sorted(SOURCES)}",
        )
    _validate_action(action)

    try:
        result = _execute(source, action)
    except Exception as e:
        # Boundary handler: surface the failure to the HTTP caller (Kestra)
        # with enough context to debug, keeping the original as __cause__.
        tb = traceback.format_exc()
        raise HTTPException(
            status_code=500, detail={"error": str(e), "traceback": tb}
        ) from e
    return {"source": source, "action": action, "result": result}


@app.post("/run-all")
def run_all(action: str = "all"):
    """
    Trigger all sources in sequence (alphabetical order).

    action: "download" | "load" | "all"

    Best-effort: a failing source does not stop the others. Returns a dict
    keyed by source name whose value is the ``load()`` result (``{}`` when
    only downloading), or ``{"error": ..., "traceback": ...}`` on failure.
    Raises HTTPException 400 for an invalid action.
    """
    _validate_action(action)

    results = {}
    for source in sorted(SOURCES):
        try:
            results[source] = _execute(source, action)
        except Exception as e:
            # Deliberately continue with the remaining sources; record the
            # failure (with traceback, matching run_source) instead.
            results[source] = {"error": str(e), "traceback": traceback.format_exc()}
    return results