From cd75fc4c242ca9c2fb099bad9fa035dd06ce79d0 Mon Sep 17 00:00:00 2001 From: Tudor Date: Thu, 26 Mar 2026 11:13:38 +0000 Subject: [PATCH] fix(taps): align with integrator resilience patterns Port critical patterns from the working integrator into Singer taps: - GIAS: add 404 fallback to yesterday's date, increase timeout to 300s, use latin-1 encoding, use dated URL for links (static URL returns 500) - FBIT: add GIAS date fallback, increase timeout, fix encoding to latin-1 - IDACI: use dated GIAS URL with fallback instead of undated static URL, fix encoding to latin-1, increase timeout to 300s - Ofsted: try utf-8-sig then fall back to latin-1 encoding Co-Authored-By: Claude Opus 4.6 --- .../extractors/tap-uk-fbit/tap_uk_fbit/tap.py | 19 +++++++-- .../extractors/tap-uk-gias/tap_uk_gias/tap.py | 42 +++++++++++++------ .../tap-uk-idaci/tap_uk_idaci/tap.py | 20 +++++++-- .../tap-uk-ofsted/tap_uk_ofsted/tap.py | 6 ++- 4 files changed, 66 insertions(+), 21 deletions(-) diff --git a/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py index 1f8fa2e..d99767d 100644 --- a/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py +++ b/pipeline/plugins/extractors/tap-uk-fbit/tap_uk_fbit/tap.py @@ -33,19 +33,30 @@ class FBITFinanceStream(Stream): def _get_school_urns(self) -> list[int]: """Fetch all open school URNs from GIAS to know what to query.""" import io + from datetime import timedelta + import pandas as pd - url = ( + base = ( "https://ea-edubase-api-prod.azurewebsites.net" - f"/edubase/downloads/public/edubasealldata{date.today().strftime('%Y%m%d')}.csv" + "/edubase/downloads/public/edubasealldata{date}.csv" ) + today = date.today() + url = base.format(date=today.strftime("%Y%m%d")) + self.logger.info("Fetching URN list from GIAS for FBIT extraction...") try: - resp = requests.get(url, timeout=120) + resp = requests.get(url, timeout=300) + # Fall back to yesterday if today's file isn't available + if resp.status_code == 404: + yesterday = (today - timedelta(days=1)).strftime("%Y%m%d") + url = base.format(date=yesterday) + self.logger.info("Today's GIAS file not available, trying yesterday") + resp = requests.get(url, timeout=300) resp.raise_for_status() df = pd.read_csv( io.StringIO(resp.text), - encoding="utf-8-sig", + encoding="latin-1", usecols=["URN", "EstablishmentStatus (name)"], dtype=str, ) diff --git a/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py index 19f1903..1f014e3 100644 --- a/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py +++ b/pipeline/plugins/extractors/tap-uk-gias/tap_uk_gias/tap.py @@ -2,7 +2,7 @@ from __future__ import annotations -from datetime import date +from datetime import date, timedelta from singer_sdk import Stream, Tap from singer_sdk import typing as th @@ -12,6 +12,11 @@ GIAS_URL_TEMPLATE = ( "/edubase/downloads/public/edubasealldata{date}.csv" ) +GIAS_LINKS_URL_TEMPLATE = ( + "https://ea-edubase-api-prod.azurewebsites.net" + "/edubase/downloads/public/links_edubasealldata{date}.csv" +) + class GIASEstablishmentsStream(Stream): """Stream: GIAS establishments (one row per URN).""" @@ -40,23 +45,30 @@ class GIASEstablishmentsStream(Stream): import pandas as pd import requests - today = date.today().strftime("%Y%m%d") - url = GIAS_URL_TEMPLATE.format(date=today) + today = date.today() + url = GIAS_URL_TEMPLATE.format(date=today.strftime("%Y%m%d")) self.logger.info("Downloading GIAS bulk CSV: %s", url) - resp = requests.get(url, timeout=120) + resp = requests.get(url, timeout=300) + + # GIAS may not have today's file yet — fall back to yesterday + if resp.status_code == 404: + yesterday = (today - timedelta(days=1)).strftime("%Y%m%d") + url = GIAS_URL_TEMPLATE.format(date=yesterday) + self.logger.info("Today's file not available, trying yesterday: %s", url) + resp = requests.get(url, timeout=300) + resp.raise_for_status() df = pd.read_csv( io.StringIO(resp.text), - encoding="utf-8-sig", + encoding="latin-1", dtype=str, keep_default_na=False, ) for _, row in df.iterrows(): record = row.to_dict() - # Cast URN to int try: record["URN"] = int(record["URN"]) except (ValueError, KeyError): @@ -85,18 +97,24 @@ class GIASLinksStream(Stream): import pandas as pd import requests - url = ( - "https://ea-edubase-api-prod.azurewebsites.net" - "/edubase/downloads/public/links_edubasealldata.csv" - ) + today = date.today() + url = GIAS_LINKS_URL_TEMPLATE.format(date=today.strftime("%Y%m%d")) self.logger.info("Downloading GIAS links CSV: %s", url) - resp = requests.get(url, timeout=120) + resp = requests.get(url, timeout=300) + + # Fall back to yesterday if today's file isn't available + if resp.status_code in (404, 500): + yesterday = (today - timedelta(days=1)).strftime("%Y%m%d") + url = GIAS_LINKS_URL_TEMPLATE.format(date=yesterday) + self.logger.info("Today's links file not available, trying yesterday: %s", url) + resp = requests.get(url, timeout=300) + resp.raise_for_status() df = pd.read_csv( io.StringIO(resp.text), - encoding="utf-8-sig", + encoding="latin-1", dtype=str, keep_default_na=False, ) diff --git a/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py index 444de43..90070d7 100644 --- a/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py +++ b/pipeline/plugins/extractors/tap-uk-idaci/tap_uk_idaci/tap.py @@ -103,17 +103,29 @@ class IDACIStream(Stream): def _get_school_postcodes(self) -> list[tuple[int, str]]: """Fetch URN + postcode pairs from GIAS.""" - url = ( + from datetime import date, timedelta + + today = date.today() + base = ( "https://ea-edubase-api-prod.azurewebsites.net" - "/edubase/downloads/public/edubasealldata.csv" + "/edubase/downloads/public/edubasealldata{date}.csv" ) + url = base.format(date=today.strftime("%Y%m%d")) self.logger.info("Fetching school postcodes from GIAS...") - resp = requests.get(url, timeout=120) + resp = requests.get(url, timeout=300) + + # Fall back to yesterday if today's file isn't available + if resp.status_code == 404: + yesterday = (today - timedelta(days=1)).strftime("%Y%m%d") + url = base.format(date=yesterday) + self.logger.info("Today's GIAS file not available, trying yesterday: %s", url) + resp = requests.get(url, timeout=300) + resp.raise_for_status() df = pd.read_csv( io.StringIO(resp.text), - encoding="utf-8-sig", + encoding="latin-1", usecols=["URN", "Postcode", "EstablishmentStatus (name)"], dtype=str, ) diff --git a/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py b/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py index ae5681d..8f23d13 100644 --- a/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py +++ b/pipeline/plugins/extractors/tap-uk-ofsted/tap_uk_ofsted/tap.py @@ -129,7 +129,11 @@ class OfstedInspectionsStream(Stream): df = pd.read_excel(io.BytesIO(resp.content), engine="odf", dtype=str) else: # Detect header row (may not be row 0) - text = resp.content.decode("utf-8-sig", errors="replace") + # Try utf-8-sig first, fall back to latin-1 (matches integrator) + try: + text = resp.content.decode("utf-8-sig") + except UnicodeDecodeError: + text = resp.content.decode("latin-1") lines = text.split("\n") header_idx = 0 for i, line in enumerate(lines[:20]):