Files
parentzone_downloader/asset_tracker.py

314 lines
9.9 KiB
Python
Raw Normal View History

2025-10-07 14:52:04 +01:00
#!/usr/bin/env python3
"""
Asset Tracker for ParentZone Downloader
This module handles tracking of downloaded assets to avoid re-downloading
and to identify new assets that need to be downloaded.
"""
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Any, Optional
import hashlib
class AssetTracker:
    """
    Tracks downloaded assets and identifies new ones.

    Metadata is kept in memory as a dictionary keyed by asset ID and is
    persisted as JSON inside the storage directory after every change.
    """

    # ID fields probed, in priority order, when deriving an asset key.
    _ID_FIELDS = ('id', 'assetId', 'uuid')
    # Fields whose values are hashed to detect content changes.
    _CONTENT_FIELDS = ('updated', 'modified', 'lastModified', 'size', 'checksum', 'etag')

    def __init__(self, storage_dir: str = "downloaded_images", metadata_file: str = "asset_metadata.json"):
        """
        Initialize the asset tracker.

        Args:
            storage_dir: Directory where downloaded assets are stored
            metadata_file: JSON file (inside storage_dir) to store asset metadata
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so a nested storage directory can be created in one go.
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_file = self.storage_dir / metadata_file
        self.logger = logging.getLogger(__name__)
        # Load existing metadata
        self.metadata = self._load_metadata()

    def _load_metadata(self) -> Dict[str, Dict[str, Any]]:
        """
        Load asset metadata from the JSON file.

        Returns:
            Dictionary of asset metadata keyed by asset ID; empty if the file
            is missing, unreadable, or does not contain a JSON object
        """
        if not self.metadata_file.exists():
            self.logger.info("No existing metadata file found, starting fresh")
            return {}

        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON syntax errors and bad UTF-8.
            self.logger.error(f"Failed to load metadata file: {e}")
            return {}

        if not isinstance(data, dict):
            # Every other method treats the metadata as a mapping.
            self.logger.error(
                f"Failed to load metadata file: expected a JSON object, got {type(data).__name__}"
            )
            return {}

        self.logger.info(f"Loaded metadata for {len(data)} assets")
        return data

    def _save_metadata(self):
        """
        Save asset metadata to the JSON file.

        The data is written to a sibling temporary file which then replaces
        the real file, so an interrupted write cannot truncate existing
        metadata. Failures are logged and not raised.
        """
        tmp_path = self.metadata_file.with_name(self.metadata_file.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.metadata, f, indent=2, default=str)
            os.replace(tmp_path, self.metadata_file)
            self.logger.debug(f"Saved metadata for {len(self.metadata)} assets")
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Failed to save metadata file: {e}")
            # Best-effort removal of a partially written temporary file.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def _get_asset_key(self, asset: Dict[str, Any]) -> str:
        """
        Generate a unique key for an asset.

        Args:
            asset: Asset dictionary from API

        Returns:
            The first non-null ID field as a string, or an MD5 digest of the
            whole asset when no ID field is usable
        """
        for field in self._ID_FIELDS:
            value = asset.get(field)
            # A null ID would otherwise collapse every such asset onto "None".
            if value is not None:
                return str(value)

        # Generate hash from asset data
        asset_str = json.dumps(asset, sort_keys=True, default=str)
        return hashlib.md5(asset_str.encode()).hexdigest()

    def _get_asset_hash(self, asset: Dict[str, Any]) -> str:
        """
        Generate a hash for asset content to detect changes.

        Args:
            asset: Asset dictionary from API

        Returns:
            MD5 hex digest of the change-indicating fields, or of the entire
            asset when none of those fields is present
        """
        content_data = {
            field: asset[field] for field in self._CONTENT_FIELDS if field in asset
        }
        # If no content fields, use entire asset
        if not content_data:
            content_data = asset
        content_str = json.dumps(content_data, sort_keys=True, default=str)
        return hashlib.md5(content_str.encode()).hexdigest()

    @staticmethod
    def _entry_file_exists(entry: Dict[str, Any]) -> bool:
        """
        Report whether a metadata entry points at a file present on disk.

        An entry with no (or an empty) 'filepath' counts as missing; an empty
        string must not be turned into a Path because that resolves to the
        current directory, which always exists.
        """
        raw_path = entry.get('filepath')
        return bool(raw_path) and Path(raw_path).is_file()

    def is_asset_downloaded(self, asset: Dict[str, Any]) -> bool:
        """
        Check if an asset has already been downloaded.

        Args:
            asset: Asset dictionary from API

        Returns:
            True if the asset is tracked and its download was not recorded as
            failed, False otherwise
        """
        entry = self.metadata.get(self._get_asset_key(asset))
        if entry is None:
            return False
        # Entries lacking a 'success' flag are treated as downloaded.
        return bool(entry.get('success', True))

    def is_asset_modified(self, asset: Dict[str, Any]) -> bool:
        """
        Check if an asset has been modified since last download.

        Args:
            asset: Asset dictionary from API

        Returns:
            True if asset is untracked or its content hash differs from the
            stored one, False otherwise
        """
        asset_key = self._get_asset_key(asset)
        if asset_key not in self.metadata:
            return True  # New asset
        current_hash = self._get_asset_hash(asset)
        stored_hash = self.metadata[asset_key].get('content_hash', '')
        return current_hash != stored_hash

    def get_new_assets(self, api_assets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Identify new or modified assets that need to be downloaded.

        Assets whose previous download was recorded as failed are included so
        that they are attempted again.

        Args:
            api_assets: List of assets from API response

        Returns:
            List of assets that need to be downloaded
        """
        new_assets = []
        for asset in api_assets:
            asset_key = self._get_asset_key(asset)
            if asset_key not in self.metadata:
                self.logger.info(f"New asset found: {asset_key}")
                new_assets.append(asset)
            elif not self.is_asset_downloaded(asset):
                self.logger.info(f"Retrying previously failed asset: {asset_key}")
                new_assets.append(asset)
            elif self.is_asset_modified(asset):
                self.logger.info(f"Modified asset found: {asset_key}")
                new_assets.append(asset)
            else:
                self.logger.debug(f"Asset unchanged: {asset_key}")
        self.logger.info(f"Found {len(new_assets)} new/modified assets out of {len(api_assets)} total")
        return new_assets

    def mark_asset_downloaded(self, asset: Dict[str, Any], filepath: Path, success: bool = True):
        """
        Mark an asset as downloaded in the metadata.

        Args:
            asset: Asset dictionary from API
            filepath: Path where asset was saved (a plain string is accepted too)
            success: Whether download was successful
        """
        filepath = Path(filepath)
        asset_key = self._get_asset_key(asset)
        metadata_entry = {
            'asset_id': asset_key,
            'filename': filepath.name,
            'filepath': str(filepath),
            'download_date': datetime.now().isoformat(),
            'success': success,
            'content_hash': self._get_asset_hash(asset),
            'api_data': asset,
        }
        # Add file info if download was successful and file exists
        if success and filepath.exists():
            stat = filepath.stat()
            metadata_entry.update({
                'file_size': stat.st_size,
                'file_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
            })
        self.metadata[asset_key] = metadata_entry
        self._save_metadata()
        self.logger.debug(f"Marked asset as downloaded: {asset_key}")

    def get_downloaded_assets(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all downloaded asset metadata.

        Returns:
            Shallow copy of the metadata dictionary (the per-asset entries
            are the same objects the tracker holds)
        """
        return self.metadata.copy()

    def cleanup_missing_files(self):
        """
        Remove metadata entries for files that no longer exist on disk.

        Entries without a recorded file path are removed as well. The
        metadata file is rewritten only if something was removed.
        """
        # Collect first: the dictionary must not change size while iterated.
        assets_to_remove = []
        for asset_key, metadata_entry in self.metadata.items():
            if not self._entry_file_exists(metadata_entry):
                assets_to_remove.append(asset_key)
                filepath = metadata_entry.get('filepath', '')
                self.logger.warning(f"File missing, removing from metadata: {filepath}")

        for asset_key in assets_to_remove:
            del self.metadata[asset_key]

        removed_count = len(assets_to_remove)
        if removed_count > 0:
            self._save_metadata()
            self.logger.info(f"Cleaned up {removed_count} missing file entries from metadata")

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about tracked assets.

        Returns:
            Dictionary with counts of tracked, successful, failed, existing
            and missing entries plus the total recorded size in bytes and MB
        """
        entries = list(self.metadata.values())
        total_assets = len(entries)
        successful_downloads = sum(1 for entry in entries if entry.get('success', False))
        total_size = sum(entry['file_size'] for entry in entries if 'file_size' in entry)
        existing_files = sum(1 for entry in entries if self._entry_file_exists(entry))
        return {
            'total_tracked_assets': total_assets,
            'successful_downloads': successful_downloads,
            'failed_downloads': total_assets - successful_downloads,
            'existing_files': existing_files,
            'missing_files': total_assets - existing_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
        }

    def print_stats(self):
        """Print statistics about tracked assets."""
        stats = self.get_stats()
        print("=" * 60)
        print("ASSET TRACKER STATISTICS")
        print("=" * 60)
        print(f"Total tracked assets: {stats['total_tracked_assets']}")
        print(f"Successful downloads: {stats['successful_downloads']}")
        print(f"Failed downloads: {stats['failed_downloads']}")
        print(f"Existing files: {stats['existing_files']}")
        print(f"Missing files: {stats['missing_files']}")
        print(f"Total size: {stats['total_size_mb']} MB ({stats['total_size_bytes']} bytes)")
        print("=" * 60)
def main():
    """
    Exercise the asset tracker from the command line.

    Prints the current statistics, prunes metadata entries whose files are
    gone, and, when the first argument is '--cleanup', prints the statistics
    a second time.
    """
    import sys

    # Setup logging
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    asset_tracker = AssetTracker()
    asset_tracker.print_stats()

    # NOTE: the cleanup itself always runs; the flag below only decides
    # whether the post-cleanup statistics are shown.
    asset_tracker.cleanup_missing_files()

    report_after_cleanup = sys.argv[1:2] == ['--cleanup']
    if report_after_cleanup:
        print("\nAfter cleanup:")
        asset_tracker.print_stats()


if __name__ == "__main__":
    main()