feat(integrator): add KS2 re-import via Kestra and backend admin endpoint
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 47s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m13s
Build and Push Docker Images / Build Integrator (push) Successful in 40s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 47s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m13s
Build and Push Docker Images / Build Integrator (push) Successful in 40s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 0s
- backend: POST /api/admin/reimport-ks2 runs full CSV migration in a thread
- backend/docker-compose: ADMIN_API_KEY env var (default: changeme) so the key is stable across restarts and the integrator can call the endpoint
- integrator: sources/ks2.py triggers the backend endpoint (900s timeout)
- integrator: flows/ks2.yml Kestra flow (manual trigger, no schedule)

To re-ingest after a DB wipe: trigger the ks2-reimport flow from the Kestra UI at http://localhost:8080.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -19,6 +19,7 @@ from slowapi.util import get_remote_address
|
|||||||
from slowapi.errors import RateLimitExceeded
|
from slowapi.errors import RateLimitExceeded
|
||||||
from starlette.middleware.base import BaseHTTPMiddleware
|
from starlette.middleware.base import BaseHTTPMiddleware
|
||||||
|
|
||||||
|
import asyncio
|
||||||
from .config import settings
|
from .config import settings
|
||||||
from .data_loader import (
|
from .data_loader import (
|
||||||
clear_cache,
|
clear_cache,
|
||||||
@@ -28,6 +29,7 @@ from .data_loader import (
|
|||||||
)
|
)
|
||||||
from .data_loader import get_data_info as get_db_info
|
from .data_loader import get_data_info as get_db_info
|
||||||
from .database import check_and_migrate_if_needed
|
from .database import check_and_migrate_if_needed
|
||||||
|
from .migration import run_full_migration
|
||||||
from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS
|
from .schemas import METRIC_DEFINITIONS, RANKING_COLUMNS, SCHOOL_COLUMNS
|
||||||
from .utils import clean_for_json
|
from .utils import clean_for_json
|
||||||
|
|
||||||
@@ -633,6 +635,27 @@ async def reload_data(
|
|||||||
return {"status": "reloaded"}
|
return {"status": "reloaded"}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/api/admin/reimport-ks2")
@limiter.limit("2/minute")
async def reimport_ks2(
    request: Request,
    _: bool = Depends(verify_admin_api_key)
):
    """
    Re-run the full KS2 CSV migration (drop + reimport schools and results).

    Use when the database has been wiped and needs repopulating from the CSV files.

    The migration runs in the default thread-pool executor so the event loop
    is not blocked, but the HTTP response is only sent once the migration has
    finished — expect this to take several minutes.

    Requires X-API-Key header with valid admin API key.

    Returns:
        ``{"status": "reimported"}`` once the migration succeeded and the
        in-memory school data has been reloaded.

    Raises:
        HTTPException: 500 when ``run_full_migration`` reports failure.
    """
    # Inside a coroutine the running loop is the one to use; get_event_loop()
    # is the legacy accessor and is discouraged for this purpose.
    loop = asyncio.get_running_loop()
    success = await loop.run_in_executor(None, run_full_migration)
    if not success:
        raise HTTPException(status_code=500, detail="Migration failed: no CSV data found in data directory")

    # Drop cached data and reload it so subsequent requests see the new import.
    # NOTE(review): load_school_data() is called directly on the event-loop
    # thread; if it is slow, consider moving it to the executor as well.
    clear_cache()
    load_school_data()
    return {"status": "reimported"}
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# SEO FILES
|
# SEO FILES
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
DATABASE_URL: postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare
|
DATABASE_URL: postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare
|
||||||
PYTHONUNBUFFERED: 1
|
PYTHONUNBUFFERED: 1
|
||||||
|
ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme}
|
||||||
volumes:
|
volumes:
|
||||||
- ./data:/app/data:ro
|
- ./data:/app/data:ro
|
||||||
depends_on:
|
depends_on:
|
||||||
@@ -121,6 +122,8 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
DATABASE_URL: postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare
|
DATABASE_URL: postgresql://schoolcompare:schoolcompare@db:5432/schoolcompare
|
||||||
DATA_DIR: /data
|
DATA_DIR: /data
|
||||||
|
BACKEND_URL: http://backend:80
|
||||||
|
ADMIN_API_KEY: ${ADMIN_API_KEY:-changeme}
|
||||||
PYTHONUNBUFFERED: 1
|
PYTHONUNBUFFERED: 1
|
||||||
volumes:
|
volumes:
|
||||||
- ./data:/data
|
- ./data:/data
|
||||||
|
|||||||
22
integrator/flows/ks2.yml
Normal file
22
integrator/flows/ks2.yml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
id: ks2-reimport
|
||||||
|
namespace: schoolcompare.data
|
||||||
|
description: Re-import KS2 attainment data from bundled CSV files (use after DB wipe)
|
||||||
|
|
||||||
|
# No scheduled trigger — run manually from the Kestra UI when needed.
|
||||||
|
|
||||||
|
tasks:
|
||||||
|
- id: reimport
|
||||||
|
type: io.kestra.plugin.core.http.Request
|
||||||
|
uri: http://integrator:8001/run/ks2?action=load
|
||||||
|
method: POST
|
||||||
|
allowFailed: false
|
||||||
|
timeout: PT20M # full migration takes ~10 min; give headroom
|
||||||
|
|
||||||
|
errors:
|
||||||
|
- id: notify-failure
|
||||||
|
type: io.kestra.plugin.core.log.Log
|
||||||
|
message: "KS2 re-import FAILED: {{ error.message }}"
|
||||||
|
|
||||||
|
retry:
|
||||||
|
maxAttempts: 2
|
||||||
|
delay: PT5M
|
||||||
@@ -9,3 +9,6 @@ DATABASE_URL = os.environ.get(
|
|||||||
|
|
||||||
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
|
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
|
||||||
SUPPLEMENTARY_DIR = DATA_DIR / "supplementary"
|
SUPPLEMENTARY_DIR = DATA_DIR / "supplementary"
|
||||||
|
|
||||||
|
BACKEND_URL = os.environ.get("BACKEND_URL", "http://backend:80")
|
||||||
|
ADMIN_API_KEY = os.environ.get("ADMIN_API_KEY", "changeme")
|
||||||
|
|||||||
47
integrator/scripts/sources/ks2.py
Normal file
47
integrator/scripts/sources/ks2.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
"""
|
||||||
|
KS2 attainment data re-importer.
|
||||||
|
|
||||||
|
Triggers a full re-import of the KS2 CSV data by calling the backend's
|
||||||
|
admin endpoint. The backend owns the migration logic and CSV column mappings;
|
||||||
|
this module is a thin trigger so the re-import can be orchestrated via Kestra
|
||||||
|
like all other data sources.
|
||||||
|
|
||||||
|
The CSV files must already be present in the data volume under
|
||||||
|
/data/{year}/england_ks2final.csv
|
||||||
|
(populated at deploy time from the repo's data/ directory).
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import requests
|
||||||
|
from config import BACKEND_URL, ADMIN_API_KEY
|
||||||
|
|
||||||
|
|
||||||
|
def download():
    """Report that KS2 has no download step and return a skip marker.

    The KS2 CSV files are expected to be present in the data volume already,
    so this only prints a notice.

    Returns:
        ``{"skipped": True}``
    """
    notice = "KS2 CSVs are bundled in the data volume; no download needed."
    print(notice)
    outcome = {"skipped": True}
    return outcome
|
||||||
|
|
||||||
|
|
||||||
|
def load():
    """Trigger full KS2 re-import via the backend admin endpoint.

    Sends a POST to ``/api/admin/reimport-ks2`` on ``BACKEND_URL`` with the
    admin key in the ``X-API-Key`` header and waits for the response.

    Returns:
        The decoded JSON body of the backend's response.

    Raises:
        requests.HTTPError: if the backend answers with a 4xx/5xx status.
        requests.RequestException: on connection failure or timeout.
    """
    # Tolerate a BACKEND_URL configured with a trailing slash; otherwise the
    # request path would start with "//" and would likely miss the route.
    url = f"{BACKEND_URL.rstrip('/')}/api/admin/reimport-ks2"
    print(f"POST {url}")
    resp = requests.post(
        url,
        headers={"X-API-Key": ADMIN_API_KEY},
        timeout=900,  # migration can take ~10 minutes
    )
    if not resp.ok:
        # raise_for_status() only reports the status line; print the body so
        # the backend's error detail ends up in the run log.
        print(f"Backend returned {resp.status_code}: {resp.text}")
    resp.raise_for_status()
    result = resp.json()
    print(f"Result: {result}")
    return result
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    import argparse

    # Command-line entry point: --action picks one step, or "all" for both.
    cli = argparse.ArgumentParser()
    cli.add_argument("--action", choices=["download", "load", "all"], default="all")
    selected = cli.parse_args().action

    # Steps are listed in pipeline order: download always precedes load.
    for step_name, step in (("download", download), ("load", load)):
        if selected == "all" or selected == step_name:
            step()
|
||||||
Reference in New Issue
Block a user