feat(pipeline): implement parent-view, fbit, idaci Singer taps + align staging/mart models
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 34s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m5s
Build and Push Docker Images / Build Integrator (push) Successful in 57s
Build and Push Docker Images / Build Kestra Init (push) Successful in 31s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m6s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s

Port extraction logic from integrator scripts into Singer SDK taps:
- tap-uk-parent-view: scrapes Ofsted open data portal, parses survey responses (14 questions)
- tap-uk-fbit: queries FBIT API per-URN with rate limiting, computes per-pupil spend
- tap-uk-idaci: downloads IoD2019 XLSX, batch-resolves postcodes→LSOAs via postcodes.io

Update dbt models to match actual tap output schemas:
- stg_idaci now includes URN (tap does the postcode→LSOA→school join)
- stg_parent_view expanded from 8 to 13 question columns
- fact_deprivation simplified (no longer needs postcode→LSOA join in dbt)
- fact_parent_view expanded to include all 13 question metrics

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-26 10:38:07 +00:00
parent 904093ea8a
commit 97d975114a
9 changed files with 360 additions and 60 deletions

View File

@@ -2,9 +2,16 @@
from __future__ import annotations
import time
from datetime import date
import requests
from singer_sdk import Stream, Tap
from singer_sdk import typing as th
API_BASE = "https://schools-financial-benchmarking.service.gov.uk/api"
RATE_LIMIT_DELAY = 0.1 # seconds between requests
class FBITFinanceStream(Stream):
"""Stream: School financial benchmarking data."""
@@ -23,13 +30,68 @@ class FBITFinanceStream(Stream):
th.Property("premises_cost_pct", th.NumberType),
).to_dict()
def _get_school_urns(self) -> list[int]:
"""Fetch all open school URNs from GIAS to know what to query."""
import io
import pandas as pd
url = (
"https://ea-edubase-api-prod.azurewebsites.net"
f"/edubase/downloads/public/edubasealldata{date.today().strftime('%Y%m%d')}.csv"
)
self.logger.info("Fetching URN list from GIAS for FBIT extraction...")
try:
resp = requests.get(url, timeout=120)
resp.raise_for_status()
df = pd.read_csv(
io.StringIO(resp.text),
encoding="utf-8-sig",
usecols=["URN", "EstablishmentStatus (name)"],
dtype=str,
)
df = df[df["EstablishmentStatus (name)"] == "Open"]
return [int(u) for u in df["URN"].dropna().unique()]
except Exception as e:
self.logger.error("Failed to fetch URN list: %s", e)
return []
def get_records(self, context):
# TODO: Implement FBIT API extraction
# The FBIT API requires per-URN requests with rate limiting.
# Implementation will batch URNs from dim_school and request
# financial data for each.
self.logger.warning("FBIT extraction not yet implemented")
return iter([])
urns = self._get_school_urns()
year = date.today().year - 1
self.logger.info("Fetching FBIT data for %d schools (year %d)...", len(urns), year)
for i, urn in enumerate(urns):
if i % 1000 == 0:
self.logger.info(" Progress: %d/%d", i, len(urns))
try:
resp = requests.get(
f"{API_BASE}/schoolFinancialDataObject/{urn}",
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
if data:
per_pupil = None
total_exp = data.get("totalExpenditure")
num_pupils = data.get("numberOfPupils")
if total_exp and num_pupils:
per_pupil = round(total_exp / num_pupils, 2)
yield {
"urn": urn,
"year": year,
"per_pupil_spend": per_pupil,
"staff_cost_pct": data.get("staffCostPercent"),
"teacher_cost_pct": data.get("teachingStaffCostPercent"),
"support_staff_cost_pct": data.get("educationSupportStaffCostPercent"),
"premises_cost_pct": data.get("premisesStaffCostPercent"),
}
except requests.RequestException:
pass
time.sleep(RATE_LIMIT_DELAY)
class TapUKFBIT(Tap):
@@ -41,7 +103,8 @@ class TapUKFBIT(Tap):
th.Property(
"base_url",
th.StringType,
default="https://financial-benchmarking-and-insights-tool.education.gov.uk/api",
default=API_BASE,
description="FBIT API base URL",
),
).to_dict()

View File

@@ -1,30 +1,157 @@
"""IDACI Singer tap — extracts deprivation index lookup data."""
"""IDACI Singer tap — extracts deprivation index from IoD2019 + postcodes.io LSOA lookup."""
from __future__ import annotations
import io
import pandas as pd
import requests
from singer_sdk import Stream, Tap
from singer_sdk import typing as th
IOD_2019_URL = (
"https://assets.publishing.service.gov.uk/government/uploads/system/uploads/"
"attachment_data/file/833970/File_1_-_IMD2019_Index_of_Multiple_Deprivation.xlsx"
)
POSTCODES_IO_BATCH = "https://api.postcodes.io/postcodes"
BATCH_SIZE = 100
def _postcode_to_lsoa(postcodes: list[str], logger) -> dict[str, str]:
"""Batch-resolve postcodes to LSOA codes via postcodes.io."""
result = {}
valid = list({p.strip().upper() for p in postcodes if p and len(str(p).strip()) >= 5})
for i in range(0, len(valid), BATCH_SIZE):
batch = valid[i : i + BATCH_SIZE]
try:
resp = requests.post(POSTCODES_IO_BATCH, json={"postcodes": batch}, timeout=30)
if resp.status_code == 200:
for item in resp.json().get("result", []):
if item and item.get("result"):
lsoa = item["result"].get("lsoa")
if lsoa:
result[item["query"].upper()] = lsoa
except Exception as e:
logger.warning("postcodes.io batch failed: %s", e)
if i % 5000 == 0 and i > 0:
logger.info(" LSOA resolution: %d/%d postcodes", i, len(valid))
return result
class IDACIStream(Stream):
"""Stream: IDACI scores by LSOA."""
"""Stream: IDACI scores joined to school URNs via postcode → LSOA lookup."""
name = "idaci"
primary_keys = ["lsoa_code"]
primary_keys = ["urn"]
replication_key = None
schema = th.PropertiesList(
th.Property("lsoa_code", th.StringType, required=True),
th.Property("urn", th.IntegerType, required=True),
th.Property("lsoa_code", th.StringType),
th.Property("idaci_score", th.NumberType),
th.Property("idaci_decile", th.IntegerType),
).to_dict()
def _load_iod_data(self) -> dict[str, dict]:
"""Download and parse IoD2019 IDACI data into a LSOA lookup."""
self.logger.info("Downloading IoD2019 IDACI data...")
resp = requests.get(IOD_2019_URL, timeout=300)
resp.raise_for_status()
iod_sheets = pd.read_excel(io.BytesIO(resp.content), sheet_name=None)
# Find the IDACI sheet
idaci_sheet = None
for name, df in iod_sheets.items():
if "IDACI" in name.upper() or "IDACI" in str(df.columns.tolist()).upper():
idaci_sheet = name
break
if idaci_sheet is None:
idaci_sheet = list(iod_sheets.keys())[0]
df_iod = iod_sheets[idaci_sheet]
# Find columns dynamically
col_lsoa = next(
(c for c in df_iod.columns if "LSOA" in str(c).upper() and "code" in str(c).lower()),
None,
)
col_score = next(
(c for c in df_iod.columns if "IDACI" in str(c).upper() and "score" in str(c).lower()),
None,
)
if not col_lsoa or not col_score:
self.logger.error("Could not find LSOA/IDACI columns. Available: %s", list(df_iod.columns)[:20])
return {}
df_iod = df_iod[[col_lsoa, col_score]].copy()
df_iod.columns = ["lsoa_code", "idaci_score"]
df_iod = df_iod.dropna()
# Compute deciles (1 = most deprived)
df_iod = df_iod.sort_values("idaci_score", ascending=False)
df_iod["idaci_decile"] = (pd.qcut(df_iod["idaci_score"], 10, labels=False) + 1).astype(int)
df_iod["idaci_decile"] = 11 - df_iod["idaci_decile"]
self.logger.info("Loaded %d LSOA IDACI records", len(df_iod))
return df_iod.set_index("lsoa_code")[["idaci_score", "idaci_decile"]].to_dict("index")
def _get_school_postcodes(self) -> list[tuple[int, str]]:
"""Fetch URN + postcode pairs from GIAS."""
url = (
"https://ea-edubase-api-prod.azurewebsites.net"
"/edubase/downloads/public/edubasealldata.csv"
)
self.logger.info("Fetching school postcodes from GIAS...")
resp = requests.get(url, timeout=120)
resp.raise_for_status()
df = pd.read_csv(
io.StringIO(resp.text),
encoding="utf-8-sig",
usecols=["URN", "Postcode", "EstablishmentStatus (name)"],
dtype=str,
)
df = df[df["EstablishmentStatus (name)"] == "Open"]
df = df.dropna(subset=["URN", "Postcode"])
return [(int(row["URN"]), row["Postcode"]) for _, row in df.iterrows()]
def get_records(self, context):
# TODO: Implement IDACI extraction
# Source: MHCLG IoD 2019 LSOA-level data
# Available as a static CSV download
self.logger.warning("IDACI extraction not yet implemented")
return iter([])
lsoa_lookup = self._load_iod_data()
if not lsoa_lookup:
return
schools = self._get_school_postcodes()
postcodes = [pc for _, pc in schools]
self.logger.info("Resolving %d postcodes to LSOAs...", len(postcodes))
pc_to_lsoa = _postcode_to_lsoa(postcodes, self.logger)
self.logger.info("Resolved %d postcodes", len(pc_to_lsoa))
yielded = 0
for urn, postcode in schools:
lsoa = pc_to_lsoa.get(postcode.strip().upper())
if not lsoa:
continue
iod = lsoa_lookup.get(lsoa)
if not iod:
continue
yield {
"urn": urn,
"lsoa_code": lsoa,
"idaci_score": float(iod["idaci_score"]),
"idaci_decile": int(iod["idaci_decile"]),
}
yielded += 1
self.logger.info("Yielded IDACI data for %d schools", yielded)
class TapUKIDACI(Tap):

View File

@@ -1,10 +1,33 @@
"""Parent View Singer tap — extracts survey data from Ofsted Parent View portal."""
"""Parent View Singer tap — extracts survey data from Ofsted Parent View open data portal."""
from __future__ import annotations
import io
import re
from datetime import date
import pandas as pd
import requests
from singer_sdk import Stream, Tap
from singer_sdk import typing as th
OPEN_DATA_PAGE = "https://parentview.ofsted.gov.uk/open-data"
def _positive_pct(row: pd.Series, q_col_base: str) -> float | None:
"""Sum 'Strongly agree' + 'Agree' percentages for a question."""
strongly = row.get(f"{q_col_base} - Strongly agree %") or row.get(f"{q_col_base} - Strongly Agree %")
agree = row.get(f"{q_col_base} - Agree %")
try:
total = 0.0
if pd.notna(strongly):
total += float(strongly)
if pd.notna(agree):
total += float(agree)
return round(total, 1) if total > 0 else None
except (TypeError, ValueError):
return None
class ParentViewStream(Stream):
"""Stream: Parent View survey responses per school."""
@@ -19,27 +42,106 @@ class ParentViewStream(Stream):
th.Property("total_responses", th.IntegerType),
th.Property("q_happy_pct", th.NumberType),
th.Property("q_safe_pct", th.NumberType),
th.Property("q_progress_pct", th.NumberType),
th.Property("q_well_taught_pct", th.NumberType),
th.Property("q_well_led_pct", th.NumberType),
th.Property("q_behaviour_pct", th.NumberType),
th.Property("q_bullying_pct", th.NumberType),
th.Property("q_communication_pct", th.NumberType),
th.Property("q_progress_pct", th.NumberType),
th.Property("q_teaching_pct", th.NumberType),
th.Property("q_information_pct", th.NumberType),
th.Property("q_curriculum_pct", th.NumberType),
th.Property("q_future_pct", th.NumberType),
th.Property("q_leadership_pct", th.NumberType),
th.Property("q_wellbeing_pct", th.NumberType),
th.Property("q_recommend_pct", th.NumberType),
).to_dict()
def _discover_download_url(self) -> str:
"""Scrape the open data page for the download link."""
resp = requests.get(OPEN_DATA_PAGE, timeout=30)
resp.raise_for_status()
urls = re.findall(r'href="([^"]+\.(?:xlsx|csv|zip))"', resp.text, re.IGNORECASE)
if not urls:
msg = "No download link found on Parent View open data page"
raise RuntimeError(msg)
url = urls[0]
if not url.startswith("http"):
url = "https://parentview.ofsted.gov.uk" + url
return url
def get_records(self, context):
# TODO: Implement Parent View data extraction
# Source: Ofsted Parent View portal XLSX/CSV download
# URL discovery requires scraping parentview.ofsted.gov.uk
self.logger.warning("Parent View extraction not yet implemented")
return iter([])
url = self._discover_download_url()
self.logger.info("Downloading Parent View data: %s", url)
resp = requests.get(url, timeout=120)
resp.raise_for_status()
if url.endswith(".xlsx"):
df = pd.read_excel(io.BytesIO(resp.content))
else:
df = pd.read_csv(
io.BytesIO(resp.content),
encoding="latin-1",
low_memory=False,
)
# Normalise URN column
urn_col = next((c for c in df.columns if c.strip().upper() == "URN"), None)
if not urn_col:
self.logger.error("URN column not found. Columns: %s", list(df.columns)[:20])
return
df.rename(columns={urn_col: "urn"}, inplace=True)
df["urn"] = pd.to_numeric(df["urn"], errors="coerce")
df = df.dropna(subset=["urn"])
# Find total responses column
resp_col = next(
(c for c in df.columns if "total" in c.lower() and "respon" in c.lower()),
None,
)
today = date.today().isoformat()
for _, row in df.iterrows():
try:
urn = int(row["urn"])
except (ValueError, TypeError):
continue
total = None
if resp_col and pd.notna(row.get(resp_col)):
try:
total = int(row[resp_col])
except (ValueError, TypeError):
pass
yield {
"urn": urn,
"survey_date": today,
"total_responses": total,
"q_happy_pct": _positive_pct(row, "Q1"),
"q_safe_pct": _positive_pct(row, "Q2"),
"q_behaviour_pct": _positive_pct(row, "Q3"),
"q_bullying_pct": _positive_pct(row, "Q4"),
"q_communication_pct": _positive_pct(row, "Q5"),
"q_progress_pct": _positive_pct(row, "Q7"),
"q_teaching_pct": _positive_pct(row, "Q8"),
"q_information_pct": _positive_pct(row, "Q9"),
"q_curriculum_pct": _positive_pct(row, "Q10"),
"q_future_pct": _positive_pct(row, "Q11"),
"q_leadership_pct": _positive_pct(row, "Q12"),
"q_wellbeing_pct": _positive_pct(row, "Q13"),
"q_recommend_pct": _positive_pct(row, "Q14"),
}
class TapUKParentView(Tap):
"""Singer tap for UK Ofsted Parent View."""
name = "tap-uk-parent-view"
config_jsonschema = th.PropertiesList().to_dict()
config_jsonschema = th.PropertiesList(
th.Property("download_url", th.StringType, description="Direct URL to Parent View data file"),
).to_dict()
def discover_streams(self):
return [ParentViewStream(self)]

View File

@@ -107,4 +107,7 @@ models:
tests: [not_null]
- name: fact_deprivation
description: IDACI deprivation index
description: IDACI deprivation index — one row per URN
columns:
- name: urn
tests: [not_null, unique]

View File

@@ -1,22 +1,9 @@
-- Mart: Deprivation index — one row per URN
-- Joins school postcode → LSOA → IDACI score
-- The IDACI tap already resolves postcode → LSOA → IoD2019 score per school.
with school_postcodes as (
select
urn,
postcode
from {{ ref('stg_gias_establishments') }}
where status = 'Open'
and postcode is not null
)
-- Note: The join between postcode and LSOA requires a postcode-to-LSOA
-- lookup table. This will be populated by the geocode script or a seed.
-- For now, this model serves as a placeholder that will be completed
-- once the IDACI tap provides the postcode→LSOA mapping.
select
i.lsoa_code,
i.idaci_score,
i.idaci_decile
from {{ ref('stg_idaci') }} i
lsoa_code,
idaci_score,
idaci_decile
from {{ ref('stg_idaci') }}

View File

@@ -6,10 +6,15 @@ select
total_responses,
q_happy_pct,
q_safe_pct,
q_progress_pct,
q_well_taught_pct,
q_well_led_pct,
q_behaviour_pct,
q_bullying_pct,
q_communication_pct,
q_progress_pct,
q_teaching_pct,
q_information_pct,
q_curriculum_pct,
q_future_pct,
q_leadership_pct,
q_wellbeing_pct,
q_recommend_pct
from {{ ref('stg_parent_view') }}

View File

@@ -67,3 +67,6 @@ sources:
- name: idaci
description: Income Deprivation Affecting Children Index lookups
columns:
- name: urn
tests: [not_null]

View File

@@ -1,4 +1,6 @@
-- Staging model: Income Deprivation Affecting Children Index
-- The IDACI tap resolves postcode → LSOA and joins to IoD2019 data,
-- so each row already has a URN.
with source as (
select * from {{ source('raw', 'idaci') }}
@@ -6,10 +8,12 @@ with source as (
renamed as (
select
cast(urn as integer) as urn,
lsoa_code,
cast(idaci_score as numeric) as idaci_score,
cast(idaci_decile as integer) as idaci_decile
from source
where urn is not null
)
select * from renamed

View File

@@ -1,4 +1,5 @@
-- Staging model: Ofsted Parent View survey responses
-- The tap computes positive percentages (Strongly agree + Agree) per question.
with source as (
select * from {{ source('raw', 'parent_view') }}
@@ -11,11 +12,16 @@ renamed as (
cast(total_responses as integer) as total_responses,
cast(q_happy_pct as numeric) as q_happy_pct,
cast(q_safe_pct as numeric) as q_safe_pct,
cast(q_progress_pct as numeric) as q_progress_pct,
cast(q_well_taught_pct as numeric) as q_well_taught_pct,
cast(q_well_led_pct as numeric) as q_well_led_pct,
cast(q_behaviour_pct as numeric) as q_behaviour_pct,
cast(q_bullying_pct as numeric) as q_bullying_pct,
cast(q_communication_pct as numeric) as q_communication_pct,
cast(q_progress_pct as numeric) as q_progress_pct,
cast(q_teaching_pct as numeric) as q_teaching_pct,
cast(q_information_pct as numeric) as q_information_pct,
cast(q_curriculum_pct as numeric) as q_curriculum_pct,
cast(q_future_pct as numeric) as q_future_pct,
cast(q_leadership_pct as numeric) as q_leadership_pct,
cast(q_wellbeing_pct as numeric) as q_wellbeing_pct,
cast(q_recommend_pct as numeric) as q_recommend_pct
from source
where urn is not null