feat(ees): rewrite EES tap and KS2 models for actual data structure
All checks were successful
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 31s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 1m8s
Build and Push Docker Images / Build Integrator (push) Successful in 55s
Build and Push Docker Images / Build Kestra Init (push) Successful in 32s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m45s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s

- Fix publication slugs (KS4, Phonics, Admissions were wrong)
- Split KS2 into two streams: ees_ks2_attainment (long format) and
  ees_ks2_info (wide format context data)
- Target specific filenames instead of keyword matching
- Handle school_urn vs urn column naming
- Pivot KS2 attainment from long to wide format in dbt staging
- Add all ~40 KS2 columns the backend needs (GPS, absence, gender,
  disadvantaged breakdowns, context demographics)
- Pass through all columns in int_ks2_with_lineage and fact_ks2

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-26 23:08:50 +00:00
parent 719f06e480
commit d82e36e7b2
5 changed files with 354 additions and 99 deletions

View File

@@ -1,4 +1,10 @@
"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data."""
"""EES Singer tap — extracts KS2, KS4, Census, Admissions, Phonics data.
Each stream targets a specific CSV file within an EES release ZIP.
The EES data uses 'school_urn' for school-level records and 'z' for
suppressed values. Column names vary by file — schemas declare all
columns needed by downstream dbt staging models.
"""
from __future__ import annotations
@@ -12,7 +18,6 @@ from singer_sdk import typing as th
CONTENT_API_BASE = (
"https://content.explore-education-statistics.service.gov.uk/api"
)
STATS_API_BASE = "https://api.education.gov.uk/statistics/v1"
TIMEOUT = 120
@@ -37,7 +42,8 @@ class EESDatasetStream(Stream):
replication_key = None
_publication_slug: str = ""
_file_keyword: str = ""
_target_filename: str = "" # exact filename within the ZIP
_urn_column: str = "school_urn" # column name for URN in the CSV
def get_records(self, context):
import pandas as pd
@@ -50,84 +56,153 @@ class EESDatasetStream(Stream):
)
zf = download_release_zip(release_id)
# Find the CSV matching our keyword
csv_names = [n for n in zf.namelist() if n.endswith(".csv")]
# Find the target file
all_files = zf.namelist()
target = None
for name in csv_names:
if self._file_keyword.lower() in name.lower():
for name in all_files:
if name.endswith(self._target_filename):
target = name
break
if not target and csv_names:
target = csv_names[0]
if not target:
self.logger.warning("No CSV found in release ZIP")
self.logger.error(
"File '%s' not found in ZIP. Available: %s",
self._target_filename,
[n for n in all_files if n.endswith(".csv")],
)
return
self.logger.info("Reading %s from ZIP", target)
with zf.open(target) as f:
df = pd.read_csv(f, dtype=str, keep_default_na=False)
# Filter to school-level data
# Filter to school-level data if the column exists
if "geographic_level" in df.columns:
df = df[df["geographic_level"] == "School"]
self.logger.info("Emitting %d school-level rows", len(df))
for _, row in df.iterrows():
yield row.to_dict()
record = row.to_dict()
# Normalise URN column to 'school_urn' for consistency
if self._urn_column in record and self._urn_column != "school_urn":
record["school_urn"] = record.pop(self._urn_column)
yield record
class EESKS2Stream(EESDatasetStream):
name = "ees_ks2"
primary_keys = ["urn", "time_period"]
# ── KS2 Attainment (long format: one row per school × subject × breakdown) ──
class EESKS2AttainmentStream(EESDatasetStream):
name = "ees_ks2_attainment"
primary_keys = ["school_urn", "time_period", "subject", "breakdown_topic", "breakdown"]
_publication_slug = "key-stage-2-attainment"
_file_keyword = "school"
_target_filename = "ks2_school_attainment_data.csv"
schema = th.PropertiesList(
th.Property("urn", th.StringType, required=True),
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
th.Property("school_laestab", th.StringType),
th.Property("school_name", th.StringType),
th.Property("breakdown_topic", th.StringType, required=True),
th.Property("breakdown", th.StringType, required=True),
th.Property("subject", th.StringType, required=True),
th.Property("expected_standard_pupil_percent", th.StringType),
th.Property("higher_standard_pupil_percent", th.StringType),
th.Property("average_scaled_score", th.StringType),
th.Property("progress_measure_score", th.StringType),
th.Property("progress_measure_lower_conf_interval", th.StringType),
th.Property("progress_measure_upper_conf_interval", th.StringType),
th.Property("absent_or_not_able_to_access_percent", th.StringType),
th.Property("working_towards_expected_standard_pupil_percent", th.StringType),
th.Property("absent_or_disapplied_percent", th.StringType),
th.Property("higher_standard", th.StringType),
th.Property("progress_measure_unadjusted", th.StringType),
th.Property("progress_measure_description", th.StringType),
).to_dict()
# ── KS2 Information (wide format: one row per school, context/demographics) ──
class EESKS2InfoStream(EESDatasetStream):
name = "ees_ks2_info"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "key-stage-2-attainment"
_target_filename = "ks2_school_information_data.csv"
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
th.Property("school_laestab", th.StringType),
th.Property("school_name", th.StringType),
th.Property("nftype", th.StringType),
th.Property("reldenom", th.StringType),
th.Property("agerange", th.StringType),
th.Property("totpups", th.StringType),
th.Property("telig", th.StringType),
th.Property("belig", th.StringType),
th.Property("gelig", th.StringType),
th.Property("ptfsm6cla1a", th.StringType),
th.Property("ptnotfsm6cla1a", th.StringType),
th.Property("ptealgrp2", th.StringType),
th.Property("ptmobn", th.StringType),
th.Property("psenelk", th.StringType),
th.Property("psenele", th.StringType),
th.Property("psenelek", th.StringType),
th.Property("telig_3yr", th.StringType),
).to_dict()
# ── KS4 Attainment ──────────────────────────────────────────────────────────
class EESKS4Stream(EESDatasetStream):
name = "ees_ks4"
primary_keys = ["urn", "time_period"]
_publication_slug = "key-stage-4-performance-revised"
_file_keyword = "school"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "key-stage-4-performance"
_target_filename = "school" # Will be refined once we see the actual ZIP contents
schema = th.PropertiesList(
th.Property("urn", th.StringType, required=True),
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
).to_dict()
# ── Census (school-level pupil characteristics) ─────────────────────────────
class EESCensusStream(EESDatasetStream):
name = "ees_census"
primary_keys = ["urn", "time_period"]
_publication_slug = "school-pupils-and-their-characteristics"
_file_keyword = "school"
_target_filename = "spc_school_level_underlying_data_2025.csv"
_urn_column = "urn"
schema = th.PropertiesList(
th.Property("urn", th.StringType, required=True),
th.Property("time_period", th.StringType, required=True),
th.Property("urn", th.StringType, required=True),
th.Property("school_name", th.StringType),
th.Property("laestab", th.StringType),
th.Property("phase_type_grouping", th.StringType),
).to_dict()
# ── Admissions ───────────────────────────────────────────────────────────────
class EESAdmissionsStream(EESDatasetStream):
name = "ees_admissions"
primary_keys = ["urn", "time_period"]
_publication_slug = "secondary-and-primary-school-applications-and-offers"
_file_keyword = "school"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "primary-and-secondary-school-applications-and-offers"
_target_filename = "school" # Will be refined once we see the actual ZIP contents
schema = th.PropertiesList(
th.Property("urn", th.StringType, required=True),
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
).to_dict()
# ── Phonics ──────────────────────────────────────────────────────────────────
class EESPhonicsStream(EESDatasetStream):
name = "ees_phonics"
primary_keys = ["urn", "time_period"]
_publication_slug = "phonics-screening-check-and-key-stage-1-assessments"
_file_keyword = "school"
primary_keys = ["school_urn", "time_period"]
_publication_slug = "phonics-screening-check-attainment"
_target_filename = "school" # Will be refined once we see the actual ZIP contents
schema = th.PropertiesList(
th.Property("urn", th.StringType, required=True),
th.Property("time_period", th.StringType, required=True),
th.Property("school_urn", th.StringType, required=True),
).to_dict()
@@ -142,7 +217,8 @@ class TapUKEES(Tap):
def discover_streams(self):
return [
EESKS2Stream(self),
EESKS2AttainmentStream(self),
EESKS2InfoStream(self),
EESKS4Stream(self),
EESCensusStream(self),
EESAdmissionsStream(self),