1246 lines
40 KiB
Python
1246 lines
40 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Snapshot Downloader for ParentZone
|
||
|
||
This script downloads snapshots (daily events) from the ParentZone API with pagination support
|
||
and generates a comprehensive markup file containing all the snapshot information.
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import html
|
||
import json
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional
|
||
from urllib.parse import urlencode
|
||
|
||
import aiofiles
|
||
import aiohttp
|
||
|
||
# Import the auth manager
|
||
try:
|
||
from src.auth_manager import AuthManager
|
||
except ImportError:
|
||
AuthManager = None
|
||
|
||
|
||
class SnapshotDownloader:
|
||
def __init__(
    self,
    api_url: str = "https://api.parentzone.me",
    output_dir: str = "snapshots",
    api_key: str = None,
    email: str = None,
    password: str = None,
    debug_mode: bool = False,
):
    """
    Build a downloader bound to one API host and one output folder.

    Creates the output folder and its "assets" subfolder on disk, stores
    the credentials, configures logging and prepares the default request
    headers and the run statistics.

    Args:
        api_url: Base URL of the API (a trailing "/" is removed)
        output_dir: Directory to save the snapshot files
        api_key: API key for authentication
        email: Email for login authentication
        password: Password for login authentication
        debug_mode: Enable detailed server response logging
    """
    # Where to talk to
    self.api_url = api_url.rstrip("/")
    self.snapshots_endpoint = "/v1/posts"

    # Where to write: the report folder and the media subfolder
    self.output_dir = Path(output_dir)
    self.assets_dir = self.output_dir / "assets"
    for directory in (self.output_dir, self.assets_dir):
        directory.mkdir(parents=True, exist_ok=True)

    # Who we are
    self.api_key = api_key
    self.email = email
    self.password = password
    self.auth_manager = None
    self.debug_mode = debug_mode

    # Logging writes into output_dir, so it must come after the mkdir above
    self.setup_logging()

    # Browser-like request headers (taken from a captured curl command)
    self.headers = dict(
        [
            ("accept", "application/json, text/plain, */*"),
            ("accept-language", "en-GB,en-US;q=0.9,en;q=0.8,ro;q=0.7"),
            ("origin", "https://parentzone.me"),
            ("priority", "u=1, i"),
            (
                "sec-ch-ua",
                '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
            ),
            ("sec-ch-ua-mobile", "?0"),
            ("sec-ch-ua-platform", '"macOS"'),
            ("sec-fetch-dest", "empty"),
            ("sec-fetch-mode", "cors"),
            ("sec-fetch-site", "same-site"),
            (
                "user-agent",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
            ),
            ("x-client-version", "3.54.0"),
        ]
    )

    # Counters reported by print_statistics()
    self.stats = dict.fromkeys(
        ("total_snapshots", "pages_fetched", "failed_requests", "generated_files"),
        0,
    )
|
||
|
||
def setup_logging(self):
    """
    Configure root logging (file + console) and create ``self.logger``.

    The log file is ``snapshots.log`` inside ``self.output_dir``.

    ``logging.basicConfig`` does nothing when the root logger already has
    handlers, so the handlers are only built in the case where they will
    actually be installed. Building them unconditionally would open (and
    never close) a file handle every time a second downloader is created
    or the host application has already configured logging.
    """
    log_file = self.output_dir / "snapshots.log"

    if not logging.getLogger().handlers:
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                # Explicit UTF-8: log messages contain media file names,
                # which may not be representable in the platform default.
                logging.FileHandler(log_file, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )

    self.logger = logging.getLogger(__name__)
|
||
|
||
async def authenticate(self):
    """
    Log in with email/password when both are set and AuthManager is available.

    On success the "x-api-key" header returned by the auth manager (if any)
    becomes ``self.api_key``. Does nothing when credentials are missing.

    Raises:
        Exception: if the login attempt is rejected
    """
    if not (self.email and self.password and AuthManager):
        return

    self.logger.info("Attempting login authentication...")
    self.auth_manager = AuthManager(self.api_url)
    logged_in = await self.auth_manager.login(self.email, self.password)

    if not logged_in:
        self.logger.error("Login authentication failed")
        raise Exception("Login authentication failed")

    self.logger.info("Login authentication successful")
    # Adopt the API key handed out by the auth manager
    issued_headers = self.auth_manager.get_auth_headers()
    if "x-api-key" in issued_headers:
        self.api_key = issued_headers["x-api-key"]
|
||
|
||
def get_auth_headers(self) -> Dict[str, str]:
    """
    Return a copy of the default headers with authentication added.

    An explicit API key wins; otherwise the headers of an authenticated
    auth manager are merged in. ``self.headers`` itself is never modified.
    """
    merged = dict(self.headers)

    if self.api_key:
        merged["x-api-key"] = self.api_key
        return merged

    if self.auth_manager and self.auth_manager.is_authenticated():
        merged.update(self.auth_manager.get_auth_headers())

    return merged
|
||
|
||
async def fetch_snapshots_page(
    self,
    session: aiohttp.ClientSession,
    type_ids: list[int] = [15],
    date_from: str = "2021-10-18",
    date_to: str = "",
    cursor: str = None,
) -> dict[str, Any]:
    """
    Fetch a single page of snapshots from the API using cursor-based pagination.

    Args:
        session: aiohttp session
        type_ids: Type IDs to filter by; every ID is sent as its own
            ``typeIDs[]=<id>`` query parameter (the list is not modified)
        date_from: Start date in YYYY-MM-DD format
        date_to: End date in YYYY-MM-DD format; empty or None means today
        cursor: Cursor for pagination (None for first page)

    Returns:
        Dictionary containing the API response (snapshots are under "posts")

    Raises:
        Exception: any request/decoding error is logged, counted in
            ``stats["failed_requests"]`` and re-raised
    """
    # Both "" and None mean "no end date given". Without this, None would
    # be serialised into the query string as the literal text "None".
    if not date_to:
        date_to = datetime.now().strftime("%Y-%m-%d")

    # Build query parameters
    params = {
        "dateFrom": date_from,
        "dateTo": date_to,
    }

    # Add cursor for pagination (skip for first request)
    if cursor:
        params["cursor"] = cursor

    # API expects typeIDs[]=15&typeIDs[]=16. Passing the whole list lets
    # urlencode(doseq=True) repeat the key; assigning the IDs one by one to
    # the same key would keep only the last ID.
    if type_ids:
        params["typeIDs[]"] = list(type_ids)

    query_string = urlencode(params, doseq=True)
    url = f"{self.api_url}{self.snapshots_endpoint}?{query_string}"

    # Short label used in every log/debug line for this page
    page_info = f"cursor: {cursor[:20]}..." if cursor else "first page"
    self.logger.info(f"Fetching snapshots ({page_info}): {url}")

    headers = self.get_auth_headers()

    try:
        async with session.get(url, headers=headers, timeout=30) as response:
            response.raise_for_status()
            data = await response.json()

            # Print detailed response information for debugging if enabled
            if self.debug_mode:
                print(f"\n=== SERVER RESPONSE DEBUG ({page_info}) ===")
                print(f"Status Code: {response.status}")
                print(f"Headers: {dict(response.headers)}")
                print(f"Response Type: {type(data)}")
                print(
                    f"Response Keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}"
                )
                print(f"Posts count: {len(data.get('posts', []))}")
                print(f"Cursor: {data.get('cursor', 'None')}")
                # Only print full data if few posts
                if len(data.get("posts", [])) <= 3:
                    print("Full Response Data:")
                    print(json.dumps(data, indent=2, default=str))
                print("=" * 50)

            # The API returns snapshots in 'posts' field
            snapshots = data.get("posts", [])

            self.logger.info(f"Retrieved {len(snapshots)} snapshots ({page_info})")
            self.stats["pages_fetched"] += 1

            # Return the actual API response format
            return data

    except Exception as e:
        self.logger.error(f"Failed to fetch snapshots ({page_info}): {e}")
        self.stats["failed_requests"] += 1
        raise
|
||
|
||
async def fetch_all_snapshots(
    self,
    session: aiohttp.ClientSession,
    type_ids: List[int] = [15],
    date_from: str = "2021-10-18",
    date_to: str = None,
    max_pages: int = None,
) -> List[Dict[str, Any]]:
    """
    Fetch all snapshots across all pages using cursor-based pagination.

    Pagination stops when a page has no posts, when no cursor is returned,
    when the server returns the same cursor it was just given, when
    ``max_pages`` is reached, or when a page request fails. In the failure
    case the error is logged and the snapshots collected so far are returned.

    Args:
        session: aiohttp session
        type_ids: List of type IDs to filter by (not modified)
        date_from: Start date in YYYY-MM-DD format
        date_to: End date in YYYY-MM-DD format
        max_pages: Maximum number of pages to fetch (for testing)

    Returns:
        List of all snapshot dictionaries
    """
    all_snapshots = []
    cursor = None
    page_count = 0

    self.logger.info(
        f"Starting snapshot fetch from {date_from} to {date_to or 'now'}"
    )

    while True:
        page_count += 1

        if max_pages and page_count > max_pages:
            self.logger.info(f"Reached maximum pages limit: {max_pages}")
            break

        try:
            response = await self.fetch_snapshots_page(
                session, type_ids, date_from, date_to, cursor
            )
        except Exception as e:
            # Best effort: keep what was fetched before the failure
            self.logger.error(f"Error fetching page {page_count}: {e}")
            break

        # Extract snapshots from response
        snapshots = response.get("posts", [])
        new_cursor = response.get("cursor")

        if not snapshots:
            self.logger.info("No more snapshots found (empty posts array)")
            break

        all_snapshots.extend(snapshots)
        self.stats["total_snapshots"] += len(snapshots)

        self.logger.info(
            f"Page {page_count}: {len(snapshots)} snapshots (total: {len(all_snapshots)})"
        )

        # If no cursor returned, we've reached the end
        if not new_cursor:
            self.logger.info("Reached last page (no cursor returned)")
            break

        # A cursor that does not advance would request the same page
        # forever and keep appending duplicates.
        if new_cursor == cursor:
            self.logger.warning("Cursor did not advance; stopping pagination")
            break

        # Update cursor for next iteration
        cursor = new_cursor

    self.logger.info(f"Total snapshots fetched: {len(all_snapshots)}")
    return all_snapshots
|
||
|
||
async def format_snapshot_html(
    self, snapshot: Dict[str, Any], session: aiohttp.ClientSession
) -> str:
    """
    Format a single snapshot as HTML.

    Every value taken from the API is HTML-escaped before it is placed in
    the markup, with one deliberate exception: the "notes" field is inserted
    as-is because it carries the snapshot's own HTML formatting.

    Args:
        snapshot: Snapshot dictionary from API
        session: aiohttp session, passed on to the media formatter

    Returns:
        HTML string for the snapshot (surrounding whitespace stripped)
    """
    # Extract key information from ParentZone snapshot format
    snapshot_id = html.escape(str(snapshot.get("id", "unknown")))
    content = snapshot.get("notes", "")  # Don't escape HTML in notes field
    start_time = snapshot.get("startTime", "")
    snapshot_type = html.escape(str(snapshot.get("type", "Snapshot")))

    # Format dates
    start_date = html.escape(
        str(self.format_date(start_time)) if start_time else "Unknown"
    )

    # Author: a missing/null author and null name parts are treated as blank
    author = snapshot.get("author") or {}
    author_forename = author.get("forename") or ""
    author_surname = author.get("surname") or ""
    author_name = html.escape(f"{author_forename} {author_surname}".strip())

    # Child (if any)
    child = snapshot.get("child") or {}
    child_forename = child.get("forename") or ""
    child_surname = child.get("surname") or ""
    child_name = html.escape(f"{child_forename} {child_surname}".strip())

    # Create title in format: "Child Forename by Author Forename Surname"
    if child_forename and author_forename:
        title = html.escape(
            f"{child_forename} by {author_forename} {author_surname}".strip()
        )
    else:
        title = f"Snapshot {snapshot_id}"  # snapshot_id is already escaped

    # Extract location/activity information
    activity = snapshot.get("activity") or {}
    activity_name = html.escape(str(activity.get("name") or ""))

    # Optional rows are left out entirely when there is nothing to show
    signed_label = "✓ Signed" if snapshot.get("signed", False) else "⏳ Pending"
    author_row = (
        f'<div class="snapshot-author">👤 Author: {author_name}</div>'
        if author_name
        else ""
    )
    child_row = (
        f'<div class="snapshot-child">👶 Child: {child_name}</div>'
        if child_name
        else ""
    )
    activity_row = (
        f'<div class="snapshot-activity">🎯 Activity: {activity_name}</div>'
        if activity_name
        else ""
    )
    notes_html = content if content else "<em>No description provided</em>"

    media_html = await self.format_snapshot_media(snapshot, session)
    metadata_html = self.format_snapshot_metadata(snapshot)

    # Build HTML
    html_content = f"""
<div class="snapshot" id="snapshot-{snapshot_id}">
<div class="snapshot-header">
<h3 class="snapshot-title">{title}</h3>
<div class="snapshot-meta">
<span class="snapshot-id">ID: {snapshot_id}</span>
<span class="snapshot-type">Type: {snapshot_type}</span>
<span class="snapshot-date">Date: {start_date}</span>
<span class="snapshot-signed">{signed_label}</span>
</div>
</div>

<div class="snapshot-content">
{author_row}
{child_row}
{activity_row}

<div class="snapshot-description">
<div class="notes-content">{notes_html}</div>
</div>

{media_html}
{metadata_html}
</div>
</div>
"""

    return html_content.strip()
|
||
|
||
async def format_snapshot_media(
    self, snapshot: Dict[str, Any], session: aiohttp.ClientSession
) -> str:
    """
    Format media attachments for a snapshot.

    Each media entry is downloaded via ``download_media_file``. Entries of
    type "image" are rendered as an image grid, all others as a list of
    attachment links. When a download fails, the entry links to the API URL
    instead; an image without an id is skipped and an attachment without an
    id is listed as plain text.

    Args:
        snapshot: Snapshot dictionary from API; "media" may be absent or null
        session: aiohttp session used for the downloads

    Returns:
        HTML fragment, or "" when the snapshot has no media
    """
    # Local import: the module only imports urlencode from urllib.parse.
    from urllib.parse import quote

    def as_attr(url: str) -> str:
        # Value safe to place inside a double-quoted HTML attribute
        return html.escape(url, quote=True)

    def online_url(item: Dict[str, Any]) -> str:
        # API location of the full-size file, or "" when the item has no id
        item_id = item.get("id")
        if not item_id:
            return ""
        return f"{self.api_url}/v1/media/{quote(str(item_id), safe='')}/full"

    media = snapshot.get("media") or []
    parts = []

    # Images
    images = [m for m in media if m.get("type") == "image"]
    if images:
        parts.append('<div class="snapshot-images">\n')
        parts.append("<h4>📸 Images:</h4>\n")
        parts.append('<div class="image-grid">\n')

        for image in images:
            local_path = await self.download_media_file(session, image)
            image_name = html.escape(str(image.get("fileName") or "Image"))

            if local_path:
                # File names may contain '#', '%' or spaces, which would
                # otherwise be read as URL syntax and break the reference.
                src, caption_suffix = quote(local_path), ""
            else:
                # Fallback to API URL if download failed
                src, caption_suffix = online_url(image), " (online)"
                if not src:
                    continue

            updated = html.escape(str(self.format_date(image.get("updated", ""))))
            parts.append('<div class="image-item">\n')
            parts.append(
                f' <img src="{as_attr(src)}" alt="{image_name}" loading="lazy">\n'
            )
            parts.append(
                f' <p class="image-caption">{image_name}{caption_suffix}</p>\n'
            )
            parts.append(f' <p class="image-meta">Updated: {updated}</p>\n')
            parts.append("</div>\n")

        parts.append("</div>\n</div>\n")

    # Everything that is not an image is listed as an attachment
    attachments = [m for m in media if m.get("type") != "image"]
    if attachments:
        parts.append('<div class="snapshot-attachments">\n')
        parts.append("<h4>📎 Attachments:</h4>\n")
        parts.append('<ul class="attachment-list">\n')

        for attachment in attachments:
            local_path = await self.download_media_file(session, attachment)
            attachment_name = html.escape(
                str(attachment.get("fileName") or "Attachment")
            )
            attachment_type = html.escape(
                str(attachment.get("mimeType") or "unknown")
            )
            label = f"{attachment_name} ({attachment_type})"

            if local_path:
                href = as_attr(quote(local_path))
                parts.append(
                    f' <li><a href="{href}" target="_blank">{label}</a></li>\n'
                )
                continue

            # Fallback to API URL if download failed
            attachment_url = online_url(attachment)
            if attachment_url:
                href = as_attr(attachment_url)
                parts.append(
                    f' <li><a href="{href}" target="_blank">{label} - online</a></li>\n'
                )
            else:
                parts.append(f" <li>{label}</li>\n")

        parts.append("</ul>\n</div>\n")

    return "".join(parts)
|
||
|
||
def format_snapshot_metadata(self, snapshot: Dict[str, Any]) -> str:
    """
    Render the "Additional Information" box for one snapshot.

    Shows the code, framework indicator count, signed status and type when
    they hold a truthy value (list values are comma-joined), followed by a
    collapsed <details> element with the escaped raw JSON of the snapshot.
    """
    labelled_fields = (
        ("code", "Code"),
        ("frameworkIndicatorCount", "Framework Indicators"),
        ("signed", "Signed Status"),
        ("type", "Type"),
    )

    pieces = [
        '<div class="snapshot-metadata">\n',
        "<h4>ℹ️ Additional Information:</h4>\n",
        '<div class="metadata-grid">\n',
    ]

    # One grid item per field that actually carries a value
    for key, caption in labelled_fields:
        raw = snapshot.get(key)
        if not raw:
            continue
        shown = ", ".join(str(v) for v in raw) if isinstance(raw, list) else raw
        pieces.extend(
            (
                '<div class="metadata-item">\n',
                f" <strong>{caption}:</strong> {html.escape(str(shown))}\n",
                "</div>\n",
            )
        )

    # Raw JSON data (collapsed by default)
    raw_json = html.escape(json.dumps(snapshot, indent=2, default=str))
    pieces.extend(
        (
            '<details class="raw-data">\n',
            "<summary>🔍 Raw JSON Data</summary>\n",
            '<pre class="json-data">\n',
            raw_json,
            "\n</pre>\n",
            "</details>\n",
            "</div>\n</div>\n",
        )
    )

    return "".join(pieces)
|
||
|
||
def format_date(self, date_string: str) -> str:
    """
    Format an ISO 8601 date string as "YYYY-MM-DD HH:MM:SS" for display.

    A trailing "Z" is accepted as UTC. Input that cannot be parsed
    (including "" and None) is returned unchanged.
    """
    try:
        # Try to parse ISO format date
        dt = datetime.fromisoformat(date_string.replace("Z", "+00:00"))
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except (AttributeError, TypeError, ValueError):
        # Only the errors a bad value can cause are handled here, so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return date_string
|
||
|
||
async def download_media_file(
    self, session: aiohttp.ClientSession, media: Dict[str, Any]
) -> Optional[str]:
    """
    Download a media file to the assets folder.

    The file is first written to a temporary "<name>.part" file and only
    renamed to its final name once the download has completed. An
    interrupted download therefore never leaves a truncated file that a
    later run would mistake for a finished one.

    Args:
        session: aiohttp session for making requests
        media: Media dictionary from API ("id" required; "fileName" optional)

    Returns:
        Relative path to downloaded file, or None if download failed
    """
    media_id = media.get("id")
    if not media_id:
        return None

    # A missing or null fileName falls back to a name derived from the id
    filename = self._sanitize_filename(media.get("fileName") or f"media_{media_id}")

    # NOTE(review): files are keyed by name only, so two media items with
    # the same fileName share one file on disk - confirm whether names are
    # unique before relying on this.
    filepath = self.assets_dir / filename
    relative_path = f"assets/{filename}"  # path as referenced from the HTML

    # Check if file already exists
    if filepath.exists():
        return relative_path

    # Construct download URL
    download_url = f"{self.api_url}/v1/media/{media_id}/full"
    partial_path = filepath.with_name(filename + ".part")

    try:
        self.logger.info(f"Downloading media file: {filename}")

        headers = self.get_auth_headers()
        async with session.get(
            download_url, headers=headers, timeout=30
        ) as response:
            response.raise_for_status()

            # Stream the body to the temporary file
            async with aiofiles.open(partial_path, "wb") as f:
                async for chunk in response.content.iter_chunked(8192):
                    await f.write(chunk)

        # Publish the finished file under its real name
        partial_path.replace(filepath)
        self.logger.info(f"Successfully downloaded media: {filename}")
        return relative_path

    except Exception as e:
        self.logger.error(f"Failed to download media {filename}: {e}")
        # Best-effort cleanup of whatever was written so far
        try:
            partial_path.unlink()
        except OSError:
            pass
        return None
|
||
|
||
def _sanitize_filename(self, filename: str) -> str:
    """
    Make a file name safe to create inside the assets folder.

    Each of the characters < > : " / \\ | ? * becomes "_", leading and
    trailing dots and spaces are dropped, and an empty result is replaced
    by "media_file".
    """
    forbidden = '<>:"/\\|?*'
    replacement_table = str.maketrans(forbidden, "_" * len(forbidden))

    cleaned = filename.translate(replacement_table).strip(". ")

    # Never hand back an empty name
    return cleaned or "media_file"
|
||
|
||
async def generate_html_file(
    self, snapshots: List[Dict[str, Any]], date_from: str, date_to: str
) -> Path:
    """
    Generate an HTML file containing all snapshots.

    The file is written to ``self.output_dir`` as
    ``snapshots_<date_from>_to_<date_to>.html`` and counted in
    ``stats["generated_files"]``.

    Args:
        snapshots: List of snapshot dictionaries
        date_from: Start date
        date_to: End date

    Returns:
        Path to the generated HTML file
    """

    def start_time_key(snapshot: Dict[str, Any]) -> str:
        # A missing or null startTime sorts as "" (i.e. last when newest
        # first); comparing None with a string would raise TypeError.
        return str(snapshot.get("startTime") or "")

    # Sort snapshots by start time (newest first)
    sorted_snapshots = sorted(snapshots, key=start_time_key, reverse=True)

    # Generate filename
    filename = f"snapshots_{date_from}_to_{date_to}.html"
    filepath = self.output_dir / filename

    # Generate HTML content
    html_content = await self.generate_html_template(
        sorted_snapshots, date_from, date_to
    )

    # Write to file
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(html_content)

    self.logger.info(f"Generated HTML file: {filepath}")
    self.stats["generated_files"] += 1

    return filepath
|
||
|
||
async def generate_html_template(
    self, snapshots: List[Dict[str, Any]], date_from: str, date_to: str
) -> str:
    """
    Build the full HTML document for the given snapshots.

    Opens its own aiohttp session (used for media downloads), authenticates,
    renders every snapshot in the order given and wraps the result in the
    page skeleton together with the inline CSS and JavaScript.
    """
    # Dedicated aiohttp session for media downloads
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    timeout = aiohttp.ClientTimeout(total=30)
    media_session = aiohttp.ClientSession(connector=connector, timeout=timeout)

    async with media_session as session:
        # Authenticate session for media downloads
        await self.authenticate()

        # Render the snapshots one after another, in the given order
        rendered = [
            await self.format_snapshot_html(snapshot, session)
            for snapshot in snapshots
        ]

    # Every snapshot block is followed by a blank line
    snapshots_html = "".join(f"{block_html}\n\n" for block_html in rendered)

    # Values interpolated into the page skeleton
    stylesheet = self.get_css_styles()
    snapshot_total = len(snapshots)
    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    pages_fetched = self.stats["pages_fetched"]
    script = self.get_javascript_functions()

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ParentZone Snapshots Backup - {date_from} to {date_to}</title>
<style>
{stylesheet}
</style>
</head>
<body>
<div class="container">
<header class="page-header">
<h1>ParentZone Snapshots Backup</h1>
<div class="date-range">
<strong>Period:</strong> {date_from} to {date_to}
</div>
<div class="stats">
<span class="stat-item">Total Snapshots: <strong>{snapshot_total}</strong></span>
<span class="stat-item">Generated: <strong>{generated_at}</strong></span>
</div>
</header>

<nav class="navigation">
<button onclick="toggleAllDetails()">Toggle All Details</button>
<input type="text" id="searchBox" placeholder="Search snapshots..." onkeyup="searchSnapshots()">
</nav>

<main class="snapshots-container">
{snapshots_html}
</main>

<footer class="page-footer">
<p>Generated by ParentZone Snapshot Downloader</p>
<p>Total snapshots: {snapshot_total} | Pages fetched: {pages_fetched}</p>
</footer>
</div>

<script>
{script}
</script>
</body>
</html>"""
|
||
|
||
def get_css_styles(self) -> str:
|
||
"""Get CSS styles for the HTML file."""
|
||
return """
|
||
* {
|
||
margin: 0;
|
||
padding: 0;
|
||
box-sizing: border-box;
|
||
}
|
||
|
||
body {
|
||
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
|
||
line-height: 1.6;
|
||
color: #495057;
|
||
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 50%, #f1f3f4 100%);
|
||
min-height: 100vh;
|
||
}
|
||
|
||
.container {
|
||
max-width: 1200px;
|
||
margin: 0 auto;
|
||
padding: 20px;
|
||
}
|
||
|
||
.page-header {
|
||
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
|
||
padding: 30px;
|
||
border-radius: 15px;
|
||
margin-bottom: 30px;
|
||
box-shadow: 0 4px 20px rgba(108, 117, 125, 0.15);
|
||
border: 2px solid #dee2e6;
|
||
text-align: center;
|
||
}
|
||
|
||
.page-header h1 {
|
||
color: #495057;
|
||
margin-bottom: 10px;
|
||
font-size: 2.5em;
|
||
font-weight: 600;
|
||
}
|
||
|
||
.date-range {
|
||
font-size: 1.2em;
|
||
color: #6c757d;
|
||
margin-bottom: 15px;
|
||
}
|
||
|
||
.stats {
|
||
display: flex;
|
||
justify-content: center;
|
||
gap: 20px;
|
||
flex-wrap: wrap;
|
||
}
|
||
|
||
.stat-item {
|
||
color: #495057;
|
||
font-size: 1.1em;
|
||
}
|
||
|
||
.navigation {
|
||
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
|
||
padding: 20px;
|
||
border-radius: 15px;
|
||
margin-bottom: 20px;
|
||
box-shadow: 0 2px 10px rgba(96, 125, 139, 0.1);
|
||
display: flex;
|
||
gap: 20px;
|
||
align-items: center;
|
||
flex-wrap: wrap;
|
||
}
|
||
|
||
.navigation button {
|
||
background: linear-gradient(135deg, #6c757d 0%, #495057 100%);
|
||
color: white;
|
||
border: none;
|
||
padding: 10px 20px;
|
||
border-radius: 12px;
|
||
cursor: pointer;
|
||
font-size: 1em;
|
||
}
|
||
|
||
.navigation button:hover {
|
||
background: linear-gradient(135deg, #495057 0%, #343a40 100%);
|
||
}
|
||
|
||
.navigation input {
|
||
flex: 1;
|
||
padding: 10px;
|
||
border: 2px solid #e0e0e0;
|
||
border-radius: 12px;
|
||
font-size: 1em;
|
||
}
|
||
|
||
.navigation input:focus {
|
||
outline: none;
|
||
border-color: #6c757d;
|
||
}
|
||
|
||
.snapshots-container {
|
||
display: flex;
|
||
flex-direction: column;
|
||
gap: 20px;
|
||
}
|
||
|
||
.snapshot {
|
||
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
|
||
border-radius: 15px;
|
||
padding: 25px;
|
||
box-shadow: 0 2px 10px rgba(96, 125, 139, 0.1);
|
||
transition: transform 0.2s ease, box-shadow 0.2s ease;
|
||
}
|
||
|
||
.snapshot:hover {
|
||
transform: translateY(-2px);
|
||
box-shadow: 0 4px 20px rgba(96, 125, 139, 0.15);
|
||
}
|
||
|
||
.snapshot-header {
|
||
margin-bottom: 20px;
|
||
border-bottom: 2px solid #e8eaf0;
|
||
padding-bottom: 15px;
|
||
}
|
||
|
||
.snapshot-title {
|
||
color: #495057;
|
||
font-size: 1.8em;
|
||
margin-bottom: 10px;
|
||
}
|
||
|
||
.snapshot-meta {
|
||
display: flex;
|
||
gap: 20px;
|
||
flex-wrap: wrap;
|
||
color: #6c757d;
|
||
font-size: 0.9em;
|
||
}
|
||
|
||
.snapshot-content > div {
|
||
margin-bottom: 15px;
|
||
}
|
||
|
||
.snapshot-author, .snapshot-child, .snapshot-activity {
|
||
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
|
||
padding: 10px;
|
||
border-radius: 12px;
|
||
font-weight: 500;
|
||
}
|
||
|
||
.snapshot-description {
|
||
background: linear-gradient(135deg, #fafbfc 0%, #f0f8ff 100%);
|
||
padding: 20px;
|
||
border-radius: 12px;
|
||
border-left: 4px solid #6c757d;
|
||
}
|
||
|
||
.snapshot-description p {
|
||
margin-bottom: 10px;
|
||
line-height: 1.6;
|
||
}
|
||
|
||
.snapshot-description p:last-child {
|
||
margin-bottom: 0;
|
||
}
|
||
|
||
.snapshot-description br {
|
||
display: block;
|
||
margin: 10px 0;
|
||
content: " ";
|
||
}
|
||
|
||
.snapshot-description strong {
|
||
font-weight: bold;
|
||
color: #495057;
|
||
}
|
||
|
||
.snapshot-description em {
|
||
font-style: italic;
|
||
color: #6c757d;
|
||
}
|
||
|
||
.snapshot-description .notes-content {
|
||
/* Container for HTML notes content */
|
||
word-wrap: break-word;
|
||
overflow-wrap: break-word;
|
||
}
|
||
|
||
.snapshot-description span[style] {
|
||
/* Preserve inline styles from the notes HTML */
|
||
}
|
||
|
||
.snapshot-images {
|
||
margin: 20px 0;
|
||
}
|
||
|
||
.image-grid {
|
||
display: grid;
|
||
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
|
||
gap: 15px;
|
||
margin-top: 10px;
|
||
}
|
||
|
||
.image-item {
|
||
text-align: center;
|
||
}
|
||
|
||
.image-item img {
|
||
max-width: 100%;
|
||
height: auto;
|
||
border-radius: 12px;
|
||
box-shadow: 0 2px 8px rgba(96, 125, 139, 0.1);
|
||
max-height: 400px;
|
||
object-fit: contain;
|
||
background: linear-gradient(135deg, #fafbfc 0%, #f0f8ff 100%);
|
||
}
|
||
|
||
.image-caption {
|
||
margin-top: 5px;
|
||
font-size: 0.9em;
|
||
color: #6c757d;
|
||
font-weight: 500;
|
||
}
|
||
|
||
.image-meta {
|
||
margin-top: 3px;
|
||
font-size: 0.8em;
|
||
color: #95a5a6;
|
||
font-style: italic;
|
||
}
|
||
|
||
.snapshot-attachments {
|
||
margin: 20px 0;
|
||
}
|
||
|
||
.attachment-list {
|
||
list-style: none;
|
||
padding-left: 0;
|
||
}
|
||
|
||
.attachment-list li {
|
||
padding: 8px 0;
|
||
border-bottom: 1px solid #e8eaf0;
|
||
}
|
||
|
||
.attachment-list a {
|
||
color: #495057;
|
||
text-decoration: none;
|
||
}
|
||
|
||
.attachment-list a:hover {
|
||
text-decoration: underline;
|
||
}
|
||
|
||
.snapshot-metadata {
|
||
margin-top: 20px;
|
||
background: linear-gradient(135deg, #fafbfc 0%, #f0f8ff 100%);
|
||
padding: 20px;
|
||
border-radius: 12px;
|
||
}
|
||
|
||
.metadata-grid {
|
||
display: grid;
|
||
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
|
||
gap: 10px;
|
||
margin-top: 10px;
|
||
}
|
||
|
||
.metadata-item {
|
||
padding: 8px 0;
|
||
}
|
||
|
||
.raw-data {
|
||
margin-top: 15px;
|
||
}
|
||
|
||
.raw-data summary {
|
||
cursor: pointer;
|
||
font-weight: bold;
|
||
padding: 5px 0;
|
||
}
|
||
|
||
.json-data {
|
||
background: #2c3e50;
|
||
color: #ecf0f1;
|
||
padding: 15px;
|
||
border-radius: 12px;
|
||
overflow-x: auto;
|
||
font-size: 0.9em;
|
||
margin-top: 10px;
|
||
}
|
||
|
||
.page-footer {
|
||
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
|
||
padding: 20px;
|
||
border-radius: 15px;
|
||
margin-top: 30px;
|
||
text-align: center;
|
||
box-shadow: 0 2px 10px rgba(96, 125, 139, 0.1);
|
||
color: #6c757d;
|
||
}
|
||
|
||
h4 {
|
||
color: #495057;
|
||
margin-bottom: 10px;
|
||
}
|
||
|
||
@media (max-width: 768px) {
|
||
.container {
|
||
padding: 10px;
|
||
}
|
||
|
||
.page-header h1 {
|
||
font-size: 2em;
|
||
}
|
||
|
||
.navigation {
|
||
flex-direction: column;
|
||
}
|
||
|
||
.stats {
|
||
flex-direction: column;
|
||
gap: 10px;
|
||
}
|
||
|
||
.snapshot-meta {
|
||
flex-direction: column;
|
||
gap: 5px;
|
||
}
|
||
}
|
||
"""
|
||
|
||
def get_javascript_functions(self) -> str:
    """
    Return the script that is inlined into the generated HTML page.

    It defines toggleAllDetails() and searchSnapshots(), which the page's
    navigation bar calls, and makes every snapshot title collapse/expand
    its snapshot's content on click.
    """
    script = """
function toggleAllDetails() {
const details = document.querySelectorAll('details');
const allOpen = Array.from(details).every(detail => detail.open);

details.forEach(detail => {
detail.open = !allOpen;
});
}

function searchSnapshots() {
const searchTerm = document.getElementById('searchBox').value.toLowerCase();
const snapshots = document.querySelectorAll('.snapshot');

snapshots.forEach(snapshot => {
const text = snapshot.textContent.toLowerCase();
if (text.includes(searchTerm)) {
snapshot.style.display = 'block';
} else {
snapshot.style.display = 'none';
}
});
}

// Add smooth scrolling for internal links
document.addEventListener('DOMContentLoaded', function() {
// Add click handlers for snapshot titles to make them collapsible
const titles = document.querySelectorAll('.snapshot-title');
titles.forEach(title => {
title.style.cursor = 'pointer';
title.addEventListener('click', function() {
const content = this.closest('.snapshot').querySelector('.snapshot-content');
if (content.style.display === 'none') {
content.style.display = 'block';
this.style.opacity = '1';
} else {
content.style.display = 'none';
this.style.opacity = '0.7';
}
});
});
});
"""
    return script
|
||
|
||
async def download_snapshots(
    self,
    type_ids: List[int] = [15],
    date_from: str = None,
    date_to: str = None,
    max_pages: int = None,
) -> Path:
    """
    Run the whole job: authenticate, fetch every page, write the HTML report.

    Args:
        type_ids: List of type IDs to filter by (default: [15])
        date_from: Start date in YYYY-MM-DD format (default: 365 days ago)
        date_to: End date in YYYY-MM-DD format (default: today)
        max_pages: Maximum number of pages to fetch

    Returns:
        Path to generated HTML file, or None when no snapshots were found

    Raises:
        Exception: any error raised along the way is logged and re-raised
    """
    # Fill in whichever end of the date range was left open
    date_from = (
        (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
        if date_from is None
        else date_from
    )
    date_to = datetime.now().strftime("%Y-%m-%d") if date_to is None else date_to

    self.logger.info(
        f"Starting snapshot download for period {date_from} to {date_to}"
    )

    # HTTP session shared by authentication and page fetching
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    timeout = aiohttp.ClientTimeout(total=30)
    http_session = aiohttp.ClientSession(connector=connector, timeout=timeout)

    async with http_session as session:
        try:
            # Authenticate if needed
            await self.authenticate()

            snapshots = await self.fetch_all_snapshots(
                session, type_ids, date_from, date_to, max_pages
            )

            if snapshots:
                report_path = await self.generate_html_file(
                    snapshots, date_from, date_to
                )
                self.print_statistics()
                return report_path

            self.logger.warning("No snapshots found for the specified period")
            return None

        except Exception as e:
            self.logger.error(f"Error during snapshot download: {e}")
            raise
|
||
|
||
def print_statistics(self):
    """Write a summary of the counters held in ``self.stats`` to stdout."""
    rule = "=" * 60

    # Header: a blank line, then the title framed by two rules.
    print("\n" + rule)
    print("SNAPSHOT DOWNLOAD STATISTICS")
    print(rule)

    # One "label: value" line per counter, in fixed display order.
    rows = (
        ("Total snapshots downloaded", "total_snapshots"),
        ("Pages fetched", "pages_fetched"),
        ("Failed requests", "failed_requests"),
        ("Generated files", "generated_files"),
    )
    for label, stat_key in rows:
        print(f"{label}: {self.stats[stat_key]}")

    print(rule)
|
||
|
||
|
||
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser for the snapshot downloader."""
    parser = argparse.ArgumentParser(
        description="Download ParentZone snapshots and generate HTML report",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Download snapshots using API key
  python3 snapshot_downloader.py --api-key YOUR_API_KEY

  # Download snapshots using login credentials
  python3 snapshot_downloader.py --email user@example.com --password password

  # Download snapshots for specific date range
  python3 snapshot_downloader.py --api-key KEY --date-from 2024-01-01 --date-to 2024-12-31

  # Download only first 5 cursor pages (for testing)
  python3 snapshot_downloader.py --api-key KEY --max-pages 5

  # Specify output directory
  python3 snapshot_downloader.py --api-key KEY --output-dir ./my_snapshots
""",
    )

    # Authentication options
    parser.add_argument("--api-key", help="API key for authentication")
    parser.add_argument("--email", help="Email for login authentication")
    parser.add_argument("--password", help="Password for login authentication")

    # Query filters
    parser.add_argument(
        "--date-from", help="Start date in YYYY-MM-DD format (default: 1 year ago)"
    )
    parser.add_argument(
        "--date-to", help="End date in YYYY-MM-DD format (default: today)"
    )
    parser.add_argument(
        "--type-ids",
        nargs="+",
        type=int,
        default=[15],
        help="Type IDs to filter by (default: [15])",
    )

    # Output and runtime options
    parser.add_argument(
        "--output-dir",
        default="snapshots",
        help="Directory to save snapshot files (default: snapshots)",
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        help="Maximum number of cursor pages to fetch (for testing)",
    )
    parser.add_argument(
        "--api-url",
        default="https://api.parentzone.me",
        help="ParentZone API URL (default: https://api.parentzone.me)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug mode with detailed server response logging",
    )

    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """
    Command-line entry point: parse arguments, validate credentials and run
    the snapshot download.

    Args:
        argv: Argument strings to parse. When None (the default) argparse
            reads the process command line, so existing ``main()`` callers
            behave exactly as before.

    Returns:
        0 when the download ran to completion (including when no snapshots
        were found); 1 when the credential arguments are invalid, the run is
        interrupted with Ctrl-C, or any other error occurs.
    """
    args = _build_arg_parser().parse_args(argv)

    # Validate authentication
    if not args.api_key and not (args.email and args.password):
        print("Error: Either --api-key or both --email and --password must be provided")
        return 1

    # The two checks below are only reachable when an API key was supplied
    # together with an incomplete email/password pair.
    if args.email and not args.password:
        print("Error: Password is required when using email authentication")
        return 1

    if args.password and not args.email:
        print("Error: Email is required when using password authentication")
        return 1

    try:
        # Create downloader
        downloader = SnapshotDownloader(
            api_url=args.api_url,
            output_dir=args.output_dir,
            api_key=args.api_key,
            email=args.email,
            password=args.password,
            debug_mode=args.debug,
        )

        if args.debug:
            print("🔍 DEBUG MODE ENABLED - Detailed server responses will be printed")

        # Download snapshots
        html_file = asyncio.run(
            downloader.download_snapshots(
                type_ids=args.type_ids,
                date_from=args.date_from,
                date_to=args.date_to,
                max_pages=args.max_pages,
            )
        )

        if html_file:
            print(f"\n✅ Success! Snapshots downloaded and saved to: {html_file}")
            print("📁 Open the file in your browser to view the snapshots")
        else:
            print("⚠️ No snapshots were found for the specified period")

        return 0

    except KeyboardInterrupt:
        print("\n⚠️ Download interrupted by user")
        return 1
    except Exception as e:
        # Top-level boundary: report the failure and map it to an exit code.
        print(f"❌ Error: {e}")
        return 1
|
||
|
||
|
||
if __name__ == "__main__":
    # Raise SystemExit directly instead of calling the `exit` helper: that
    # name is injected by the `site` module for interactive sessions and is
    # not available when the interpreter runs without it (e.g. `python -S`).
    raise SystemExit(main())
|