feat: ingest official DfE KS2 national averages from EES data catalogue
Build and Push Docker Images / Build Backend (FastAPI) (push) Successful in 19s
Build and Push Docker Images / Build Frontend (Next.js) (push) Successful in 53s
Build and Push Docker Images / Build Pipeline (Meltano + dbt + Airflow) (push) Successful in 1m24s
Build and Push Docker Images / Trigger Portainer Update (push) Successful in 1s

Replaces computed means from our school dataset with the published DfE
national headline figures for the KS2 chart reference line.

- tap-uk-ees: new EESKs2NationalStream fetches the stable EES data-catalogue
  CSV (one row per year, England national total, AllSchools filter)
- dbt staging: stg_ees_ks2_national normalises columns, casts to float,
  filters to years >= 201617
- dbt mart: fact_ks2_national_averages — one row per year, official figures
- backend/models: Ks2NationalAverage SQLAlchemy model
- backend/app: /api/national-averages queries the mart for KS2 by_year;
  secondary by_year stays computed (no DfE KS4 national dataset yet)
- DAG: extract_ks2_national task added to school_data_annual_ees,
  runs in parallel with the main EES extract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Tudor Sitaru
2026-04-09 14:40:33 +01:00
parent a3cfffa4d0
commit dc66e22d4d
8 changed files with 236 additions and 12 deletions
+40 -5
View File
@@ -708,21 +708,56 @@ async def get_national_averages(request: Request):
# Secondary: schools where KS4 data is non-null # Secondary: schools where KS4 data is non-null
secondary_df = df_latest[df_latest["attainment_8_score"].notna()] secondary_df = df_latest[df_latest["attainment_8_score"].notna()]
# Per-year averages for every year in the dataset (used by chart reference lines) latest_primary = _means(primary_df, ks2_metrics)
latest_secondary = _means(secondary_df, ks4_metrics)
# Per-year KS2 primary averages: use official DfE figures from the mart table.
# Per-year KS4 secondary averages: computed from our dataset (no DfE dataset yet).
from .database import SessionLocal
from .models import Ks2NationalAverage
by_year = [] by_year = []
try:
db = SessionLocal()
nat_rows = db.query(Ks2NationalAverage).order_by(Ks2NationalAverage.year).all()
# Build a lookup of computed secondary averages per year as fallback
secondary_by_year = {}
for yr in sorted(df["year"].dropna().unique()): for yr in sorted(df["year"].dropna().unique()):
yr = int(yr) yr = int(yr)
df_yr = df[df["year"] == yr] df_yr = df[df["year"] == yr]
secondary_by_year[yr] = _means(
df_yr[df_yr["attainment_8_score"].notna()], ks4_metrics
)
# Merge: official KS2 figures + computed KS4 figures per year
ks2_years = {r.year for r in nat_rows}
all_years = sorted(ks2_years | set(secondary_by_year.keys()))
nat_lookup = {r.year: r for r in nat_rows}
for yr in all_years:
primary_yr: dict = {}
if yr in nat_lookup:
r = nat_lookup[yr]
for col in ks2_metrics:
val = getattr(r, col, None)
if val is not None:
primary_yr[col] = val
by_year.append({ by_year.append({
"year": yr, "year": yr,
"primary": _means(df_yr[df_yr["rwm_expected_pct"].notna()], ks2_metrics), "primary": primary_yr,
"secondary": _means(df_yr[df_yr["attainment_8_score"].notna()], ks4_metrics), "secondary": secondary_by_year.get(yr, {}),
}) })
finally:
db.close()
# Update latest_primary with official DfE figure for the latest year if available
if by_year:
latest_official = next((e["primary"] for e in reversed(by_year) if e["primary"]), None)
if latest_official:
latest_primary = latest_official
return { return {
"year": latest_year, "year": latest_year,
"primary": _means(primary_df, ks2_metrics), "primary": latest_primary,
"secondary": _means(secondary_df, ks4_metrics), "secondary": latest_secondary,
"by_year": by_year, "by_year": by_year,
} }
+22
View File
@@ -215,3 +215,25 @@ class FactFinance(Base):
teacher_cost_pct = Column(Float) teacher_cost_pct = Column(Float)
support_staff_cost_pct = Column(Float) support_staff_cost_pct = Column(Float)
premises_cost_pct = Column(Float) premises_cost_pct = Column(Float)
class Ks2NationalAverage(Base):
"""Official DfE KS2 national headline averages — one row per academic year."""
__tablename__ = "fact_ks2_national_averages"
__table_args__ = MARTS
year = Column(Integer, primary_key=True)
rwm_expected_pct = Column(Float)
rwm_high_pct = Column(Float)
reading_expected_pct = Column(Float)
reading_high_pct = Column(Float)
reading_avg_score = Column(Float)
writing_expected_pct = Column(Float)
writing_gd_pct = Column(Float)
maths_expected_pct = Column(Float)
maths_high_pct = Column(Float)
maths_avg_score = Column(Float)
gps_expected_pct = Column(Float)
gps_high_pct = Column(Float)
gps_avg_score = Column(Float)
science_expected_pct = Column(Float)
+6 -1
View File
@@ -137,10 +137,15 @@ with DAG(
task_id="extract_ees", task_id="extract_ees",
bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres", bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres",
) )
# KS2 national headlines run in parallel — small single-CSV download
extract_ks2_national = BashOperator(
task_id="extract_ks2_national",
bash_command=f"cd {PIPELINE_DIR} && {MELTANO_BIN} run tap-uk-ees target-postgres --select ees_ks2_national",
)
dbt_build_ees = BashOperator( dbt_build_ees = BashOperator(
task_id="dbt_build", task_id="dbt_build",
bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_legacy_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+", bash_command=f"cd {PIPELINE_DIR}/transform && {DBT_BIN} build --profiles-dir . --target production --select stg_ees_ks2+ stg_legacy_ks2+ stg_ees_ks4+ stg_ees_census+ stg_ees_admissions+ stg_ees_ks2_national+",
) )
sync_typesense_ees = BashOperator( sync_typesense_ees = BashOperator(
@@ -452,6 +452,99 @@ class EESAdmissionsStream(EESDatasetStream):
# on EES. Only national and LA-level files are published. # on EES. Only national and LA-level files are published.
# ── KS2 National Headlines (national level only — one row per year) ───────────
# Dataset: "Key stage 2 attainment: national headlines"
# URL: https://explore-education-statistics.service.gov.uk/data-catalogue/data-set/
# 58bb4b03-c6df-447f-bb7e-b82970c4d974/csv
# This is a stable data-catalogue CSV endpoint (not a versioned release ZIP).
# Covers 2015/16 → latest; COVID years (2019/20, 2020/21) are suppressed ('x').
_KS2_NATIONAL_CSV_URL = (
"https://explore-education-statistics.service.gov.uk/data-catalogue/"
"data-set/58bb4b03-c6df-447f-bb7e-b82970c4d974/csv"
)
_KS2_NATIONAL_COL_MAP = {
"pt_rwm_exp": "rwm_expected_pct",
"pt_rwm_high": "rwm_high_pct",
"pt_read_exp": "reading_expected_pct",
"pt_read_high": "reading_high_pct",
"pt_mat_exp": "maths_expected_pct",
"pt_mat_high": "maths_high_pct",
"pt_writta_exp": "writing_expected_pct",
"pt_writta_gd": "writing_gd_pct",
"pt_gps_exp": "gps_expected_pct",
"pt_gps_high": "gps_high_pct",
"pt_scita_exp": "science_expected_pct",
"avg_readscore": "reading_avg_score",
"avg_gpsscore": "gps_avg_score",
"avg_matscore": "maths_avg_score",
}
class EESKs2NationalStream(Stream):
"""National KS2 headline averages — one row per academic year.
Fetches the DfE EES data-catalogue CSV directly (stable URL, not versioned
release ZIP). Filters to geographic_level == 'National' and
school_type == 'AllSchools' so only the England-wide headline row per year
is emitted. COVID years (2019/20, 2020/21) are naturally absent (suppressed
with 'x' → treated as null downstream in dbt staging).
"""
name = "ees_ks2_national"
primary_keys = ["time_period"]
replication_key = None
schema = th.PropertiesList(
th.Property("time_period", th.StringType, required=True),
th.Property("rwm_expected_pct", th.StringType),
th.Property("rwm_high_pct", th.StringType),
th.Property("reading_expected_pct", th.StringType),
th.Property("reading_high_pct", th.StringType),
th.Property("maths_expected_pct", th.StringType),
th.Property("maths_high_pct", th.StringType),
th.Property("writing_expected_pct", th.StringType),
th.Property("writing_gd_pct", th.StringType),
th.Property("gps_expected_pct", th.StringType),
th.Property("gps_high_pct", th.StringType),
th.Property("science_expected_pct", th.StringType),
th.Property("reading_avg_score", th.StringType),
th.Property("gps_avg_score", th.StringType),
th.Property("maths_avg_score", th.StringType),
).to_dict()
def get_records(self, context):
import pandas as pd
self.logger.info("Downloading KS2 national headlines: %s", _KS2_NATIONAL_CSV_URL)
resp = requests.get(_KS2_NATIONAL_CSV_URL, timeout=60)
resp.raise_for_status()
df = pd.read_csv(
io.BytesIO(resp.content),
dtype=str,
keep_default_na=False,
)
# Normalise column names to lowercase
df.columns = [c.strip().lower() for c in df.columns]
# Keep only the England national headline row per year
if "geographic_level" in df.columns:
df = df[df["geographic_level"].str.strip().str.lower() == "national"]
if "school_type" in df.columns:
df = df[df["school_type"].str.strip().str.lower() == "allschools"]
self.logger.info("Emitting %d national KS2 rows", len(df))
for _, row in df.iterrows():
record = {"time_period": row.get("time_period", "").strip()}
for csv_col, field in _KS2_NATIONAL_COL_MAP.items():
record[field] = row.get(csv_col, "").strip()
yield record
# ── Legacy KS2 (pre-COVID wide format from DfE performance tables) ──────────── # ── Legacy KS2 (pre-COVID wide format from DfE performance tables) ────────────
# The DfE "Compare School Performance" site published school-level KS2 CSVs # The DfE "Compare School Performance" site published school-level KS2 CSVs
# in a wide format (one row per school, ~300 columns). EES only has school-level # in a wide format (one row per school, ~300 columns). EES only has school-level
@@ -629,6 +722,7 @@ class TapUKEES(Tap):
EESCensusStream(self), EESCensusStream(self),
EESAdmissionsStream(self), EESAdmissionsStream(self),
LegacyKS2Stream(self), LegacyKS2Stream(self),
EESKs2NationalStream(self),
] ]
@@ -111,6 +111,12 @@ models:
- name: urn - name: urn
tests: [not_null] tests: [not_null]
- name: fact_ks2_national_averages
description: Official DfE KS2 national headline averages — one row per academic year
columns:
- name: year
tests: [not_null, unique]
- name: fact_deprivation - name: fact_deprivation
description: IDACI deprivation index — one row per URN description: IDACI deprivation index — one row per URN
columns: columns:
@@ -0,0 +1,25 @@
{{ config(materialized='table') }}
-- Mart: Official DfE KS2 national headline averages — one row per academic year.
-- These are the published England-wide figures, not computed means from our school dataset.
-- Used by the /api/national-averages endpoint to provide accurate per-year reference lines
-- on the school history chart and for hero stat comparisons.
select
year,
rwm_expected_pct,
rwm_high_pct,
reading_expected_pct,
reading_high_pct,
reading_avg_score,
writing_expected_pct,
writing_gd_pct,
maths_expected_pct,
maths_high_pct,
maths_avg_score,
gps_expected_pct,
gps_high_pct,
gps_avg_score,
science_expected_pct
from {{ ref('stg_ees_ks2_national') }}
order by year
@@ -45,6 +45,9 @@ sources:
- name: ees_admissions - name: ees_admissions
description: Primary and secondary school admissions data description: Primary and secondary school admissions data
- name: ees_ks2_national
description: KS2 national headline averages from DfE EES data catalogue — one row per academic year
# Phonics: no school-level data on EES (only national/LA level) # Phonics: no school-level data on EES (only national/LA level)
- name: parent_view - name: parent_view
@@ -0,0 +1,34 @@
{{ config(materialized='table') }}
-- Staging model: DfE KS2 national headline averages
-- Source: EES data catalogue CSV (one row per academic year, England national total)
-- COVID years 2019/20 and 2020/21 are naturally absent — DfE did not publish figures
-- because national assessments were cancelled. Those years produce no rows here.
-- 'x' (not applicable) and suppressed values are coerced to NULL by safe_numeric.
select
cast(trim(time_period) as integer) as year,
{{ safe_numeric('rwm_expected_pct') }} as rwm_expected_pct,
{{ safe_numeric('rwm_high_pct') }} as rwm_high_pct,
{{ safe_numeric('reading_expected_pct') }} as reading_expected_pct,
{{ safe_numeric('reading_high_pct') }} as reading_high_pct,
{{ safe_numeric('reading_avg_score') }} as reading_avg_score,
{{ safe_numeric('writing_expected_pct') }} as writing_expected_pct,
{{ safe_numeric('writing_gd_pct') }} as writing_gd_pct,
{{ safe_numeric('maths_expected_pct') }} as maths_expected_pct,
{{ safe_numeric('maths_high_pct') }} as maths_high_pct,
{{ safe_numeric('maths_avg_score') }} as maths_avg_score,
{{ safe_numeric('gps_expected_pct') }} as gps_expected_pct,
{{ safe_numeric('gps_high_pct') }} as gps_high_pct,
{{ safe_numeric('gps_avg_score') }} as gps_avg_score,
{{ safe_numeric('science_expected_pct') }} as science_expected_pct
from {{ source('raw', 'ees_ks2_national') }}
where time_period ~ '^[0-9]+$'
and cast(trim(time_period) as integer) >= 201617