From 97d975114aa75f6c676a9ecf26f9e63d233599d9 Mon Sep 17 00:00:00 2001 From: Tudor Date: Thu, 26 Mar 2026 10:38:07 +0000 Subject: [PATCH] feat(pipeline): implement parent-view, fbit, idaci Singer taps + align staging/mart models MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port extraction logic from integrator scripts into Singer SDK taps: - tap-uk-parent-view: scrapes Ofsted open data portal, parses survey responses (13 of the survey's 14 questions; Q6 is not extracted) - tap-uk-fbit: queries FBIT API per-URN with rate limiting, computes per-pupil spend - tap-uk-idaci: downloads IoD2019 XLSX, batch-resolves postcodes→LSOAs via postcodes.io Update dbt models to match actual tap output schemas: - stg_idaci now includes URN (tap does the postcode→LSOA→school join) - stg_parent_view expanded from 8 to 13 question columns - fact_deprivation simplified (no longer needs postcode→LSOA join in dbt) - fact_parent_view expanded to include all 13 question metrics Co-Authored-By: Claude Opus 4.6 --- .../extractors/tap-uk-fbit/tap_uk_fbit/tap.py | 77 +++++++++- .../tap-uk-idaci/tap_uk_idaci/tap.py | 145 ++++++++++++++++-- .../tap_uk_parent_view/tap.py | 122 +++++++++++++-- .../transform/models/marts/_marts_schema.yml | 5 +- .../models/marts/fact_deprivation.sql | 25 +-- .../models/marts/fact_parent_view.sql | 11 +- .../transform/models/staging/_stg_sources.yml | 3 + .../transform/models/staging/stg_idaci.sql | 4 + .../models/staging/stg_parent_view.sql | 28 ++-- 9 files changed, 360 insertions(+), 60 deletions(-) diff --git a/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py index 887f016..1f8fa2e 100644 --- a/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py +++ b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py @@ -2,9 +2,16 @@ from __future__ import annotations +import time +from datetime import date + +import requests from singer_sdk import Stream, Tap from singer_sdk import typing
as th +API_BASE = "https://schools-financial-benchmarking.service.gov.uk/api" +RATE_LIMIT_DELAY = 0.1 # seconds between requests + class FBITFinanceStream(Stream): """Stream: School financial benchmarking data.""" @@ -23,13 +30,68 @@ class FBITFinanceStream(Stream): th.Property("premises_cost_pct", th.NumberType), ).to_dict() + def _get_school_urns(self) -> list[int]: + """Fetch all open school URNs from GIAS to know what to query.""" + import io + import pandas as pd + + url = ( + "https://ea-edubase-api-prod.azurewebsites.net" + f"/edubase/downloads/public/edubasealldata{date.today().strftime('%Y%m%d')}.csv" + ) + self.logger.info("Fetching URN list from GIAS for FBIT extraction...") + try: + resp = requests.get(url, timeout=120) + resp.raise_for_status() + df = pd.read_csv( + io.StringIO(resp.text), + encoding="utf-8-sig", + usecols=["URN", "EstablishmentStatus (name)"], + dtype=str, + ) + df = df[df["EstablishmentStatus (name)"] == "Open"] + return [int(u) for u in df["URN"].dropna().unique()] + except Exception as e: + self.logger.error("Failed to fetch URN list: %s", e) + return [] + def get_records(self, context): - # TODO: Implement FBIT API extraction - # The FBIT API requires per-URN requests with rate limiting. - # Implementation will batch URNs from dim_school and request - # financial data for each. 
- self.logger.warning("FBIT extraction not yet implemented") - return iter([]) + urns = self._get_school_urns() + year = date.today().year - 1 + + self.logger.info("Fetching FBIT data for %d schools (year %d)...", len(urns), year) + + for i, urn in enumerate(urns): + if i % 1000 == 0: + self.logger.info(" Progress: %d/%d", i, len(urns)) + + try: + resp = requests.get( + f"{API_BASE}/schoolFinancialDataObject/{urn}", + timeout=10, + ) + if resp.status_code == 200: + data = resp.json() + if data: + per_pupil = None + total_exp = data.get("totalExpenditure") + num_pupils = data.get("numberOfPupils") + if total_exp and num_pupils: + per_pupil = round(total_exp / num_pupils, 2) + + yield { + "urn": urn, + "year": year, + "per_pupil_spend": per_pupil, + "staff_cost_pct": data.get("staffCostPercent"), + "teacher_cost_pct": data.get("teachingStaffCostPercent"), + "support_staff_cost_pct": data.get("educationSupportStaffCostPercent"), + "premises_cost_pct": data.get("premisesStaffCostPercent"), + } + except requests.RequestException: + pass + + time.sleep(RATE_LIMIT_DELAY) class TapUKFBIT(Tap): @@ -41,7 +103,8 @@ class TapUKFBIT(Tap): th.Property( "base_url", th.StringType, - default="https://financial-benchmarking-and-insights-tool.education.gov.uk/api", + default=API_BASE, + description="FBIT API base URL", ), ).to_dict() diff --git a/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py index 3aefb46..444de43 100644 --- a/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py +++ b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py @@ -1,30 +1,157 @@ -"""IDACI Singer tap — extracts deprivation index lookup data.""" +"""IDACI Singer tap — extracts deprivation index from IoD2019 + postcodes.io LSOA lookup.""" from __future__ import annotations +import io + +import pandas as pd +import requests from singer_sdk import Stream, Tap from singer_sdk import typing as th +IOD_2019_URL = ( + 
"https://assets.publishing.service.gov.uk/government/uploads/system/uploads/" + "attachment_data/file/833970/File_1_-_IMD2019_Index_of_Multiple_Deprivation.xlsx" +) + +POSTCODES_IO_BATCH = "https://api.postcodes.io/postcodes" +BATCH_SIZE = 100 + + +def _postcode_to_lsoa(postcodes: list[str], logger) -> dict[str, str]: + """Batch-resolve postcodes to LSOA codes via postcodes.io.""" + result = {} + valid = list({p.strip().upper() for p in postcodes if p and len(str(p).strip()) >= 5}) + + for i in range(0, len(valid), BATCH_SIZE): + batch = valid[i : i + BATCH_SIZE] + try: + resp = requests.post(POSTCODES_IO_BATCH, json={"postcodes": batch}, timeout=30) + if resp.status_code == 200: + for item in resp.json().get("result", []): + if item and item.get("result"): + lsoa = item["result"].get("lsoa") + if lsoa: + result[item["query"].upper()] = lsoa + except Exception as e: + logger.warning("postcodes.io batch failed: %s", e) + + if i % 5000 == 0 and i > 0: + logger.info(" LSOA resolution: %d/%d postcodes", i, len(valid)) + + return result + class IDACIStream(Stream): - """Stream: IDACI scores by LSOA.""" + """Stream: IDACI scores joined to school URNs via postcode → LSOA lookup.""" name = "idaci" - primary_keys = ["lsoa_code"] + primary_keys = ["urn"] replication_key = None schema = th.PropertiesList( - th.Property("lsoa_code", th.StringType, required=True), + th.Property("urn", th.IntegerType, required=True), + th.Property("lsoa_code", th.StringType), th.Property("idaci_score", th.NumberType), th.Property("idaci_decile", th.IntegerType), ).to_dict() + def _load_iod_data(self) -> dict[str, dict]: + """Download and parse IoD2019 IDACI data into a LSOA lookup.""" + self.logger.info("Downloading IoD2019 IDACI data...") + resp = requests.get(IOD_2019_URL, timeout=300) + resp.raise_for_status() + + iod_sheets = pd.read_excel(io.BytesIO(resp.content), sheet_name=None) + + # Find the IDACI sheet + idaci_sheet = None + for name, df in iod_sheets.items(): + if "IDACI" in 
name.upper() or "IDACI" in str(df.columns.tolist()).upper(): + idaci_sheet = name + break + if idaci_sheet is None: + idaci_sheet = list(iod_sheets.keys())[0] + + df_iod = iod_sheets[idaci_sheet] + + # Find columns dynamically + col_lsoa = next( + (c for c in df_iod.columns if "LSOA" in str(c).upper() and "code" in str(c).lower()), + None, + ) + col_score = next( + (c for c in df_iod.columns if "IDACI" in str(c).upper() and "score" in str(c).lower()), + None, + ) + + if not col_lsoa or not col_score: + self.logger.error("Could not find LSOA/IDACI columns. Available: %s", list(df_iod.columns)[:20]) + return {} + + df_iod = df_iod[[col_lsoa, col_score]].copy() + df_iod.columns = ["lsoa_code", "idaci_score"] + df_iod = df_iod.dropna() + + # Compute deciles (1 = most deprived) + df_iod = df_iod.sort_values("idaci_score", ascending=False) + df_iod["idaci_decile"] = (pd.qcut(df_iod["idaci_score"], 10, labels=False) + 1).astype(int) + df_iod["idaci_decile"] = 11 - df_iod["idaci_decile"] + + self.logger.info("Loaded %d LSOA IDACI records", len(df_iod)) + return df_iod.set_index("lsoa_code")[["idaci_score", "idaci_decile"]].to_dict("index") + + def _get_school_postcodes(self) -> list[tuple[int, str]]: + """Fetch URN + postcode pairs from GIAS.""" + url = ( + "https://ea-edubase-api-prod.azurewebsites.net" + "/edubase/downloads/public/edubasealldata.csv" + ) + self.logger.info("Fetching school postcodes from GIAS...") + resp = requests.get(url, timeout=120) + resp.raise_for_status() + + df = pd.read_csv( + io.StringIO(resp.text), + encoding="utf-8-sig", + usecols=["URN", "Postcode", "EstablishmentStatus (name)"], + dtype=str, + ) + df = df[df["EstablishmentStatus (name)"] == "Open"] + df = df.dropna(subset=["URN", "Postcode"]) + + return [(int(row["URN"]), row["Postcode"]) for _, row in df.iterrows()] + def get_records(self, context): - # TODO: Implement IDACI extraction - # Source: MHCLG IoD 2019 LSOA-level data - # Available as a static CSV download - 
self.logger.warning("IDACI extraction not yet implemented") - return iter([]) + lsoa_lookup = self._load_iod_data() + if not lsoa_lookup: + return + + schools = self._get_school_postcodes() + postcodes = [pc for _, pc in schools] + + self.logger.info("Resolving %d postcodes to LSOAs...", len(postcodes)) + pc_to_lsoa = _postcode_to_lsoa(postcodes, self.logger) + self.logger.info("Resolved %d postcodes", len(pc_to_lsoa)) + + yielded = 0 + for urn, postcode in schools: + lsoa = pc_to_lsoa.get(postcode.strip().upper()) + if not lsoa: + continue + iod = lsoa_lookup.get(lsoa) + if not iod: + continue + + yield { + "urn": urn, + "lsoa_code": lsoa, + "idaci_score": float(iod["idaci_score"]), + "idaci_decile": int(iod["idaci_decile"]), + } + yielded += 1 + + self.logger.info("Yielded IDACI data for %d schools", yielded) class TapUKIDACI(Tap): diff --git a/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py b/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py index b209bbc..2239c27 100644 --- a/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py +++ b/pipeline/plugins/extractors/tap-uk-parent-view/tap_uk_parent_view/tap.py @@ -1,10 +1,33 @@ -"""Parent View Singer tap — extracts survey data from Ofsted Parent View portal.""" +"""Parent View Singer tap — extracts survey data from Ofsted Parent View open data portal.""" from __future__ import annotations +import io +import re +from datetime import date + +import pandas as pd +import requests from singer_sdk import Stream, Tap from singer_sdk import typing as th +OPEN_DATA_PAGE = "https://parentview.ofsted.gov.uk/open-data" + + +def _positive_pct(row: pd.Series, q_col_base: str) -> float | None: + """Sum 'Strongly agree' + 'Agree' percentages for a question.""" + strongly = row.get(f"{q_col_base} - Strongly agree %") or row.get(f"{q_col_base} - Strongly Agree %") + agree = row.get(f"{q_col_base} - Agree %") + try: + total = 0.0 + if pd.notna(strongly): + total += 
float(strongly) + if pd.notna(agree): + total += float(agree) + return round(total, 1) if total > 0 else None + except (TypeError, ValueError): + return None + class ParentViewStream(Stream): """Stream: Parent View survey responses per school.""" @@ -19,27 +42,106 @@ class ParentViewStream(Stream): th.Property("total_responses", th.IntegerType), th.Property("q_happy_pct", th.NumberType), th.Property("q_safe_pct", th.NumberType), - th.Property("q_progress_pct", th.NumberType), - th.Property("q_well_taught_pct", th.NumberType), - th.Property("q_well_led_pct", th.NumberType), th.Property("q_behaviour_pct", th.NumberType), th.Property("q_bullying_pct", th.NumberType), + th.Property("q_communication_pct", th.NumberType), + th.Property("q_progress_pct", th.NumberType), + th.Property("q_teaching_pct", th.NumberType), + th.Property("q_information_pct", th.NumberType), + th.Property("q_curriculum_pct", th.NumberType), + th.Property("q_future_pct", th.NumberType), + th.Property("q_leadership_pct", th.NumberType), + th.Property("q_wellbeing_pct", th.NumberType), th.Property("q_recommend_pct", th.NumberType), ).to_dict() + def _discover_download_url(self) -> str: + """Scrape the open data page for the download link.""" + resp = requests.get(OPEN_DATA_PAGE, timeout=30) + resp.raise_for_status() + urls = re.findall(r'href="([^"]+\.(?:xlsx|csv|zip))"', resp.text, re.IGNORECASE) + if not urls: + msg = "No download link found on Parent View open data page" + raise RuntimeError(msg) + url = urls[0] + if not url.startswith("http"): + url = "https://parentview.ofsted.gov.uk" + url + return url + def get_records(self, context): - # TODO: Implement Parent View data extraction - # Source: Ofsted Parent View portal XLSX/CSV download - # URL discovery requires scraping parentview.ofsted.gov.uk - self.logger.warning("Parent View extraction not yet implemented") - return iter([]) + url = self._discover_download_url() + self.logger.info("Downloading Parent View data: %s", url) + + resp = 
requests.get(url, timeout=120) + resp.raise_for_status() + + if url.endswith(".xlsx"): + df = pd.read_excel(io.BytesIO(resp.content)) + else: + df = pd.read_csv( + io.BytesIO(resp.content), + encoding="latin-1", + low_memory=False, + ) + + # Normalise URN column + urn_col = next((c for c in df.columns if c.strip().upper() == "URN"), None) + if not urn_col: + self.logger.error("URN column not found. Columns: %s", list(df.columns)[:20]) + return + + df.rename(columns={urn_col: "urn"}, inplace=True) + df["urn"] = pd.to_numeric(df["urn"], errors="coerce") + df = df.dropna(subset=["urn"]) + + # Find total responses column + resp_col = next( + (c for c in df.columns if "total" in c.lower() and "respon" in c.lower()), + None, + ) + + today = date.today().isoformat() + + for _, row in df.iterrows(): + try: + urn = int(row["urn"]) + except (ValueError, TypeError): + continue + + total = None + if resp_col and pd.notna(row.get(resp_col)): + try: + total = int(row[resp_col]) + except (ValueError, TypeError): + pass + + yield { + "urn": urn, + "survey_date": today, + "total_responses": total, + "q_happy_pct": _positive_pct(row, "Q1"), + "q_safe_pct": _positive_pct(row, "Q2"), + "q_behaviour_pct": _positive_pct(row, "Q3"), + "q_bullying_pct": _positive_pct(row, "Q4"), + "q_communication_pct": _positive_pct(row, "Q5"), + "q_progress_pct": _positive_pct(row, "Q7"), + "q_teaching_pct": _positive_pct(row, "Q8"), + "q_information_pct": _positive_pct(row, "Q9"), + "q_curriculum_pct": _positive_pct(row, "Q10"), + "q_future_pct": _positive_pct(row, "Q11"), + "q_leadership_pct": _positive_pct(row, "Q12"), + "q_wellbeing_pct": _positive_pct(row, "Q13"), + "q_recommend_pct": _positive_pct(row, "Q14"), + } class TapUKParentView(Tap): """Singer tap for UK Ofsted Parent View.""" name = "tap-uk-parent-view" - config_jsonschema = th.PropertiesList().to_dict() + config_jsonschema = th.PropertiesList( + th.Property("download_url", th.StringType, description="Direct URL to Parent View data 
file"), + ).to_dict() def discover_streams(self): return [ParentViewStream(self)] diff --git a/pipeline/transform/models/marts/_marts_schema.yml b/pipeline/transform/models/marts/_marts_schema.yml index 6e6c86b..91395f4 100644 --- a/pipeline/transform/models/marts/_marts_schema.yml +++ b/pipeline/transform/models/marts/_marts_schema.yml @@ -107,4 +107,7 @@ models: tests: [not_null] - name: fact_deprivation - description: IDACI deprivation index + description: IDACI deprivation index — one row per URN + columns: + - name: urn + tests: [not_null, unique] diff --git a/pipeline/transform/models/marts/fact_deprivation.sql b/pipeline/transform/models/marts/fact_deprivation.sql index a8b323e..f366fed 100644 --- a/pipeline/transform/models/marts/fact_deprivation.sql +++ b/pipeline/transform/models/marts/fact_deprivation.sql @@ -1,22 +1,9 @@ -- Mart: Deprivation index — one row per URN --- Joins school postcode → LSOA → IDACI score - -with school_postcodes as ( - select - urn, - postcode - from {{ ref('stg_gias_establishments') }} - where status = 'Open' - and postcode is not null -) - --- Note: The join between postcode and LSOA requires a postcode-to-LSOA --- lookup table. This will be populated by the geocode script or a seed. --- For now, this model serves as a placeholder that will be completed --- once the IDACI tap provides the postcode→LSOA mapping. +-- The IDACI tap already resolves postcode → LSOA → IoD2019 score per school. 
select - i.lsoa_code, - i.idaci_score, - i.idaci_decile -from {{ ref('stg_idaci') }} i + urn, + lsoa_code, + idaci_score, + idaci_decile +from {{ ref('stg_idaci') }} diff --git a/pipeline/transform/models/marts/fact_parent_view.sql b/pipeline/transform/models/marts/fact_parent_view.sql index 10d984a..3178903 100644 --- a/pipeline/transform/models/marts/fact_parent_view.sql +++ b/pipeline/transform/models/marts/fact_parent_view.sql @@ -6,10 +6,15 @@ select total_responses, q_happy_pct, q_safe_pct, - q_progress_pct, - q_well_taught_pct, - q_well_led_pct, q_behaviour_pct, q_bullying_pct, + q_communication_pct, + q_progress_pct, + q_teaching_pct, + q_information_pct, + q_curriculum_pct, + q_future_pct, + q_leadership_pct, + q_wellbeing_pct, q_recommend_pct from {{ ref('stg_parent_view') }} diff --git a/pipeline/transform/models/staging/_stg_sources.yml b/pipeline/transform/models/staging/_stg_sources.yml index bd45419..0b360ad 100644 --- a/pipeline/transform/models/staging/_stg_sources.yml +++ b/pipeline/transform/models/staging/_stg_sources.yml @@ -67,3 +67,6 @@ sources: - name: idaci description: Income Deprivation Affecting Children Index lookups + columns: + - name: urn + tests: [not_null] diff --git a/pipeline/transform/models/staging/stg_idaci.sql b/pipeline/transform/models/staging/stg_idaci.sql index cc36418..c97734f 100644 --- a/pipeline/transform/models/staging/stg_idaci.sql +++ b/pipeline/transform/models/staging/stg_idaci.sql @@ -1,4 +1,6 @@ -- Staging model: Income Deprivation Affecting Children Index +-- The IDACI tap resolves postcode → LSOA and joins to IoD2019 data, +-- so each row already has a URN. 
with source as ( select * from {{ source('raw', 'idaci') }} @@ -6,10 +8,12 @@ with source as ( renamed as ( select + cast(urn as integer) as urn, lsoa_code, cast(idaci_score as numeric) as idaci_score, cast(idaci_decile as integer) as idaci_decile from source + where urn is not null ) select * from renamed diff --git a/pipeline/transform/models/staging/stg_parent_view.sql b/pipeline/transform/models/staging/stg_parent_view.sql index c51e124..cbfa9bb 100644 --- a/pipeline/transform/models/staging/stg_parent_view.sql +++ b/pipeline/transform/models/staging/stg_parent_view.sql @@ -1,4 +1,5 @@ -- Staging model: Ofsted Parent View survey responses +-- The tap computes positive percentages (Strongly agree + Agree) per question. with source as ( select * from {{ source('raw', 'parent_view') }} @@ -6,17 +7,22 @@ with source as ( renamed as ( select - cast(urn as integer) as urn, - cast(survey_date as date) as survey_date, - cast(total_responses as integer) as total_responses, - cast(q_happy_pct as numeric) as q_happy_pct, - cast(q_safe_pct as numeric) as q_safe_pct, - cast(q_progress_pct as numeric) as q_progress_pct, - cast(q_well_taught_pct as numeric) as q_well_taught_pct, - cast(q_well_led_pct as numeric) as q_well_led_pct, - cast(q_behaviour_pct as numeric) as q_behaviour_pct, - cast(q_bullying_pct as numeric) as q_bullying_pct, - cast(q_recommend_pct as numeric) as q_recommend_pct + cast(urn as integer) as urn, + cast(survey_date as date) as survey_date, + cast(total_responses as integer) as total_responses, + cast(q_happy_pct as numeric) as q_happy_pct, + cast(q_safe_pct as numeric) as q_safe_pct, + cast(q_behaviour_pct as numeric) as q_behaviour_pct, + cast(q_bullying_pct as numeric) as q_bullying_pct, + cast(q_communication_pct as numeric) as q_communication_pct, + cast(q_progress_pct as numeric) as q_progress_pct, + cast(q_teaching_pct as numeric) as q_teaching_pct, + cast(q_information_pct as numeric) as q_information_pct, + cast(q_curriculum_pct as numeric) as 
q_curriculum_pct, + cast(q_future_pct as numeric) as q_future_pct, + cast(q_leadership_pct as numeric) as q_leadership_pct, + cast(q_wellbeing_pct as numeric) as q_wellbeing_pct, + cast(q_recommend_pct as numeric) as q_recommend_pct from source where urn is not null )