#!/usr/bin/env python3
"""
Snapshot Downloader for ParentZone
This script downloads snapshots (daily events) from the ParentZone API with pagination support
and generates a comprehensive markup file containing all the snapshot information.
"""
import argparse
import asyncio
import aiohttp
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
from urllib.parse import urlencode, urljoin
import html
import aiofiles
# AuthManager is an optional dependency: when src.auth_manager cannot be
# imported the name is bound to None, and authenticate() tests it before
# attempting an email/password login.
try:
    from src.auth_manager import AuthManager
except ImportError:
    AuthManager = None
class SnapshotDownloader:
def __init__(
self,
api_url: str = "https://api.parentzone.me",
output_dir: str = "snapshots",
api_key: str = None,
email: str = None,
password: str = None,
debug_mode: bool = False,
):
"""
Initialize the snapshot downloader.
Args:
api_url: Base URL of the API
output_dir: Directory to save the snapshot files
api_key: API key for authentication
email: Email for login authentication
password: Password for login authentication
debug_mode: Enable detailed server response logging
"""
self.api_url = api_url.rstrip("/")
self.snapshots_endpoint = "/v1/posts"
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
# Create assets subfolder for media downloads
self.assets_dir = self.output_dir / "assets"
self.assets_dir.mkdir(parents=True, exist_ok=True)
# Authentication
self.api_key = api_key
self.email = email
self.password = password
self.auth_manager = None
self.debug_mode = debug_mode
# Setup logging
self.setup_logging()
# Standard headers based on the curl command
self.headers = {
"accept": "application/json, text/plain, */*",
"accept-language": "en-GB,en-US;q=0.9,en;q=0.8,ro;q=0.7",
"origin": "https://parentzone.me",
"priority": "u=1, i",
"sec-ch-ua": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
"x-client-version": "3.54.0",
}
# Statistics
self.stats = {
"total_snapshots": 0,
"pages_fetched": 0,
"failed_requests": 0,
"generated_files": 0,
}
def setup_logging(self):
    """
    Configure logging to both a file and the console.

    Messages go to "snapshots.log" inside the output directory and to the
    default stream handler. The module logger is stored on self.logger.
    """
    file_handler = logging.FileHandler(self.output_dir / "snapshots.log")
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[file_handler, console_handler],
    )
    self.logger = logging.getLogger(__name__)
async def authenticate(self):
    """
    Log in with email/password when credentials were supplied.

    Does nothing when no email/password pair is configured. The call is
    idempotent: if an auth manager is already attached and reports itself
    authenticated, no second login request is made. On success the
    "x-api-key" header returned by the auth manager (if present) is copied
    to self.api_key.

    Raises:
        RuntimeError: If the login attempt is rejected.
    """
    if not (self.email and self.password):
        return

    # Both download_snapshots() and generate_html_template() call this;
    # avoid logging in again when a previous call already succeeded.
    if self.auth_manager is not None and self.auth_manager.is_authenticated():
        return

    if AuthManager is None:
        # The optional import failed; say so instead of silently sending
        # unauthenticated requests.
        self.logger.warning(
            "Login credentials were provided but AuthManager is not available; "
            "skipping login authentication"
        )
        return

    self.logger.info("Attempting login authentication...")
    self.auth_manager = AuthManager(self.api_url)
    success = await self.auth_manager.login(self.email, self.password)
    if not success:
        self.logger.error("Login authentication failed")
        raise RuntimeError("Login authentication failed")

    self.logger.info("Login authentication successful")
    # Use the API key from auth manager
    auth_headers = self.auth_manager.get_auth_headers()
    if "x-api-key" in auth_headers:
        self.api_key = auth_headers["x-api-key"]
def get_auth_headers(self) -> Dict[str, str]:
    """
    Build the request headers, including authentication when available.

    An explicit API key takes precedence; otherwise the headers of an
    authenticated auth manager are merged in. The base headers stored on
    the instance are never modified.

    Returns:
        A new dictionary of headers
    """
    merged = dict(self.headers)
    if self.api_key:
        merged["x-api-key"] = self.api_key
        return merged

    manager = self.auth_manager
    if manager and manager.is_authenticated():
        merged.update(manager.get_auth_headers())
    return merged
async def fetch_snapshots_page(
    self,
    session: aiohttp.ClientSession,
    type_ids: Optional[List[int]] = None,
    date_from: str = "2021-10-18",
    date_to: Optional[str] = None,
    cursor: Optional[str] = None,
    per_page: int = 100,
) -> Dict[str, Any]:
    """
    Fetch a single page of snapshots from the API using cursor-based pagination.

    Args:
        session: aiohttp session
        type_ids: List of type IDs to filter by (defaults to [15])
        date_from: Start date in YYYY-MM-DD format
        date_to: End date in YYYY-MM-DD format (defaults to today)
        cursor: Cursor for pagination (None for first page)
        per_page: Accepted for interface compatibility; it is currently not
            sent to the API

    Returns:
        The decoded JSON response; snapshots are read from its 'posts'
        field and the next-page cursor from its 'cursor' field

    Raises:
        Exception: Any request or decoding error is logged, counted in
            stats["failed_requests"] and re-raised unchanged.
    """
    if type_ids is None:
        type_ids = [15]
    if date_to is None:
        date_to = datetime.now().strftime("%Y-%m-%d")

    # Build query parameters
    params: Dict[str, Any] = {"dateFrom": date_from, "dateTo": date_to}
    # Add cursor for pagination (skip for first request)
    if cursor:
        params["cursor"] = cursor
    # The API expects one typeIDs[]=N pair per id. Passing the whole list
    # with doseq=True emits every id; assigning the ids one at a time to
    # the same key kept only the last one.
    params["typeIDs[]"] = list(type_ids)

    url = f"{self.api_url}{self.snapshots_endpoint}?{urlencode(params, doseq=True)}"
    page_info = f"cursor: {cursor[:20]}..." if cursor else "first page"
    self.logger.info(f"Fetching snapshots ({page_info}): {url}")
    headers = self.get_auth_headers()

    try:
        async with session.get(url, headers=headers, timeout=30) as response:
            response.raise_for_status()
            data = await response.json()

            # Tolerate a non-object JSON body in the bookkeeping below
            is_mapping = isinstance(data, dict)
            snapshots = data.get("posts", []) if is_mapping else []

            # Print detailed response information for debugging if enabled
            if self.debug_mode:
                print(f"\n=== SERVER RESPONSE DEBUG ({page_info}) ===")
                print(f"Status Code: {response.status}")
                print(f"Headers: {dict(response.headers)}")
                print(f"Response Type: {type(data)}")
                print(
                    f"Response Keys: {list(data.keys()) if is_mapping else 'Not a dict'}"
                )
                print(f"Posts count: {len(snapshots)}")
                print(f"Cursor: {data.get('cursor', 'None') if is_mapping else 'None'}")
                if len(snapshots) <= 3:  # Only print full data if few posts
                    print("Full Response Data:")
                    print(json.dumps(data, indent=2, default=str))
                print("=" * 50)

            self.logger.info(f"Retrieved {len(snapshots)} snapshots ({page_info})")
            self.stats["pages_fetched"] += 1
            # Return the actual API response format
            return data
    except Exception as e:
        self.logger.error(f"Failed to fetch snapshots ({page_info}): {e}")
        self.stats["failed_requests"] += 1
        raise
async def fetch_all_snapshots(
    self,
    session: aiohttp.ClientSession,
    type_ids: Optional[List[int]] = None,
    date_from: str = "2021-10-18",
    date_to: Optional[str] = None,
    max_pages: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """
    Fetch all snapshots across all pages using cursor-based pagination.

    Paging stops at the first of: an empty 'posts' list, a response without
    a cursor, a cursor identical to the one just used, the max_pages limit,
    or a failed page request. A failure is logged and the snapshots
    collected so far are returned rather than raising.

    Args:
        session: aiohttp session
        type_ids: List of type IDs to filter by (defaults to [15])
        date_from: Start date in YYYY-MM-DD format
        date_to: End date in YYYY-MM-DD format
        max_pages: Maximum number of pages to fetch (for testing); a falsy
            value means no limit

    Returns:
        List of all snapshot dictionaries
    """
    if type_ids is None:
        type_ids = [15]

    all_snapshots: List[Dict[str, Any]] = []
    cursor = None
    page_count = 0
    self.logger.info(
        f"Starting snapshot fetch from {date_from} to {date_to or 'now'}"
    )

    while True:
        page_count += 1
        if max_pages and page_count > max_pages:
            self.logger.info(f"Reached maximum pages limit: {max_pages}")
            break
        try:
            response = await self.fetch_snapshots_page(
                session, type_ids, date_from, date_to, cursor
            )
            # Extract snapshots from response
            snapshots = response.get("posts", [])
            new_cursor = response.get("cursor")
            if not snapshots:
                self.logger.info("No more snapshots found (empty posts array)")
                break

            all_snapshots.extend(snapshots)
            self.stats["total_snapshots"] += len(snapshots)
            self.logger.info(
                f"Page {page_count}: {len(snapshots)} snapshots (total: {len(all_snapshots)})"
            )

            # If no cursor returned, we've reached the end
            if not new_cursor:
                self.logger.info("Reached last page (no cursor returned)")
                break
            # A cursor that does not advance would request the same page
            # forever; stop instead of looping.
            if new_cursor == cursor:
                self.logger.warning(
                    "API returned the same cursor again; stopping pagination"
                )
                break
            # Update cursor for next iteration
            cursor = new_cursor
        except Exception as e:
            # Best effort: keep whatever was fetched before the failure
            self.logger.error(f"Error fetching page {page_count}: {e}")
            break

    self.logger.info(f"Total snapshots fetched: {len(all_snapshots)}")
    return all_snapshots
async def format_snapshot_html(
    self, snapshot: Dict[str, Any], session: aiohttp.ClientSession
) -> str:
    """
    Format a single snapshot as HTML.

    NOTE(review): the markup inside the original template literal was lost.
    This body rebuilds it around the class names that get_css_styles() and
    get_javascript_functions() refer to (.snapshot, .snapshot-header,
    .snapshot-title, .snapshot-meta, .snapshot-content, ...). Confirm it
    against a previously generated report if one is available.

    Args:
        snapshot: Snapshot dictionary from API
        session: aiohttp session, passed on for media downloads

    Returns:
        HTML string for the snapshot
    """
    # Extract key information from ParentZone snapshot format
    snapshot_id = snapshot.get("id", "unknown")
    # NOTE(review): notes are inserted unescaped on purpose so their HTML
    # formatting is kept; this trusts the API not to return script content.
    content = snapshot.get("notes", "")
    start_time = snapshot.get("startTime", "")
    snapshot_type = snapshot.get("type", "Snapshot")
    start_date = self.format_date(start_time) if start_time else "Unknown"

    # "or" guards cover keys that are present but null
    author = snapshot.get("author") or {}
    author_forename = author.get("forename") or ""
    author_surname = author.get("surname") or ""
    author_name = html.escape(f"{author_forename} {author_surname}".strip())

    child = snapshot.get("child") or {}
    child_forename = child.get("forename") or ""
    child_surname = child.get("surname") or ""
    child_name = html.escape(f"{child_forename} {child_surname}".strip())

    # Create title in format: "Child Forename by Author Forename Surname"
    if child_forename and author_forename:
        title = html.escape(
            f"{child_forename} by {author_forename} {author_surname}".strip()
        )
    else:
        title = html.escape(f"Snapshot {snapshot_id}")

    activity = snapshot.get("activity") or {}
    activity_name = html.escape(str(activity.get("name") or ""))

    safe_id = html.escape(str(snapshot_id))
    safe_type = html.escape(str(snapshot_type))
    safe_date = html.escape(str(start_date))
    description = content if content else "<em>No description provided</em>"

    parts = [
        f'<div class="snapshot" id="snapshot-{safe_id}">',
        '<div class="snapshot-header">',
        f'<h3 class="snapshot-title">{title}</h3>',
        '<div class="snapshot-meta">',
        f'<span class="snapshot-id">ID: {safe_id}</span>',
        f'<span class="snapshot-type">Type: {safe_type}</span>',
        f'<span class="snapshot-date">Date: {safe_date}</span>',
        "</div>",
        "</div>",
        '<div class="snapshot-content">',
    ]
    if author_name:
        parts.append(f'<div class="snapshot-author">👤 Author: {author_name}</div>')
    if child_name:
        parts.append(f'<div class="snapshot-child">👶 Child: {child_name}</div>')
    if activity_name:
        parts.append(
            f'<div class="snapshot-activity">🎯 Activity: {activity_name}</div>'
        )
    parts.append('<div class="snapshot-description">')
    parts.append(f'<div class="notes-content">{description}</div>')
    parts.append("</div>")
    parts.append(await self.format_snapshot_media(snapshot, session))
    parts.append(self.format_snapshot_metadata(snapshot))
    parts.append("</div>")
    parts.append("</div>")
    # Sub-sections may be empty strings; skip them to avoid blank lines
    return "\n".join(part for part in parts if part).strip()
async def format_snapshot_media(
    self, snapshot: Dict[str, Any], session: aiohttp.ClientSession
) -> str:
    """
    Format media attachments for a snapshot.

    Entries whose "type" is "image" are rendered in an image grid; every
    other entry is listed as an attachment. Each file is downloaded through
    download_media_file(); when that fails, the markup links to the API URL
    instead (or, for an attachment without an id, shows plain text).

    NOTE(review): the tags inside the original string literals were lost.
    The markup is rebuilt around the .snapshot-images / .image-grid /
    .image-item / .image-caption / .image-meta / .snapshot-attachments /
    .attachment-list rules in get_css_styles().

    Args:
        snapshot: Snapshot dictionary from API
        session: aiohttp session used for the downloads

    Returns:
        HTML string, or "" when the snapshot has no media
    """
    # Local import: the module header imports only urlencode/urljoin
    from urllib.parse import quote

    media = snapshot.get("media") or []  # tolerate "media": null
    images = [m for m in media if m.get("type") == "image"]
    attachments = [m for m in media if m.get("type") != "image"]
    parts: List[str] = []

    if images:
        parts.append('<div class="snapshot-images">')
        parts.append("<h4>📸 Images:</h4>")
        parts.append('<div class="image-grid">')
        for image in images:
            local_path = await self.download_media_file(session, image)
            image_name = html.escape(image.get("fileName") or "Image")
            if local_path:
                # Percent-encode so spaces, '#' and '%' in names stay valid
                src = quote(local_path)
                caption = image_name
            elif image.get("id"):
                # Fallback to API URL if download failed
                src = f"{self.api_url}/v1/media/{image.get('id')}/full"
                caption = f"{image_name} (online)"
            else:
                continue
            updated = html.escape(str(self.format_date(image.get("updated") or "")))
            parts.append('<div class="image-item">')
            parts.append(
                f'<img src="{html.escape(src)}" alt="{image_name}" loading="lazy">'
            )
            parts.append(f'<p class="image-caption">{caption}</p>')
            parts.append(f'<p class="image-meta">Updated: {updated}</p>')
            parts.append("</div>")
        parts.append("</div>")
        parts.append("</div>")

    if attachments:
        parts.append('<div class="snapshot-attachments">')
        parts.append("<h4>📎 Attachments:</h4>")
        parts.append('<ul class="attachment-list">')
        for attachment in attachments:
            local_path = await self.download_media_file(session, attachment)
            attachment_name = html.escape(attachment.get("fileName") or "Attachment")
            attachment_type = html.escape(str(attachment.get("mimeType") or "unknown"))
            label = f"{attachment_name} ({attachment_type})"
            if local_path:
                href = html.escape(quote(local_path))
                parts.append(f'<li><a href="{href}" target="_blank">{label}</a></li>')
            elif attachment.get("id"):
                # Fallback to API URL if download failed
                href = html.escape(
                    f"{self.api_url}/v1/media/{attachment.get('id')}/full"
                )
                parts.append(
                    f'<li><a href="{href}" target="_blank">{label}</a> - online</li>'
                )
            else:
                parts.append(f"<li>{label}</li>")
        parts.append("</ul>")
        parts.append("</div>")

    return "\n".join(parts)
def format_snapshot_metadata(self, snapshot: Dict[str, Any]) -> str:
    """
    Format additional metadata for a snapshot.

    NOTE(review): nearly the whole original body was lost (only an
    unterminated string literal survived), so the exact field list is
    unknown. This reconstruction lists every scalar top-level field except
    "notes" in a .metadata-grid and appends the full snapshot as escaped
    JSON inside a collapsible <details class="raw-data"> element, matching
    the .snapshot-metadata / .metadata-item / .raw-data / .json-data rules
    in get_css_styles() and the <details> toggle in the page script.

    Args:
        snapshot: Snapshot dictionary from API

    Returns:
        HTML string for the metadata section
    """
    items = []
    for key, value in snapshot.items():
        # Nested structures and the (already rendered) notes stay in the
        # raw JSON only; empty values are skipped.
        if key == "notes" or isinstance(value, (dict, list)):
            continue
        if value is None or value == "":
            continue
        items.append(
            f'<div class="metadata-item"><strong>{html.escape(str(key))}:</strong> '
            f"{html.escape(str(value))}</div>"
        )

    raw_json = html.escape(json.dumps(snapshot, indent=2, default=str))
    parts = [
        '<div class="snapshot-metadata">',
        "<h4>ℹ️ Additional Information</h4>",
        '<div class="metadata-grid">',
        *items,
        "</div>",
        '<details class="raw-data">',
        "<summary>🔍 Raw JSON Data</summary>",
        f'<pre class="json-data">{raw_json}</pre>',
        "</details>",
        "</div>",
    ]
    return "\n".join(parts) + "\n"
def format_date(self, date_string: str) -> str:
    """
    Format an ISO 8601 date string for display.

    A trailing "Z" is rewritten to "+00:00" before parsing so that UTC
    timestamps are accepted by datetime.fromisoformat().

    Args:
        date_string: Date/time string, expected in ISO format

    Returns:
        The value formatted as "YYYY-MM-DD HH:MM:SS", or the input
        unchanged when it cannot be parsed
    """
    try:
        parsed = datetime.fromisoformat(date_string.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        # Not a string, or not ISO formatted: show the raw value instead.
        # Catching only these keeps KeyboardInterrupt/SystemExit alive.
        return date_string
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
async def download_media_file(
    self, session: aiohttp.ClientSession, media: Dict[str, Any]
) -> Optional[str]:
    """
    Download a media file to the assets folder.

    A file that already exists under its sanitized name is reused without
    a request. New downloads are streamed to a temporary "<name>.part"
    file and renamed only once complete, so an interrupted transfer is
    never mistaken for a finished file on a later run.

    NOTE(review): files are keyed by fileName alone, so two media entries
    sharing a name resolve to the same local file; consider including the
    media id in the name if the API does not guarantee unique names.

    Args:
        session: aiohttp session for making requests
        media: Media dictionary from API

    Returns:
        Relative path to downloaded file, or None if download failed
    """
    media_id = media.get("id")
    if not media_id:
        return None

    # "or" also covers a fileName that is present but null
    filename = self._sanitize_filename(media.get("fileName") or f"media_{media_id}")
    filepath = self.assets_dir / filename
    # Relative path for HTML
    relative_path = f"assets/{filename}"

    # Check if file already exists
    if filepath.exists():
        return relative_path

    download_url = f"{self.api_url}/v1/media/{media_id}/full"
    partial_path = filepath.with_name(filepath.name + ".part")
    try:
        self.logger.info(f"Downloading media file: {filename}")
        headers = self.get_auth_headers()
        async with session.get(
            download_url, headers=headers, timeout=30
        ) as response:
            response.raise_for_status()
            async with aiofiles.open(partial_path, "wb") as f:
                async for chunk in response.content.iter_chunked(8192):
                    await f.write(chunk)
        # Publish the file only after every chunk has been written
        partial_path.replace(filepath)
        self.logger.info(f"Successfully downloaded media: {filename}")
        return relative_path
    except Exception as e:
        self.logger.error(f"Failed to download media {filename}: {e}")
        # Best-effort cleanup of the incomplete temporary file
        try:
            partial_path.unlink()
        except OSError:
            pass
        return None
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize filename by removing invalid characters."""
# Remove or replace invalid characters
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, "_")
# Remove leading/trailing spaces and dots
filename = filename.strip(". ")
# Ensure filename is not empty
if not filename:
filename = "media_file"
return filename
async def generate_html_file(
    self, snapshots: List[Dict[str, Any]], date_from: str, date_to: str
) -> Path:
    """
    Generate an HTML file containing all snapshots.

    The file is written to the output directory as
    "snapshots_<date_from>_to_<date_to>.html" in UTF-8, and
    stats["generated_files"] is incremented.

    Args:
        snapshots: List of snapshot dictionaries
        date_from: Start date
        date_to: End date

    Returns:
        Path to the generated HTML file
    """
    # Sort snapshots by start time (newest first). "or" maps a missing or
    # null startTime to "" so str and None are never compared; such
    # snapshots sort last.
    sorted_snapshots = sorted(
        snapshots, key=lambda snap: snap.get("startTime") or "", reverse=True
    )

    filepath = self.output_dir / f"snapshots_{date_from}_to_{date_to}.html"
    html_content = await self.generate_html_template(
        sorted_snapshots, date_from, date_to
    )
    filepath.write_text(html_content, encoding="utf-8")

    self.logger.info(f"Generated HTML file: {filepath}")
    self.stats["generated_files"] += 1
    return filepath
async def generate_html_template(
    self, snapshots: List[Dict[str, Any]], date_from: str, date_to: str
) -> str:
    """
    Generate the complete HTML document for a list of snapshots.

    Opens its own aiohttp session (used for media downloads while each
    snapshot is rendered), authenticates, renders every snapshot with
    format_snapshot_html() and embeds the result together with the
    stylesheet and script returned by get_css_styles() and
    get_javascript_functions().

    NOTE(review): the tags of the original document template were lost;
    only the title text, the "Toggle All Details" label and the
    {snapshots_html} slot survived. The page skeleton below is rebuilt
    around the selectors used by the stylesheet and script (.container,
    .page-header, .date-range, .stats, .navigation, #searchBox,
    .snapshots-container, .page-footer).

    Args:
        snapshots: Snapshot dictionaries, in display order
        date_from: Start date shown in the title and header
        date_to: End date shown in the title and header

    Returns:
        The HTML document as a string
    """
    rendered: List[str] = []
    # Create aiohttp session for media downloads
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(
        connector=connector, timeout=timeout
    ) as session:
        # Authenticate session for media downloads
        await self.authenticate()
        for snapshot in snapshots:
            rendered.append(await self.format_snapshot_html(snapshot, session))

    # Join once instead of growing a string inside the loop
    snapshots_html = "".join(f"{block}\n\n" for block in rendered)

    safe_from = html.escape(str(date_from))
    safe_to = html.escape(str(date_to))
    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ParentZone Snapshots Backup - {safe_from} to {safe_to}</title>
<style>
{self.get_css_styles()}
</style>
</head>
<body>
<div class="container">
<header class="page-header">
<h1>ParentZone Snapshots Backup</h1>
<div class="date-range">{safe_from} to {safe_to}</div>
<div class="stats">
<span class="stat-item">Total Snapshots: {len(snapshots)}</span>
<span class="stat-item">Generated: {generated_at}</span>
</div>
</header>
<nav class="navigation">
<button onclick="toggleAllDetails()">Toggle All Details</button>
<input type="text" id="searchBox" placeholder="Search snapshots..." onkeyup="searchSnapshots()">
</nav>
<main class="snapshots-container">
{snapshots_html}
</main>
<footer class="page-footer">
<p>Generated by ParentZone Snapshot Downloader</p>
</footer>
</div>
<script>
{self.get_javascript_functions()}
</script>
</body>
</html>
"""
def get_css_styles(self) -> str:
    """
    Get CSS styles for the HTML file.

    The stylesheet is a fixed string; nothing from the instance is
    interpolated into it. It styles the page chrome (.container,
    .page-header, .navigation, .page-footer), each snapshot card
    (.snapshot and its -header/-title/-meta/-content/-description parts),
    the media sections (.snapshot-images, .image-grid,
    .snapshot-attachments) and the metadata section (.snapshot-metadata,
    .raw-data, .json-data), plus a narrow-screen layout below 768px.

    Returns:
        CSS source text, without surrounding <style> tags
    """
    return """
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.6;
color: #495057;
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 50%, #f1f3f4 100%);
min-height: 100vh;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
.page-header {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
padding: 30px;
border-radius: 15px;
margin-bottom: 30px;
box-shadow: 0 4px 20px rgba(108, 117, 125, 0.15);
border: 2px solid #dee2e6;
text-align: center;
}
.page-header h1 {
color: #495057;
margin-bottom: 10px;
font-size: 2.5em;
font-weight: 600;
}
.date-range {
font-size: 1.2em;
color: #6c757d;
margin-bottom: 15px;
}
.stats {
display: flex;
justify-content: center;
gap: 20px;
flex-wrap: wrap;
}
.stat-item {
color: #495057;
font-size: 1.1em;
}
.navigation {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
padding: 20px;
border-radius: 15px;
margin-bottom: 20px;
box-shadow: 0 2px 10px rgba(96, 125, 139, 0.1);
display: flex;
gap: 20px;
align-items: center;
flex-wrap: wrap;
}
.navigation button {
background: linear-gradient(135deg, #6c757d 0%, #495057 100%);
color: white;
border: none;
padding: 10px 20px;
border-radius: 12px;
cursor: pointer;
font-size: 1em;
}
.navigation button:hover {
background: linear-gradient(135deg, #495057 0%, #343a40 100%);
}
.navigation input {
flex: 1;
padding: 10px;
border: 2px solid #e0e0e0;
border-radius: 12px;
font-size: 1em;
}
.navigation input:focus {
outline: none;
border-color: #6c757d;
}
.snapshots-container {
display: flex;
flex-direction: column;
gap: 20px;
}
.snapshot {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
border-radius: 15px;
padding: 25px;
box-shadow: 0 2px 10px rgba(96, 125, 139, 0.1);
transition: transform 0.2s ease, box-shadow 0.2s ease;
}
.snapshot:hover {
transform: translateY(-2px);
box-shadow: 0 4px 20px rgba(96, 125, 139, 0.15);
}
.snapshot-header {
margin-bottom: 20px;
border-bottom: 2px solid #e8eaf0;
padding-bottom: 15px;
}
.snapshot-title {
color: #495057;
font-size: 1.8em;
margin-bottom: 10px;
}
.snapshot-meta {
display: flex;
gap: 20px;
flex-wrap: wrap;
color: #6c757d;
font-size: 0.9em;
}
.snapshot-content > div {
margin-bottom: 15px;
}
.snapshot-author, .snapshot-child, .snapshot-activity {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
padding: 10px;
border-radius: 12px;
font-weight: 500;
}
.snapshot-description {
background: linear-gradient(135deg, #fafbfc 0%, #f0f8ff 100%);
padding: 20px;
border-radius: 12px;
border-left: 4px solid #6c757d;
}
.snapshot-description p {
margin-bottom: 10px;
line-height: 1.6;
}
.snapshot-description p:last-child {
margin-bottom: 0;
}
.snapshot-description br {
display: block;
margin: 10px 0;
content: " ";
}
.snapshot-description strong {
font-weight: bold;
color: #495057;
}
.snapshot-description em {
font-style: italic;
color: #6c757d;
}
.snapshot-description .notes-content {
/* Container for HTML notes content */
word-wrap: break-word;
overflow-wrap: break-word;
}
.snapshot-description span[style] {
/* Preserve inline styles from the notes HTML */
}
.snapshot-images {
margin: 20px 0;
}
.image-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-top: 10px;
}
.image-item {
text-align: center;
}
.image-item img {
max-width: 100%;
height: auto;
border-radius: 12px;
box-shadow: 0 2px 8px rgba(96, 125, 139, 0.1);
max-height: 400px;
object-fit: contain;
background: linear-gradient(135deg, #fafbfc 0%, #f0f8ff 100%);
}
.image-caption {
margin-top: 5px;
font-size: 0.9em;
color: #6c757d;
font-weight: 500;
}
.image-meta {
margin-top: 3px;
font-size: 0.8em;
color: #95a5a6;
font-style: italic;
}
.snapshot-attachments {
margin: 20px 0;
}
.attachment-list {
list-style: none;
padding-left: 0;
}
.attachment-list li {
padding: 8px 0;
border-bottom: 1px solid #e8eaf0;
}
.attachment-list a {
color: #495057;
text-decoration: none;
}
.attachment-list a:hover {
text-decoration: underline;
}
.snapshot-metadata {
margin-top: 20px;
background: linear-gradient(135deg, #fafbfc 0%, #f0f8ff 100%);
padding: 20px;
border-radius: 12px;
}
.metadata-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 10px;
margin-top: 10px;
}
.metadata-item {
padding: 8px 0;
}
.raw-data {
margin-top: 15px;
}
.raw-data summary {
cursor: pointer;
font-weight: bold;
padding: 5px 0;
}
.json-data {
background: #2c3e50;
color: #ecf0f1;
padding: 15px;
border-radius: 12px;
overflow-x: auto;
font-size: 0.9em;
margin-top: 10px;
}
.page-footer {
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
padding: 20px;
border-radius: 15px;
margin-top: 30px;
text-align: center;
box-shadow: 0 2px 10px rgba(96, 125, 139, 0.1);
color: #6c757d;
}
h4 {
color: #495057;
margin-bottom: 10px;
}
@media (max-width: 768px) {
.container {
padding: 10px;
}
.page-header h1 {
font-size: 2em;
}
.navigation {
flex-direction: column;
}
.stats {
flex-direction: column;
gap: 10px;
}
.snapshot-meta {
flex-direction: column;
gap: 5px;
}
}
"""
def get_javascript_functions(self) -> str:
    """
    Get JavaScript functions for the HTML file.

    The script is a fixed string defining:
      - toggleAllDetails(): opens every <details> element, or closes them
        all when every one is already open.
      - searchSnapshots(): hides each .snapshot whose text does not contain
        the lower-cased value of the #searchBox input.
      - a DOMContentLoaded handler that makes every .snapshot-title
        clickable to show/hide the .snapshot-content of its snapshot.

    NOTE(review): the title click handler dereferences the result of
    querySelector('.snapshot-content') without a null check, so it assumes
    every .snapshot contains a .snapshot-content element.

    Returns:
        JavaScript source text, without surrounding <script> tags
    """
    return """
function toggleAllDetails() {
const details = document.querySelectorAll('details');
const allOpen = Array.from(details).every(detail => detail.open);
details.forEach(detail => {
detail.open = !allOpen;
});
}
function searchSnapshots() {
const searchTerm = document.getElementById('searchBox').value.toLowerCase();
const snapshots = document.querySelectorAll('.snapshot');
snapshots.forEach(snapshot => {
const text = snapshot.textContent.toLowerCase();
if (text.includes(searchTerm)) {
snapshot.style.display = 'block';
} else {
snapshot.style.display = 'none';
}
});
}
// Add smooth scrolling for internal links
document.addEventListener('DOMContentLoaded', function() {
// Add click handlers for snapshot titles to make them collapsible
const titles = document.querySelectorAll('.snapshot-title');
titles.forEach(title => {
title.style.cursor = 'pointer';
title.addEventListener('click', function() {
const content = this.closest('.snapshot').querySelector('.snapshot-content');
if (content.style.display === 'none') {
content.style.display = 'block';
this.style.opacity = '1';
} else {
content.style.display = 'none';
this.style.opacity = '0.7';
}
});
});
});
"""
async def download_snapshots(
    self,
    type_ids: Optional[List[int]] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    max_pages: Optional[int] = None,
) -> Optional[Path]:
    """
    Download all snapshots and generate HTML file.

    Args:
        type_ids: List of type IDs to filter by (default: [15])
        date_from: Start date in YYYY-MM-DD format (default: 365 days ago)
        date_to: End date in YYYY-MM-DD format (default: today)
        max_pages: Maximum number of pages to fetch

    Returns:
        Path to generated HTML file, or None when no snapshots were found
        for the period

    Raises:
        Exception: Any error from authentication or HTML generation is
            logged and re-raised unchanged.
    """
    if type_ids is None:
        type_ids = [15]

    # Set default dates if not provided; read the clock once so both
    # defaults are derived from the same instant
    now = datetime.now()
    if date_from is None:
        date_from = (now - timedelta(days=365)).strftime("%Y-%m-%d")
    if date_to is None:
        date_to = now.strftime("%Y-%m-%d")

    self.logger.info(
        f"Starting snapshot download for period {date_from} to {date_to}"
    )

    # Create aiohttp session
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(
        connector=connector, timeout=timeout
    ) as session:
        try:
            # Authenticate if needed
            await self.authenticate()

            snapshots = await self.fetch_all_snapshots(
                session, type_ids, date_from, date_to, max_pages
            )
            if not snapshots:
                self.logger.warning("No snapshots found for the specified period")
                return None

            html_file = await self.generate_html_file(snapshots, date_from, date_to)
            self.print_statistics()
            return html_file
        except Exception as e:
            self.logger.error(f"Error during snapshot download: {e}")
            raise
def print_statistics(self):
    """
    Print a summary of the run counters to standard output.

    The report is a banner followed by the snapshot, page, failed-request
    and generated-file counts taken from self.stats.
    """
    rule = "=" * 60
    rows = (
        ("Total snapshots downloaded", "total_snapshots"),
        ("Pages fetched", "pages_fetched"),
        ("Failed requests", "failed_requests"),
        ("Generated files", "generated_files"),
    )
    print("\n" + rule)
    print("SNAPSHOT DOWNLOAD STATISTICS")
    print(rule)
    for label, key in rows:
        print(f"{label}: {self.stats[key]}")
    print(rule)
def main() -> int:
    """
    Command-line entry point.

    Parses the arguments, checks that either an API key or a complete
    email/password pair was given, runs the download and reports the
    outcome on standard output.

    NOTE(review): the status glyphs in the original messages were
    mis-encoded (one literal was even split across two lines); they are
    restored here to the most likely intended emoji.

    Returns:
        0 when the run completed (including when no snapshots were found),
        1 on invalid arguments, user interruption or any error
    """
    parser = argparse.ArgumentParser(
        description="Download ParentZone snapshots and generate HTML report",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Download snapshots using API key
python3 snapshot_downloader.py --api-key YOUR_API_KEY
# Download snapshots using login credentials
python3 snapshot_downloader.py --email user@example.com --password password
# Download snapshots for specific date range
python3 snapshot_downloader.py --api-key KEY --date-from 2024-01-01 --date-to 2024-12-31
# Download only first 5 cursor pages (for testing)
python3 snapshot_downloader.py --api-key KEY --max-pages 5
# Specify output directory
python3 snapshot_downloader.py --api-key KEY --output-dir ./my_snapshots
""",
    )
    parser.add_argument("--api-key", help="API key for authentication")
    parser.add_argument("--email", help="Email for login authentication")
    parser.add_argument("--password", help="Password for login authentication")
    parser.add_argument(
        "--date-from", help="Start date in YYYY-MM-DD format (default: 1 year ago)"
    )
    parser.add_argument(
        "--date-to", help="End date in YYYY-MM-DD format (default: today)"
    )
    parser.add_argument(
        "--type-ids",
        nargs="+",
        type=int,
        default=[15],
        help="Type IDs to filter by (default: [15])",
    )
    parser.add_argument(
        "--output-dir",
        default="snapshots",
        help="Directory to save snapshot files (default: snapshots)",
    )
    parser.add_argument(
        "--max-pages",
        type=int,
        help="Maximum number of cursor pages to fetch (for testing)",
    )
    parser.add_argument(
        "--api-url",
        default="https://api.parentzone.me",
        help="ParentZone API URL (default: https://api.parentzone.me)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug mode with detailed server response logging",
    )
    args = parser.parse_args()

    # Validate authentication: an API key alone is enough, but a lone email
    # or lone password is rejected even when an API key is also present.
    if not args.api_key and not (args.email and args.password):
        print("Error: Either --api-key or both --email and --password must be provided")
        return 1
    if args.email and not args.password:
        print("Error: Password is required when using email authentication")
        return 1
    if args.password and not args.email:
        print("Error: Email is required when using password authentication")
        return 1

    try:
        downloader = SnapshotDownloader(
            api_url=args.api_url,
            output_dir=args.output_dir,
            api_key=args.api_key,
            email=args.email,
            password=args.password,
            debug_mode=args.debug,
        )
        if args.debug:
            print("🔍 DEBUG MODE ENABLED - Detailed server responses will be printed")

        html_file = asyncio.run(
            downloader.download_snapshots(
                type_ids=args.type_ids,
                date_from=args.date_from,
                date_to=args.date_to,
                max_pages=args.max_pages,
            )
        )
        if html_file:
            print(f"\n✅ Success! Snapshots downloaded and saved to: {html_file}")
            print("🌐 Open the file in your browser to view the snapshots")
        else:
            print("⚠️ No snapshots were found for the specified period")
        return 0
    except KeyboardInterrupt:
        print("\n⚠️ Download interrupted by user")
        return 1
    except Exception as e:
        # Top-level boundary: report any failure as a non-zero exit code
        print(f"❌ Error: {e}")
        return 1
# Script entry point. SystemExit is raised directly because the bare exit()
# helper is installed by the site module for interactive use and is not
# guaranteed to exist (for example under "python -S").
if __name__ == "__main__":
    raise SystemExit(main())