Incremental Snapshot Downloader Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Replace the date-stamped multi-file output with a single snapshots.html updated incrementally each daily run.

Architecture: Add cache/state I/O methods to SnapshotDownloader, change the output filename to a fixed snapshots.html, and rewrite download_snapshots to load an existing JSON cache, fetch only new snapshots since the last run, merge and deduplicate by id, then re-render the full HTML from the merged list.

Tech Stack: Python 3.13, aiohttp, aiofiles, pytest, unittest.mock
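The merge step named under Architecture can be sketched as a small pure function. This is only an illustration: merge_snapshots is a hypothetical helper, not a method the plan adds, and it assumes every snapshot is a dict whose "id" value is hashable. Unlike the inline merge in Task 3, it also drops duplicates that occur within the fetched list itself.

```python
from typing import Any, Dict, List


def merge_snapshots(
    existing: List[Dict[str, Any]],
    fetched: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return existing + fetched, keeping the first snapshot seen per "id"."""
    seen = set()
    merged = []
    for snapshot in existing + fetched:
        snapshot_id = snapshot.get("id")
        # Assumption: snapshots without an id are kept, never collapsed.
        if snapshot_id is not None:
            if snapshot_id in seen:
                continue
            seen.add(snapshot_id)
        merged.append(snapshot)
    return merged


cached = [{"id": "dup", "startTime": "2025-01-01T00:00:00Z"}]
fetched = [
    {"id": "dup", "startTime": "2025-01-01T00:00:00Z"},
    {"id": "fresh", "startTime": "2025-01-02T00:00:00Z"},
]
print([s["id"] for s in merge_snapshots(cached, fetched)])  # ['dup', 'fresh']
```

Cached entries win over refetched ones here, which matches the order used in Task 3 (existing snapshots first, then additions).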


File Map

| File | Change |
| --- | --- |
| src/snapshot_downloader.py | Add 4 I/O methods; modify generate_html_file and download_snapshots |
| tests/test_incremental_snapshot.py | New: unit tests for all new and modified behaviour |

Task 1: Cache and state file I/O methods

Files:

  • Modify: src/snapshot_downloader.py

  • Create: tests/test_incremental_snapshot.py

  • Step 1: Install pytest

pip install pytest

Expected: pytest installed (confirm with pytest --version).

  • Step 2: Create tests/test_incremental_snapshot.py with failing tests for all four I/O methods
import asyncio
import json
import pytest
from unittest.mock import AsyncMock, patch

from src.snapshot_downloader import SnapshotDownloader


def _downloader(tmp_path):
    return SnapshotDownloader(output_dir=str(tmp_path), api_key="test-key")


# --- load_snapshot_cache ---

def test_load_snapshot_cache_missing(tmp_path):
    assert _downloader(tmp_path).load_snapshot_cache() == []


def test_load_snapshot_cache_returns_data(tmp_path):
    d = _downloader(tmp_path)
    snapshots = [{"id": "1", "notes": "hello"}]
    (tmp_path / "snapshots_cache.json").write_text(json.dumps(snapshots))
    assert d.load_snapshot_cache() == snapshots


def test_load_snapshot_cache_malformed_returns_empty(tmp_path):
    d = _downloader(tmp_path)
    (tmp_path / "snapshots_cache.json").write_text("not json{{{")
    assert d.load_snapshot_cache() == []


def test_load_snapshot_cache_non_list_returns_empty(tmp_path):
    d = _downloader(tmp_path)
    (tmp_path / "snapshots_cache.json").write_text('{"key": "val"}')
    assert d.load_snapshot_cache() == []


# --- save_snapshot_cache ---

def test_save_snapshot_cache_writes_json(tmp_path):
    d = _downloader(tmp_path)
    snapshots = [{"id": "1"}, {"id": "2"}]
    d.save_snapshot_cache(snapshots)
    data = json.loads((tmp_path / "snapshots_cache.json").read_text())
    assert data == snapshots


# --- load_last_run_date ---

def test_load_last_run_date_missing(tmp_path):
    assert _downloader(tmp_path).load_last_run_date() is None


def test_load_last_run_date_returns_date(tmp_path):
    d = _downloader(tmp_path)
    (tmp_path / "last_run.json").write_text('{"last_date_to": "2025-01-01"}')
    assert d.load_last_run_date() == "2025-01-01"


def test_load_last_run_date_malformed_returns_none(tmp_path):
    d = _downloader(tmp_path)
    (tmp_path / "last_run.json").write_text("not json")
    assert d.load_last_run_date() is None


# --- save_last_run_date ---

def test_save_last_run_date_writes_json(tmp_path):
    d = _downloader(tmp_path)
    d.save_last_run_date("2025-06-01")
    data = json.loads((tmp_path / "last_run.json").read_text())
    assert data == {"last_date_to": "2025-06-01"}
  • Step 3: Run tests to confirm they fail
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v

Expected: all tests FAIL with AttributeError, each naming the missing method that the test calls first (for example 'SnapshotDownloader' object has no attribute 'load_snapshot_cache').

  • Step 4: Add the four I/O methods to SnapshotDownloader in src/snapshot_downloader.py

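Add after the download_media_file method (around line 540), before generate_html_file. The methods use json plus List, Dict, Any, and Optional from typing; import any of these that the module does not already import: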

def load_snapshot_cache(self) -> List[Dict[str, Any]]:
    cache_file = self.output_dir / "snapshots_cache.json"
    if not cache_file.exists():
        return []
    try:
        with open(cache_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except (json.JSONDecodeError, OSError):
        self.logger.warning("Could not read snapshot cache; starting fresh")
        return []

def save_snapshot_cache(self, snapshots: List[Dict[str, Any]]) -> None:
    cache_file = self.output_dir / "snapshots_cache.json"
    with open(cache_file, "w", encoding="utf-8") as f:
        json.dump(snapshots, f, indent=2, default=str)

def load_last_run_date(self) -> Optional[str]:
    state_file = self.output_dir / "last_run.json"
    if not state_file.exists():
        return None
    try:
        with open(state_file, "r", encoding="utf-8") as f:
            data = json.load(f)
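        # Guard against valid JSON that is not an object (e.g. a list).
        return data.get("last_date_to") if isinstance(data, dict) else None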
    except (json.JSONDecodeError, OSError):
        return None

def save_last_run_date(self, date: str) -> None:
    state_file = self.output_dir / "last_run.json"
    with open(state_file, "w", encoding="utf-8") as f:
        json.dump({"last_date_to": date}, f)
  • Step 5: Run tests to confirm they pass
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v

Expected: all tests PASS.

  • Step 6: Commit
git add src/snapshot_downloader.py tests/test_incremental_snapshot.py
git commit -m "feat: add snapshot cache and state file I/O methods"
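The save methods in Task 1 write directly to the target file, so a crash in the middle of a write could leave a truncated snapshots_cache.json or last_run.json. If that risk matters for the daily run, one option is to write through a temporary file and rename it into place. The sketch below is an assumption, not part of the plan: atomic_write_json is a hypothetical helper that the two save methods could call instead of open(..., "w").

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, payload: Any) -> None:
    """Write payload as JSON to path via a temp file in the same directory."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, default=str)
        # os.replace swaps the file in one step on the same filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the partial temp file and leave the old target untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "snapshots_cache.json"
    atomic_write_json(target, [{"id": "1"}])
    print(json.loads(target.read_text(encoding="utf-8")))
```

The temp file is created next to the target on purpose: a rename across filesystems would not be atomic.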

Task 2: Fixed output filename

Files:

  • Modify: src/snapshot_downloader.py:562

  • Modify: tests/test_incremental_snapshot.py

  • Step 1: Add a failing test for the fixed filename

Append to tests/test_incremental_snapshot.py:

# --- generate_html_file fixed filename ---

def test_generate_html_file_uses_fixed_filename(tmp_path):
    d = _downloader(tmp_path)
    with patch.object(d, "generate_html_template", new_callable=AsyncMock, return_value="<html></html>"):
        result = asyncio.run(d.generate_html_file([], "2024-01-01", "2025-01-01"))
    assert result.name == "snapshots.html"
    assert (tmp_path / "snapshots.html").exists()
  • Step 2: Run to confirm it fails
PYTHONPATH=. pytest tests/test_incremental_snapshot.py::test_generate_html_file_uses_fixed_filename -v

Expected: FAIL — the file is named snapshots_2024-01-01_to_2025-01-01.html, not snapshots.html.

  • Step 3: Change the filename in generate_html_file

In src/snapshot_downloader.py, find (around line 562):

        filename = f"snapshots_{date_from}_to_{date_to}.html"

Replace with:

        filename = "snapshots.html"
  • Step 4: Run tests to confirm they pass
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v

Expected: all tests PASS.

  • Step 5: Commit
git add src/snapshot_downloader.py tests/test_incremental_snapshot.py
git commit -m "feat: write snapshots to fixed filename snapshots.html"

Task 3: Incremental download_snapshots

Files:

  • Modify: src/snapshot_downloader.py:976-1036

  • Modify: tests/test_incremental_snapshot.py

  • Step 1: Add failing tests for the incremental orchestration

Append to tests/test_incremental_snapshot.py:

# --- incremental download_snapshots ---

def _run_download(d, **kwargs):
    """Run download_snapshots with mocked API calls."""
    new_snapshots = kwargs.pop("new_snapshots", [])
    mock_fetch = AsyncMock(return_value=new_snapshots)
    with patch.object(d, "authenticate", new_callable=AsyncMock):
        with patch.object(d, "fetch_all_snapshots", mock_fetch):
            with patch.object(d, "generate_html_file", new_callable=AsyncMock,
                              return_value=d.output_dir / "snapshots.html"):
                asyncio.run(d.download_snapshots(**kwargs))
    return mock_fetch


def test_first_run_saves_cache_and_state(tmp_path):
    d = _downloader(tmp_path)
    new_snapshots = [{"id": "abc", "startTime": "2025-01-15T10:00:00Z"}]
    _run_download(d, date_from="2024-01-01", new_snapshots=new_snapshots)

    assert d.load_snapshot_cache() == new_snapshots
    assert d.load_last_run_date() is not None


def test_subsequent_run_uses_last_run_date_as_fetch_from(tmp_path):
    d = _downloader(tmp_path)
    d.save_last_run_date("2025-03-01")
    d.save_snapshot_cache([{"id": "old", "startTime": "2025-02-01T00:00:00Z"}])

    new_snapshots = [{"id": "new", "startTime": "2025-03-15T00:00:00Z"}]
    mock_fetch = _run_download(d, date_from="2024-01-01", new_snapshots=new_snapshots)

    # Third positional arg to fetch_all_snapshots is date_from (after session, type_ids)
    assert mock_fetch.call_args.args[2] == "2025-03-01"

    ids = {s["id"] for s in d.load_snapshot_cache()}
    assert ids == {"old", "new"}


def test_deduplication_by_id(tmp_path):
    d = _downloader(tmp_path)
    d.save_last_run_date("2025-01-01")
    d.save_snapshot_cache([{"id": "dup", "startTime": "2025-01-01T00:00:00Z"}])

    # API returns the boundary snapshot again plus one new one
    new_snapshots = [
        {"id": "dup", "startTime": "2025-01-01T00:00:00Z"},
        {"id": "fresh", "startTime": "2025-01-02T00:00:00Z"},
    ]
    _run_download(d, date_from="2024-01-01", new_snapshots=new_snapshots)

    cache = d.load_snapshot_cache()
    ids = [s["id"] for s in cache]
    assert ids.count("dup") == 1
    assert "fresh" in ids


def test_fetch_failure_does_not_update_state(tmp_path):
    d = _downloader(tmp_path)
    d.save_last_run_date("2025-01-01")
    d.save_snapshot_cache([{"id": "existing"}])

    with patch.object(d, "authenticate", new_callable=AsyncMock):
        with patch.object(d, "fetch_all_snapshots", new_callable=AsyncMock,
                          side_effect=Exception("network error")):
            with pytest.raises(Exception, match="network error"):
                asyncio.run(d.download_snapshots(date_from="2024-01-01"))

    assert d.load_last_run_date() == "2025-01-01"
    assert d.load_snapshot_cache() == [{"id": "existing"}]
  • Step 2: Run to confirm they fail
PYTHONPATH=. pytest tests/test_incremental_snapshot.py::test_first_run_saves_cache_and_state tests/test_incremental_snapshot.py::test_subsequent_run_uses_last_run_date_as_fetch_from tests/test_incremental_snapshot.py::test_deduplication_by_id tests/test_incremental_snapshot.py::test_fetch_failure_does_not_update_state -v

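Expected: the tests FAIL because the method does not yet do incremental logic. One possible exception is test_fetch_failure_does_not_update_state, which can already pass if the existing method propagates fetch errors, since the old code never writes the cache or state file.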

  • Step 3: Replace download_snapshots in src/snapshot_downloader.py

Find the entire download_snapshots method (lines 976-1036) and replace it with:

    async def download_snapshots(
        self,
        type_ids: Optional[List[int]] = None,
        date_from: Optional[str] = None,
        date_to: Optional[str] = None,
        max_pages: Optional[int] = None,
    ) -> Optional[Path]:
        """
        Download new snapshots incrementally and regenerate snapshots.html.

        date_from is used only on the first run (no last_run.json).
        date_to is always today regardless of what is passed.
        Returns None when there are no snapshots to render.
        """
        if type_ids is None:
            type_ids = [15]  # avoid a shared mutable default argument
        date_to = datetime.now().strftime("%Y-%m-%d")

        # Determine fetch window start
        last_run_date = self.load_last_run_date()
        if last_run_date:
            fetch_from = last_run_date
            self.logger.info(f"Incremental run: fetching from {fetch_from}")
        else:
            if date_from is None:
                date_from = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
            fetch_from = date_from
            self.logger.info(f"First run: fetching all snapshots from {fetch_from}")

        self.logger.info(f"Fetch window: {fetch_from} to {date_to}")

        # Load accumulated snapshot data
        existing_snapshots = self.load_snapshot_cache()
        self.logger.info(f"Loaded {len(existing_snapshots)} snapshots from cache")

        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            # Authenticate if needed
            await self.authenticate()

            # Fetch only new snapshots
            new_snapshots = await self.fetch_all_snapshots(
                session, type_ids, fetch_from, date_to, max_pages
            )

        # Merge: deduplicate by id
        existing_ids = {s.get("id") for s in existing_snapshots}
        added = [s for s in new_snapshots if s.get("id") not in existing_ids]
        merged = existing_snapshots + added
        self.logger.info(f"Added {len(added)} new snapshots (total: {len(merged)})")

        if not merged:
            self.logger.warning("No snapshots found")
            return None

        # Persist updated cache and state
        self.save_snapshot_cache(merged)
        html_file = await self.generate_html_file(merged, date_from or fetch_from, date_to)
        self.save_last_run_date(date_to)

        self.print_statistics()
        return html_file
  • Step 4: Run all tests to confirm they pass
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v

Expected: all tests PASS.

  • Step 5: Commit
git add src/snapshot_downloader.py tests/test_incremental_snapshot.py
git commit -m "feat: incremental snapshot fetch with JSON cache and state file"
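The fetch-window rule in Task 3 (the last run date wins, date_from applies only on the first run, and the default is 365 days back) can be checked in isolation. The sketch below restates that rule as a pure function; resolve_fetch_window is a hypothetical name, and passing today in as a parameter is an assumption made so the result does not depend on the clock.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def resolve_fetch_window(
    last_run_date: Optional[str],
    date_from: Optional[str],
    today: date,
) -> Tuple[str, str]:
    """Return (fetch_from, date_to) following the rule in download_snapshots."""
    date_to = today.strftime("%Y-%m-%d")
    if last_run_date:
        # Incremental run: resume from the previous run's end date.
        return last_run_date, date_to
    if date_from is None:
        # First run with no explicit start: default to one year back.
        date_from = (today - timedelta(days=365)).strftime("%Y-%m-%d")
    return date_from, date_to


today = date(2025, 6, 1)
print(resolve_fetch_window("2025-03-01", "2024-01-01", today))
print(resolve_fetch_window(None, "2024-01-01", today))
print(resolve_fetch_window(None, None, today))
```

Because the window resumes from the previous date_to, the boundary day is fetched twice, which is why the merge deduplicates by id.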

Self-Review Checklist

  • Spec: all 4 I/O methods — covered in Task 1
  • Spec: fixed filename — covered in Task 2
  • Spec: incremental run logic (7 steps) — covered in Task 3
  • Spec: fetch failure leaves state unchanged — covered by test_fetch_failure_does_not_update_state
  • Spec: deduplication by id — covered by test_deduplication_by_id
  • Spec: ConfigSnapshotDownloader unchanged — no tasks touch it
  • Method names consistent across all tasks: load_snapshot_cache, save_snapshot_cache, load_last_run_date, save_last_run_date
  • fetch_all_snapshots call args order (session, type_ids, fetch_from, date_to, max_pages) matches existing signature at line 245
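The last checklist item depends on positional argument order, and so does the assertion on call_args.args[2] in Task 3. If that feels brittle, one alternative is to bind the recorded call to parameter names with inspect.signature. The sketch below is illustrative only: bind_call is a hypothetical helper, and the stand-in function assumes the parameter names of the real fetch_all_snapshots, which the plan does not show.

```python
import inspect
from typing import Any, Callable, Dict


def bind_call(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Map a recorded call's arguments onto func's parameter names."""
    return dict(inspect.signature(func).bind(*args, **kwargs).arguments)


# Stand-in with the argument order the plan states; not the real method,
# and the parameter names are assumed.
async def fetch_all_snapshots(session, type_ids, date_from, date_to, max_pages=None):
    return []


bound = bind_call(
    fetch_all_snapshots, "session", [15], "2025-03-01", "2025-06-01", None
)
print(bound["date_from"])  # 2025-03-01
```

In a test, the same helper could take mock_fetch.call_args.args and mock_fetch.call_args.kwargs, so the assertion holds whether the caller passes the dates positionally or by keyword.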