#!/usr/bin/env python3
"""
Asset Tracker for ParentZone Downloader

This module handles tracking of downloaded assets to avoid re-downloading
and to identify new assets that need to be downloaded.
"""

import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set, Any, Optional
import hashlib


class AssetTracker:
    """
    Tracks downloaded assets and identifies new ones.

    State is persisted as a JSON file inside the storage directory, keyed by
    a per-asset identifier, so the tracker survives between runs.
    """

    def __init__(self, storage_dir: str = "downloaded_images",
                 metadata_file: str = "asset_metadata.json"):
        """
        Initialize the asset tracker.

        Args:
            storage_dir: Directory where downloaded assets are stored
            metadata_file: JSON file (relative to storage_dir) to store asset metadata
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so a nested storage path (e.g. "out/images") works too.
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.metadata_file = self.storage_dir / metadata_file
        self.logger = logging.getLogger(__name__)

        # Load existing metadata (asset key -> metadata entry).
        self.metadata = self._load_metadata()

    def _load_metadata(self) -> Dict[str, Dict[str, Any]]:
        """
        Load asset metadata from the JSON file.

        Returns:
            Dictionary of asset metadata keyed by asset ID; empty dict when
            the file is missing, unreadable, or contains corrupt JSON.
        """
        if self.metadata_file.exists():
            try:
                with open(self.metadata_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                self.logger.info(f"Loaded metadata for {len(data)} assets")
                return data
            except (OSError, json.JSONDecodeError) as e:
                # Best-effort: start fresh rather than crash on a bad file.
                self.logger.error(f"Failed to load metadata file: {e}")
                return {}
        else:
            self.logger.info("No existing metadata file found, starting fresh")
            return {}

    def _save_metadata(self):
        """Save asset metadata to the JSON file (best-effort, errors logged)."""
        try:
            with open(self.metadata_file, 'w', encoding='utf-8') as f:
                # default=str lets datetime and similar objects serialize.
                json.dump(self.metadata, f, indent=2, default=str)
            self.logger.debug(f"Saved metadata for {len(self.metadata)} assets")
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Failed to save metadata file: {e}")

    def _get_asset_key(self, asset: Dict[str, Any]) -> str:
        """
        Generate a unique key for an asset.

        Args:
            asset: Asset dictionary from API

        Returns:
            Unique key for the asset
        """
        # Prefer explicit ID fields, in priority order.
        for id_field in ('id', 'assetId', 'uuid'):
            if id_field in asset:
                return str(asset[id_field])
        # No ID field: derive a stable key from the full asset data.
        # MD5 is a fingerprint here, not a security hash.
        asset_str = json.dumps(asset, sort_keys=True, default=str)
        return hashlib.md5(asset_str.encode()).hexdigest()

    def _get_asset_hash(self, asset: Dict[str, Any]) -> str:
        """
        Generate a hash for asset content to detect changes.

        Args:
            asset: Asset dictionary from API

        Returns:
            Hash of asset content
        """
        # Fields that indicate content changes.
        content_fields = ['updated', 'modified', 'lastModified', 'size',
                          'checksum', 'etag']
        content_data = {field: asset[field]
                        for field in content_fields if field in asset}

        # If no change-indicator fields are present, hash the entire asset.
        if not content_data:
            content_data = asset

        # MD5 kept for compatibility with hashes already stored in metadata;
        # changing the algorithm would flag every tracked asset as modified.
        content_str = json.dumps(content_data, sort_keys=True, default=str)
        return hashlib.md5(content_str.encode()).hexdigest()

    def is_asset_downloaded(self, asset: Dict[str, Any]) -> bool:
        """
        Check if an asset has already been downloaded.

        Args:
            asset: Asset dictionary from API

        Returns:
            True if asset is already downloaded, False otherwise
        """
        return self._get_asset_key(asset) in self.metadata

    def is_asset_modified(self, asset: Dict[str, Any]) -> bool:
        """
        Check if an asset has been modified since last download.

        Args:
            asset: Asset dictionary from API

        Returns:
            True if asset has been modified (or never downloaded),
            False otherwise
        """
        asset_key = self._get_asset_key(asset)
        if asset_key not in self.metadata:
            return True  # New asset

        current_hash = self._get_asset_hash(asset)
        stored_hash = self.metadata[asset_key].get('content_hash', '')
        return current_hash != stored_hash

    def get_new_assets(self, api_assets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Identify new or modified assets that need to be downloaded.

        Args:
            api_assets: List of assets from API response

        Returns:
            List of assets that need to be downloaded
        """
        new_assets = []
        for asset in api_assets:
            asset_key = self._get_asset_key(asset)
            if not self.is_asset_downloaded(asset):
                self.logger.info(f"New asset found: {asset_key}")
                new_assets.append(asset)
            elif self.is_asset_modified(asset):
                self.logger.info(f"Modified asset found: {asset_key}")
                new_assets.append(asset)
            else:
                self.logger.debug(f"Asset unchanged: {asset_key}")

        self.logger.info(f"Found {len(new_assets)} new/modified assets out of {len(api_assets)} total")
        return new_assets

    def mark_asset_downloaded(self, asset: Dict[str, Any], filepath: Path,
                              success: bool = True):
        """
        Mark an asset as downloaded in the metadata.

        Args:
            asset: Asset dictionary from API
            filepath: Path where asset was saved
            success: Whether download was successful
        """
        asset_key = self._get_asset_key(asset)
        metadata_entry = {
            'asset_id': asset_key,
            'filename': filepath.name,
            'filepath': str(filepath),
            'download_date': datetime.now().isoformat(),
            'success': success,
            'content_hash': self._get_asset_hash(asset),
            'api_data': asset,
        }

        # Add file info only when the download succeeded and the file exists.
        if success and filepath.exists():
            stat = filepath.stat()
            metadata_entry.update({
                'file_size': stat.st_size,
                'file_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
            })

        self.metadata[asset_key] = metadata_entry
        # Persist immediately so a crash mid-run loses at most one entry.
        self._save_metadata()
        self.logger.debug(f"Marked asset as downloaded: {asset_key}")

    def get_downloaded_assets(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all downloaded asset metadata.

        Returns:
            Shallow copy of the downloaded-asset metadata dictionary
        """
        return self.metadata.copy()

    def cleanup_missing_files(self):
        """Remove metadata entries for files that no longer exist on disk."""
        # Collect keys first -- can't delete from a dict while iterating it.
        assets_to_remove = []
        for asset_key, metadata_entry in self.metadata.items():
            filepath = Path(metadata_entry.get('filepath', ''))
            if not filepath.exists():
                assets_to_remove.append(asset_key)
                self.logger.warning(f"File missing, removing from metadata: {filepath}")

        for asset_key in assets_to_remove:
            del self.metadata[asset_key]

        if assets_to_remove:
            self._save_metadata()
            self.logger.info(f"Cleaned up {len(assets_to_remove)} missing file entries from metadata")

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about tracked assets.

        Returns:
            Dictionary with statistics (counts, file presence, total size)
        """
        total_assets = len(self.metadata)
        successful_downloads = sum(1 for entry in self.metadata.values()
                                   if entry.get('success', False))
        failed_downloads = total_assets - successful_downloads

        total_size = 0
        existing_files = 0
        for entry in self.metadata.values():
            if 'file_size' in entry:
                total_size += entry['file_size']
            filepath = Path(entry.get('filepath', ''))
            if filepath.exists():
                existing_files += 1

        return {
            'total_tracked_assets': total_assets,
            'successful_downloads': successful_downloads,
            'failed_downloads': failed_downloads,
            'existing_files': existing_files,
            'missing_files': total_assets - existing_files,
            'total_size_bytes': total_size,
            'total_size_mb': round(total_size / (1024 * 1024), 2),
        }

    def print_stats(self):
        """Print statistics about tracked assets."""
        stats = self.get_stats()
        print("=" * 60)
        print("ASSET TRACKER STATISTICS")
        print("=" * 60)
        print(f"Total tracked assets: {stats['total_tracked_assets']}")
        print(f"Successful downloads: {stats['successful_downloads']}")
        print(f"Failed downloads: {stats['failed_downloads']}")
        print(f"Existing files: {stats['existing_files']}")
        print(f"Missing files: {stats['missing_files']}")
        print(f"Total size: {stats['total_size_mb']} MB ({stats['total_size_bytes']} bytes)")
        print("=" * 60)


def main():
    """Test the asset tracker functionality."""
    import sys

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Create tracker
    tracker = AssetTracker()

    # Print current stats
    tracker.print_stats()

    # Cleanup missing files
    # NOTE(review): cleanup always runs; the --cleanup flag only controls
    # whether the post-cleanup stats are printed -- confirm that's intended.
    tracker.cleanup_missing_files()

    # Print updated stats
    if len(sys.argv) > 1 and sys.argv[1] == '--cleanup':
        print("\nAfter cleanup:")
        tracker.print_stats()


if __name__ == "__main__":
    main()