314 lines
9.9 KiB
Python
314 lines
9.9 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Asset Tracker for ParentZone Downloader
|
||
|
|
|
||
|
|
This module handles tracking of downloaded assets to avoid re-downloading
|
||
|
|
and to identify new assets that need to be downloaded.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import os
|
||
|
|
from datetime import datetime
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, List, Set, Any, Optional
|
||
|
|
import hashlib
|
||
|
|
|
||
|
|
|
||
|
|
class AssetTracker:
    """
    Tracks downloaded assets and identifies new or modified ones.

    Metadata about every downloaded asset is persisted to a JSON file inside
    the storage directory so that repeated runs can skip unchanged assets.
    """

    # Fields tried, in order, as the asset's unique identifier.
    _ID_FIELDS = ('id', 'assetId', 'uuid')
    # Fields on the API asset whose change indicates the content changed.
    _CONTENT_FIELDS = ('updated', 'modified', 'lastModified', 'size', 'checksum', 'etag')

    def __init__(self, storage_dir: str = "downloaded_images", metadata_file: str = "asset_metadata.json"):
        """
        Initialize the asset tracker.

        Args:
            storage_dir: Directory where downloaded assets are stored.
                Created (including missing parents) if it does not exist.
            metadata_file: Name of the JSON file (inside storage_dir) that
                stores asset metadata.
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so a nested path like "out/images" also works.
        self.storage_dir.mkdir(parents=True, exist_ok=True)

        self.metadata_file = self.storage_dir / metadata_file
        self.logger = logging.getLogger(__name__)

        # Metadata keyed by asset key; loaded once at construction.
        self.metadata = self._load_metadata()

    def _load_metadata(self) -> Dict[str, Dict[str, Any]]:
        """
        Load asset metadata from the JSON file.

        Returns:
            Dictionary of asset metadata keyed by asset ID. Empty dict when
            the file is missing, unreadable, or does not contain a JSON object.
        """
        if not self.metadata_file.exists():
            self.logger.info("No existing metadata file found, starting fresh")
            return {}
        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            # Narrow catch: I/O failures and malformed JSON only.
            self.logger.error("Failed to load metadata file: %s", e)
            return {}
        if not isinstance(data, dict):
            # Corrupt/unexpected top-level content: start fresh rather than
            # crash later on `key in self.metadata` / `.get(...)` calls.
            self.logger.error("Metadata file does not contain a JSON object; ignoring it")
            return {}
        self.logger.info("Loaded metadata for %d assets", len(data))
        return data

    def _save_metadata(self):
        """Save asset metadata to the JSON file (best-effort: errors are logged)."""
        try:
            with open(self.metadata_file, 'w', encoding='utf-8') as f:
                # default=str is deliberate: stringify datetimes/Paths in entries.
                json.dump(self.metadata, f, indent=2, default=str)
            self.logger.debug("Saved metadata for %d assets", len(self.metadata))
        except (OSError, TypeError) as e:
            self.logger.error("Failed to save metadata file: %s", e)

    def _get_asset_key(self, asset: Dict[str, Any]) -> str:
        """
        Generate a unique key for an asset.

        Args:
            asset: Asset dictionary from API.

        Returns:
            The first available ID field as a string, or an MD5 hash of the
            whole asset when no ID field is present.
        """
        for field in self._ID_FIELDS:
            if field in asset:
                return str(asset[field])
        # No ID field: derive a stable key from the asset's full content.
        # MD5 is fine here — it is a cache key, not a security feature.
        asset_str = json.dumps(asset, sort_keys=True, default=str)
        return hashlib.md5(asset_str.encode()).hexdigest()

    def _get_asset_hash(self, asset: Dict[str, Any]) -> str:
        """
        Generate a hash of asset content to detect changes.

        Args:
            asset: Asset dictionary from API.

        Returns:
            MD5 hash of the change-indicating fields, or of the entire asset
            when none of those fields are present.
        """
        content_data = {field: asset[field] for field in self._CONTENT_FIELDS if field in asset}
        if not content_data:
            # No change-indicator fields: hash the whole asset instead.
            content_data = asset
        content_str = json.dumps(content_data, sort_keys=True, default=str)
        return hashlib.md5(content_str.encode()).hexdigest()

    def is_asset_downloaded(self, asset: Dict[str, Any]) -> bool:
        """
        Check if an asset has already been downloaded.

        Args:
            asset: Asset dictionary from API.

        Returns:
            True if the asset is already tracked, False otherwise.
        """
        return self._get_asset_key(asset) in self.metadata

    def is_asset_modified(self, asset: Dict[str, Any]) -> bool:
        """
        Check if an asset has been modified since last download.

        Args:
            asset: Asset dictionary from API.

        Returns:
            True if the asset is new or its content hash changed,
            False otherwise.
        """
        asset_key = self._get_asset_key(asset)
        if asset_key not in self.metadata:
            return True  # New asset
        current_hash = self._get_asset_hash(asset)
        stored_hash = self.metadata[asset_key].get('content_hash', '')
        return current_hash != stored_hash

    def get_new_assets(self, api_assets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Identify new or modified assets that need to be downloaded.

        Args:
            api_assets: List of assets from API response.

        Returns:
            List of assets that need to be downloaded.
        """
        new_assets = []
        for asset in api_assets:
            asset_key = self._get_asset_key(asset)
            if not self.is_asset_downloaded(asset):
                self.logger.info("New asset found: %s", asset_key)
                new_assets.append(asset)
            elif self.is_asset_modified(asset):
                self.logger.info("Modified asset found: %s", asset_key)
                new_assets.append(asset)
            else:
                self.logger.debug("Asset unchanged: %s", asset_key)

        self.logger.info("Found %d new/modified assets out of %d total",
                         len(new_assets), len(api_assets))
        return new_assets

    def mark_asset_downloaded(self, asset: Dict[str, Any], filepath: Path, success: bool = True):
        """
        Mark an asset as downloaded in the metadata and persist immediately.

        Args:
            asset: Asset dictionary from API.
            filepath: Path where asset was saved.
            success: Whether download was successful.
        """
        asset_key = self._get_asset_key(asset)

        metadata_entry = {
            'asset_id': asset_key,
            'filename': filepath.name,
            'filepath': str(filepath),
            'download_date': datetime.now().isoformat(),
            'success': success,
            'content_hash': self._get_asset_hash(asset),
            'api_data': asset,
        }

        # Add file info only when the download succeeded and the file exists.
        if success and filepath.exists():
            stat = filepath.stat()
            metadata_entry.update({
                'file_size': stat.st_size,
                'file_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
            })

        self.metadata[asset_key] = metadata_entry
        # Save after every asset so an interrupted run loses at most one entry.
        self._save_metadata()

        self.logger.debug("Marked asset as downloaded: %s", asset_key)

    def get_downloaded_assets(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all downloaded asset metadata.

        Returns:
            Shallow copy of the tracked asset metadata.
        """
        return self.metadata.copy()

    def cleanup_missing_files(self):
        """Remove metadata entries for files that no longer exist on disk."""
        # Collect first, then delete, to avoid mutating the dict while iterating.
        assets_to_remove = []
        for asset_key, metadata_entry in self.metadata.items():
            filepath = Path(metadata_entry.get('filepath', ''))
            if not filepath.exists():
                assets_to_remove.append(asset_key)
                self.logger.warning("File missing, removing from metadata: %s", filepath)

        for asset_key in assets_to_remove:
            del self.metadata[asset_key]

        if assets_to_remove:
            self._save_metadata()
            self.logger.info("Cleaned up %d missing file entries from metadata",
                             len(assets_to_remove))

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about tracked assets.

        Returns:
            Dictionary with totals, success/failure counts, file presence
            counts, and cumulative size in bytes and MB.
        """
        total_assets = len(self.metadata)
        successful_downloads = sum(1 for entry in self.metadata.values() if entry.get('success', False))
        failed_downloads = total_assets - successful_downloads

        total_size = 0
        existing_files = 0
        for entry in self.metadata.values():
            if 'file_size' in entry:
                total_size += entry['file_size']
            filepath = Path(entry.get('filepath', ''))
            if filepath.exists():
                existing_files += 1

        return {
            'total_tracked_assets': total_assets,
            'successful_downloads': successful_downloads,
            'failed_downloads': failed_downloads,
            'existing_files': existing_files,
            'missing_files': total_assets - existing_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
        }

    def print_stats(self):
        """Print statistics about tracked assets to stdout."""
        stats = self.get_stats()

        print("=" * 60)
        print("ASSET TRACKER STATISTICS")
        print("=" * 60)
        print(f"Total tracked assets: {stats['total_tracked_assets']}")
        print(f"Successful downloads: {stats['successful_downloads']}")
        print(f"Failed downloads: {stats['failed_downloads']}")
        print(f"Existing files: {stats['existing_files']}")
        print(f"Missing files: {stats['missing_files']}")
        print(f"Total size: {stats['total_size_mb']} MB ({stats['total_size_bytes']} bytes)")
        print("=" * 60)
|
|
def main():
    """Exercise the asset tracker from the command line."""
    import sys

    # Console logging with timestamps for the manual test run.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )

    tracker = AssetTracker()

    # Show the tracker state before any maintenance.
    tracker.print_stats()

    # Always sweep out entries whose files vanished from disk.
    tracker.cleanup_missing_files()

    # With a leading --cleanup argument, show the stats again after the sweep.
    if '--cleanup' in sys.argv[1:2]:
        print("\nAfter cleanup:")
        tracker.print_stats()


if __name__ == "__main__":
    main()