Files
parentzone_downloader/docs/superpowers/plans/2026-05-15-incremental-snapshot.md
T

422 lines
14 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Incremental Snapshot Downloader Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace the date-stamped multi-file output with a single `snapshots.html` updated incrementally each daily run.
**Architecture:** Add cache/state I/O methods to `SnapshotDownloader`, change the output filename to a fixed `snapshots.html`, and rewrite `download_snapshots` to load an existing JSON cache, fetch only new snapshots since the last run, merge and deduplicate by `id`, then re-render the full HTML from the merged list.
**Tech Stack:** Python 3.13, aiohttp, aiofiles, pytest, unittest.mock
---
## File Map
| File | Change |
|---|---|
| `src/snapshot_downloader.py` | Add 4 I/O methods; modify `generate_html_file` and `download_snapshots` |
| `tests/test_incremental_snapshot.py` | New — unit tests for all new and modified behaviour |
---
## Task 1: Cache and state file I/O methods
**Files:**
- Modify: `src/snapshot_downloader.py`
- Create: `tests/test_incremental_snapshot.py`
- [ ] **Step 1: Install pytest**
```bash
pip install pytest
```
Expected: pytest installed (confirm with `pytest --version`).
- [ ] **Step 2: Create `tests/test_incremental_snapshot.py` with failing tests for all four I/O methods**
```python
import asyncio
import json
import pytest
from unittest.mock import AsyncMock, patch
from src.snapshot_downloader import SnapshotDownloader
def _downloader(tmp_path):
return SnapshotDownloader(output_dir=str(tmp_path), api_key="test-key")
# --- load_snapshot_cache ---
def test_load_snapshot_cache_missing(tmp_path):
assert _downloader(tmp_path).load_snapshot_cache() == []
def test_load_snapshot_cache_returns_data(tmp_path):
d = _downloader(tmp_path)
snapshots = [{"id": "1", "notes": "hello"}]
(tmp_path / "snapshots_cache.json").write_text(json.dumps(snapshots))
assert d.load_snapshot_cache() == snapshots
def test_load_snapshot_cache_malformed_returns_empty(tmp_path):
d = _downloader(tmp_path)
(tmp_path / "snapshots_cache.json").write_text("not json{{{")
assert d.load_snapshot_cache() == []
def test_load_snapshot_cache_non_list_returns_empty(tmp_path):
d = _downloader(tmp_path)
(tmp_path / "snapshots_cache.json").write_text('{"key": "val"}')
assert d.load_snapshot_cache() == []
# --- save_snapshot_cache ---
def test_save_snapshot_cache_writes_json(tmp_path):
d = _downloader(tmp_path)
snapshots = [{"id": "1"}, {"id": "2"}]
d.save_snapshot_cache(snapshots)
data = json.loads((tmp_path / "snapshots_cache.json").read_text())
assert data == snapshots
# --- load_last_run_date ---
def test_load_last_run_date_missing(tmp_path):
assert _downloader(tmp_path).load_last_run_date() is None
def test_load_last_run_date_returns_date(tmp_path):
d = _downloader(tmp_path)
(tmp_path / "last_run.json").write_text('{"last_date_to": "2025-01-01"}')
assert d.load_last_run_date() == "2025-01-01"
def test_load_last_run_date_malformed_returns_none(tmp_path):
d = _downloader(tmp_path)
(tmp_path / "last_run.json").write_text("not json")
assert d.load_last_run_date() is None
# --- save_last_run_date ---
def test_save_last_run_date_writes_json(tmp_path):
d = _downloader(tmp_path)
d.save_last_run_date("2025-06-01")
data = json.loads((tmp_path / "last_run.json").read_text())
assert data == {"last_date_to": "2025-06-01"}
```
- [ ] **Step 3: Run tests to confirm they fail**
```bash
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v
```
Expected: all tests FAIL with `AttributeError: 'SnapshotDownloader' object has no attribute 'load_snapshot_cache'`.
- [ ] **Step 4: Add the four I/O methods to `SnapshotDownloader` in `src/snapshot_downloader.py`**
Add after the `download_media_file` method (around line 540), before `generate_html_file`:
```python
def load_snapshot_cache(self) -> List[Dict[str, Any]]:
cache_file = self.output_dir / "snapshots_cache.json"
if not cache_file.exists():
return []
try:
with open(cache_file, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except (json.JSONDecodeError, OSError):
self.logger.warning("Could not read snapshot cache; starting fresh")
return []
def save_snapshot_cache(self, snapshots: List[Dict[str, Any]]) -> None:
cache_file = self.output_dir / "snapshots_cache.json"
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(snapshots, f, indent=2, default=str)
def load_last_run_date(self) -> Optional[str]:
state_file = self.output_dir / "last_run.json"
if not state_file.exists():
return None
try:
with open(state_file, "r", encoding="utf-8") as f:
data = json.load(f)
return data.get("last_date_to")
except (json.JSONDecodeError, OSError):
return None
def save_last_run_date(self, date: str) -> None:
state_file = self.output_dir / "last_run.json"
with open(state_file, "w", encoding="utf-8") as f:
json.dump({"last_date_to": date}, f)
```
- [ ] **Step 5: Run tests to confirm they pass**
```bash
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v
```
Expected: all tests PASS.
- [ ] **Step 6: Commit**
```bash
git add src/snapshot_downloader.py tests/test_incremental_snapshot.py
git commit -m "feat: add snapshot cache and state file I/O methods"
```
---
## Task 2: Fixed output filename
**Files:**
- Modify: `src/snapshot_downloader.py:562`
- Modify: `tests/test_incremental_snapshot.py`
- [ ] **Step 1: Add a failing test for the fixed filename**
Append to `tests/test_incremental_snapshot.py`:
```python
# --- generate_html_file fixed filename ---
def test_generate_html_file_uses_fixed_filename(tmp_path):
d = _downloader(tmp_path)
with patch.object(d, "generate_html_template", new_callable=AsyncMock, return_value="<html></html>"):
result = asyncio.run(d.generate_html_file([], "2024-01-01", "2025-01-01"))
assert result.name == "snapshots.html"
assert (tmp_path / "snapshots.html").exists()
```
- [ ] **Step 2: Run to confirm it fails**
```bash
PYTHONPATH=. pytest tests/test_incremental_snapshot.py::test_generate_html_file_uses_fixed_filename -v
```
Expected: FAIL — the file is named `snapshots_2024-01-01_to_2025-01-01.html`, not `snapshots.html`.
- [ ] **Step 3: Change the filename in `generate_html_file`**
In `src/snapshot_downloader.py`, find (around line 562):
```python
filename = f"snapshots_{date_from}_to_{date_to}.html"
```
Replace with:
```python
filename = "snapshots.html"
```
- [ ] **Step 4: Run tests to confirm they pass**
```bash
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v
```
Expected: all tests PASS.
- [ ] **Step 5: Commit**
```bash
git add src/snapshot_downloader.py tests/test_incremental_snapshot.py
git commit -m "feat: write snapshots to fixed filename snapshots.html"
```
---
## Task 3: Incremental `download_snapshots`
**Files:**
- Modify: `src/snapshot_downloader.py:9761036`
- Modify: `tests/test_incremental_snapshot.py`
- [ ] **Step 1: Add failing tests for the incremental orchestration**
Append to `tests/test_incremental_snapshot.py`:
```python
# --- incremental download_snapshots ---
def _run_download(d, **kwargs):
"""Run download_snapshots with mocked API calls."""
new_snapshots = kwargs.pop("new_snapshots", [])
mock_fetch = AsyncMock(return_value=new_snapshots)
with patch.object(d, "authenticate", new_callable=AsyncMock):
with patch.object(d, "fetch_all_snapshots", mock_fetch):
with patch.object(d, "generate_html_file", new_callable=AsyncMock,
return_value=d.output_dir / "snapshots.html"):
asyncio.run(d.download_snapshots(**kwargs))
return mock_fetch
def test_first_run_saves_cache_and_state(tmp_path):
d = _downloader(tmp_path)
new_snapshots = [{"id": "abc", "startTime": "2025-01-15T10:00:00Z"}]
_run_download(d, date_from="2024-01-01", new_snapshots=new_snapshots)
assert d.load_snapshot_cache() == new_snapshots
assert d.load_last_run_date() is not None
def test_subsequent_run_uses_last_run_date_as_fetch_from(tmp_path):
d = _downloader(tmp_path)
d.save_last_run_date("2025-03-01")
d.save_snapshot_cache([{"id": "old", "startTime": "2025-02-01T00:00:00Z"}])
new_snapshots = [{"id": "new", "startTime": "2025-03-15T00:00:00Z"}]
mock_fetch = _run_download(d, date_from="2024-01-01", new_snapshots=new_snapshots)
# Third positional arg to fetch_all_snapshots is date_from (after session, type_ids)
assert mock_fetch.call_args.args[2] == "2025-03-01"
ids = {s["id"] for s in d.load_snapshot_cache()}
assert ids == {"old", "new"}
def test_deduplication_by_id(tmp_path):
d = _downloader(tmp_path)
d.save_last_run_date("2025-01-01")
d.save_snapshot_cache([{"id": "dup", "startTime": "2025-01-01T00:00:00Z"}])
# API returns the boundary snapshot again plus one new one
new_snapshots = [
{"id": "dup", "startTime": "2025-01-01T00:00:00Z"},
{"id": "fresh", "startTime": "2025-01-02T00:00:00Z"},
]
_run_download(d, date_from="2024-01-01", new_snapshots=new_snapshots)
cache = d.load_snapshot_cache()
ids = [s["id"] for s in cache]
assert ids.count("dup") == 1
assert "fresh" in ids
def test_fetch_failure_does_not_update_state(tmp_path):
d = _downloader(tmp_path)
d.save_last_run_date("2025-01-01")
d.save_snapshot_cache([{"id": "existing"}])
with patch.object(d, "authenticate", new_callable=AsyncMock):
with patch.object(d, "fetch_all_snapshots", new_callable=AsyncMock,
side_effect=Exception("network error")):
with pytest.raises(Exception, match="network error"):
asyncio.run(d.download_snapshots(date_from="2024-01-01"))
assert d.load_last_run_date() == "2025-01-01"
assert d.load_snapshot_cache() == [{"id": "existing"}]
```
- [ ] **Step 2: Run to confirm they fail**
```bash
PYTHONPATH=. pytest tests/test_incremental_snapshot.py::test_first_run_saves_cache_and_state tests/test_incremental_snapshot.py::test_subsequent_run_uses_last_run_date_as_fetch_from tests/test_incremental_snapshot.py::test_deduplication_by_id tests/test_incremental_snapshot.py::test_fetch_failure_does_not_update_state -v
```
Expected: all four FAIL (the method does not yet do incremental logic).
- [ ] **Step 3: Replace `download_snapshots` in `src/snapshot_downloader.py`**
Find the entire `download_snapshots` method (lines 9761036) and replace it with:
```python
async def download_snapshots(
self,
type_ids: List[int] = [15],
date_from: str = None,
date_to: str = None,
max_pages: int = None,
) -> Path:
"""
Download new snapshots incrementally and regenerate snapshots.html.
date_from is used only on the first run (no last_run.json).
date_to is always today regardless of what is passed.
"""
date_to = datetime.now().strftime("%Y-%m-%d")
# Determine fetch window start
last_run_date = self.load_last_run_date()
if last_run_date:
fetch_from = last_run_date
self.logger.info(f"Incremental run: fetching from {fetch_from}")
else:
if date_from is None:
date_from = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
fetch_from = date_from
self.logger.info(f"First run: fetching all snapshots from {fetch_from}")
self.logger.info(f"Fetch window: {fetch_from} to {date_to}")
# Load accumulated snapshot data
existing_snapshots = self.load_snapshot_cache()
self.logger.info(f"Loaded {len(existing_snapshots)} snapshots from cache")
connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
# Authenticate if needed
await self.authenticate()
# Fetch only new snapshots
new_snapshots = await self.fetch_all_snapshots(
session, type_ids, fetch_from, date_to, max_pages
)
# Merge: deduplicate by id
existing_ids = {s.get("id") for s in existing_snapshots}
added = [s for s in new_snapshots if s.get("id") not in existing_ids]
merged = existing_snapshots + added
self.logger.info(f"Added {len(added)} new snapshots (total: {len(merged)})")
if not merged:
self.logger.warning("No snapshots found")
return None
# Persist updated cache and state
self.save_snapshot_cache(merged)
html_file = await self.generate_html_file(merged, date_from or fetch_from, date_to)
self.save_last_run_date(date_to)
self.print_statistics()
return html_file
```
- [ ] **Step 4: Run all tests to confirm they pass**
```bash
PYTHONPATH=. pytest tests/test_incremental_snapshot.py -v
```
Expected: all tests PASS.
- [ ] **Step 5: Commit**
```bash
git add src/snapshot_downloader.py tests/test_incremental_snapshot.py
git commit -m "feat: incremental snapshot fetch with JSON cache and state file"
```
---
## Self-Review Checklist
- Spec: all 4 I/O methods — covered in Task 1
- Spec: fixed filename — covered in Task 2
- Spec: incremental run logic (7 steps) — covered in Task 3
- Spec: fetch failure leaves state unchanged — covered by `test_fetch_failure_does_not_update_state`
- Spec: deduplication by `id` — covered by `test_deduplication_by_id`
- Spec: `ConfigSnapshotDownloader` unchanged — no tasks touch it
- Method names consistent across all tasks: `load_snapshot_cache`, `save_snapshot_cache`, `load_last_run_date`, `save_last_run_date`
- `fetch_all_snapshots` call args order `(session, type_ids, fetch_from, date_to, max_pages)` matches existing signature at line 245