#!/usr/bin/env python3
"""
Configuration-based Image Downloader

This script reads configuration from a JSON file and downloads images from a
REST API. It's a simplified version of the main downloader for easier use.

Usage:
    python config_downloader.py --config config.json
"""

import argparse
import asyncio
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlencode, urljoin, urlparse

import aiofiles
import aiohttp
from tqdm import tqdm

# Import the auth manager and asset tracker; both are optional companions.
try:
    from auth_manager import AuthManager
except ImportError:
    AuthManager = None

try:
    from asset_tracker import AssetTracker
except ImportError:
    AssetTracker = None


# Configuration keys that must be present in the JSON file.
_REQUIRED_FIELDS = ("api_url", "list_endpoint", "download_endpoint", "output_dir")

# Keys under which a dict-shaped list response may carry the asset list.
_LIST_KEYS = ("data", "results", "items", "assets", "images")

# Asset fields probed, in order, for an identifier.
_ID_FIELDS = ("id", "asset_id", "image_id", "file_id", "uuid", "key")

# Asset fields probed, in order, for a human-readable file name.
_FILENAME_FIELDS = ("fileName", "filename", "name", "title")

_DEFAULT_EXTENSION = ".jpg"

_MIME_TO_EXT = {
    "image/jpeg": ".jpg",
    "image/jpg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "image/bmp": ".bmp",
    "image/tiff": ".tiff",
    "image/svg+xml": ".svg",
}

# Characters replaced by "_" in file names: the usual reserved set plus ASCII
# control characters (a NUL byte makes open() raise ValueError).
_FILENAME_TRANSLATION = str.maketrans(
    {
        **{char: "_" for char in '<>:"/\\|?*'},
        **{chr(code): "_" for code in range(32)},
    }
)


class ConfigImageDownloader:
    """Download every asset listed by a REST API, driven by a JSON config."""

    def __init__(self, config_file: str):
        """
        Initialize the downloader with configuration from a JSON file.

        Args:
            config_file: Path to the JSON configuration file

        Raises:
            FileNotFoundError: If the configuration file does not exist.
            ValueError: If the configuration is invalid.
        """
        self.config = self.load_config(config_file)
        self.setup_logging()

        # Create output directory
        self.output_dir = Path(self.config["output_dir"])
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Track download statistics
        self.stats = {"total": 0, "successful": 0, "failed": 0, "skipped": 0}

        # File names already handed out by get_filename() during this run, so
        # concurrent downloads never pick the same not-yet-created file.
        self._reserved_filenames = set()

        # Authentication manager (created lazily by authenticate())
        self.auth_manager = None

        # Initialize asset tracker if enabled and available
        track_assets = self.config.get("track_assets", True)
        self.asset_tracker = None
        if track_assets and AssetTracker:
            self.asset_tracker = AssetTracker(storage_dir=str(self.output_dir))
            self.logger.info("Asset tracking enabled")
        elif track_assets:
            self.logger.warning(
                "Asset tracking requested but AssetTracker not available"
            )
        else:
            self.logger.info("Asset tracking disabled")

    def load_config(self, config_file: str) -> Dict[str, Any]:
        """
        Load and validate configuration from a JSON file.

        Args:
            config_file: Path to the JSON configuration file

        Returns:
            The configuration dict with defaults filled in for
            ``max_concurrent``, ``timeout`` and ``headers``.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the file is not valid JSON, is not a JSON object,
                lacks a required field, or has an invalid ``max_concurrent``.
        """
        try:
            with open(config_file, "r", encoding="utf-8") as f:
                config = json.load(f)
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Configuration file not found: {config_file}"
            ) from e
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in configuration file: {e}") from e

        if not isinstance(config, dict):
            raise ValueError("Configuration file must contain a JSON object")

        # Validate required fields
        for field in _REQUIRED_FIELDS:
            if field not in config:
                raise ValueError(f"Missing required field: {field}")

        # Set defaults for optional fields
        config.setdefault("max_concurrent", 5)
        config.setdefault("timeout", 30)
        # A null "headers" entry is treated like a missing one.
        config["headers"] = dict(config.get("headers") or {})

        # A semaphore of 0 would block every download forever.
        max_concurrent = config["max_concurrent"]
        if (
            not isinstance(max_concurrent, int)
            or isinstance(max_concurrent, bool)
            or max_concurrent < 1
        ):
            raise ValueError("max_concurrent must be a positive integer")

        # The API key is sent as the x-api-key header. Because it lives in the
        # shared "headers" dict it accompanies every request made with those
        # headers; download URLs additionally carry it as a query parameter.
        if config.get("api_key"):
            config["headers"]["x-api-key"] = config["api_key"]

        return config

    def setup_logging(self):
        """Configure logging to ``<output_dir>/download.log`` and the console."""
        log_file = Path(self.config["output_dir"]) / "download.log"

        # Create output directory and log file if they don't exist
        log_file.parent.mkdir(parents=True, exist_ok=True)
        log_file.touch(exist_ok=True)

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler(log_file, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    async def authenticate(self):
        """
        Perform login authentication if credentials are provided in config.

        Does nothing when neither ``email`` nor ``password`` is configured.

        Raises:
            ValueError: If only one of ``email`` / ``password`` is configured.
            RuntimeError: If credentials are configured but ``AuthManager``
                could not be imported, or if the login attempt fails.
        """
        has_email = "email" in self.config
        has_password = "password" in self.config
        if not (has_email or has_password):
            return

        if not (has_email and has_password):
            message = (
                "Both email and password must be provided in config "
                "for login authentication"
            )
            self.logger.warning(message)
            raise ValueError(message)

        if AuthManager is None:
            message = "Login credentials provided but AuthManager is not available"
            self.logger.error(message)
            raise RuntimeError(message)

        self.logger.info("Attempting login authentication...")
        self.auth_manager = AuthManager(self.config["api_url"])
        success = await self.auth_manager.login(
            self.config["email"], self.config["password"]
        )
        if not success:
            self.logger.error("Login authentication failed")
            raise RuntimeError("Login authentication failed")
        self.logger.info("Login authentication successful")

    def _request_timeout(self) -> aiohttp.ClientTimeout:
        """Return the configured total timeout as an aiohttp timeout object."""
        return aiohttp.ClientTimeout(total=self.config["timeout"])

    @staticmethod
    def _extract_assets(data: Any) -> List[Dict[str, Any]]:
        """Pull the asset list out of a decoded list-endpoint response."""
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            # Common wrapper patterns for API responses
            for key in _LIST_KEYS:
                if isinstance(data.get(key), list):
                    return data[key]
            return [data]  # Single asset
        raise ValueError(f"Unexpected response format: {type(data)}")

    async def get_asset_list(
        self, session: aiohttp.ClientSession
    ) -> List[Dict[str, Any]]:
        """
        Fetch the list of assets from the API.

        Args:
            session: Open aiohttp session used for the request

        Returns:
            The assets found in the response (see ``_extract_assets``).

        Raises:
            Exception: Any request, HTTP-status or decoding error is logged
                and re-raised.
        """
        url = urljoin(self.config["api_url"], self.config["list_endpoint"])
        self.logger.info(f"Fetching asset list from: {url}")

        # Work on a copy so request-specific auth headers are not written
        # back into the shared configuration.
        headers = dict(self.config.get("headers", {}))

        # Use API key if provided
        if self.config.get("api_key"):
            headers["x-api-key"] = self.config["api_key"]
        # Use login authentication if available
        elif self.auth_manager and self.auth_manager.is_authenticated():
            headers.update(self.auth_manager.get_auth_headers())

        try:
            async with session.get(
                url, headers=headers, timeout=self._request_timeout()
            ) as response:
                response.raise_for_status()
                data = await response.json()

            assets = self._extract_assets(data)
            self.logger.info(f"Found {len(assets)} assets")
            return assets
        except Exception as e:
            self.logger.error(f"Failed to fetch asset list: {e}")
            raise

    def get_download_url(self, asset: Dict[str, Any]) -> str:
        """
        Generate the download URL for an asset.

        The identifier is the first of ``id``, ``asset_id``, ``image_id``,
        ``file_id``, ``uuid`` or ``key`` present in *asset*; if none is usable
        the string form of the asset itself is used.

        Args:
            asset: Asset metadata as returned by the list endpoint

        Returns:
            Absolute URL of the form
            ``<api_url>/v1/media/<id>/full?key=<api_key>&u=<updated>``.
        """
        asset_id = next(
            (asset[field] for field in _ID_FIELDS if field in asset), None
        )
        if asset_id is None:
            # If no ID field found, try to use the asset itself as the ID
            asset_id = str(asset)

        # Missing or null values are sent as empty strings, not "None".
        params = {
            "key": self.config.get("api_key") or "",
            "u": asset.get("updated") or "",
        }

        # NOTE(review): the path is hard-coded; the required
        # "download_endpoint" setting is validated but not used here.
        # The id is percent-encoded so "/" or "?" in it cannot alter the URL.
        path = f"/v1/media/{quote(str(asset_id), safe='')}/full?{urlencode(params)}"
        return urljoin(self.config["api_url"], path)

    def get_filename(self, asset: Dict[str, Any], url: str) -> str:
        """
        Generate a sanitized file name for the downloaded asset.

        The name comes from the first non-empty of ``fileName``, ``filename``,
        ``name`` or ``title``, else from the last path segment of *url*. A
        name without a dot gets an extension derived from ``mimeType`` /
        ``content_type`` (default ``.jpg``). A ``_<n>`` suffix is added until
        the name neither exists in the output directory nor was handed out by
        an earlier call on this instance; the returned name is then reserved.

        Args:
            asset: Asset metadata
            url: Download URL, used as a fallback source for the name

        Returns:
            A file name (no directory part) not yet used in the output directory.
        """
        # Try to get filename from asset metadata, skipping empty/null values
        filename = next(
            (str(asset[key]) for key in _FILENAME_FIELDS if asset.get(key)), ""
        )
        if not filename:
            # Extract filename from URL
            filename = os.path.basename(urlparse(url).path)

        # If no extension, derive one from the declared MIME type
        if "." not in filename:
            mime_type = asset.get("mimeType") or asset.get("content_type")
            if mime_type:
                filename += self._get_extension_from_mime(str(mime_type))
            else:
                filename += _DEFAULT_EXTENSION

        filename = self._sanitize_filename(filename)

        # Instances built without __init__ get their reservation set lazily.
        reserved = getattr(self, "_reserved_filenames", None)
        if reserved is None:
            reserved = self._reserved_filenames = set()

        # Ensure a unique filename. Checking the reservation set as well as
        # the disk closes the window in which two concurrent downloads could
        # both see a name as free.
        stem, ext = os.path.splitext(filename)
        counter = 1
        while filename in reserved or (self.output_dir / filename).exists():
            filename = f"{stem}_{counter}{ext}"
            counter += 1

        reserved.add(filename)
        return filename

    def _get_extension_from_mime(self, mime_type: str) -> str:
        """
        Get file extension from a MIME type.

        Parameters after ``;`` (e.g. ``image/png; charset=binary``) are
        ignored. Unknown types map to ``.jpg``.
        """
        base_type = mime_type.split(";", 1)[0].strip().lower()
        return _MIME_TO_EXT.get(base_type, _DEFAULT_EXTENSION)

    def _sanitize_filename(self, filename: str) -> str:
        """
        Sanitize a file name.

        Replaces ``<>:"/\\|?*`` and ASCII control characters with ``_``, then
        strips leading/trailing spaces and dots. Returns ``"image"`` if
        nothing is left.
        """
        cleaned = filename.translate(_FILENAME_TRANSLATION).strip(". ")
        return cleaned or "image"

    def _apply_updated_mtime(self, filepath: Path, asset: Dict[str, Any]) -> None:
        """Set *filepath*'s access/modification time from ``asset["updated"]``."""
        if "updated" not in asset:
            return
        try:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            updated_time = datetime.fromisoformat(
                asset["updated"].replace("Z", "+00:00")
            )
            timestamp = updated_time.timestamp()
            os.utime(filepath, (timestamp, timestamp))
            self.logger.info(f"Set file modification time to {asset['updated']}")
        except Exception as e:
            # Best effort only: a bad timestamp must not fail the download.
            self.logger.warning(f"Failed to set file modification time: {e}")

    async def download_asset(
        self,
        session: aiohttp.ClientSession,
        asset: Dict[str, Any],
        semaphore: asyncio.Semaphore,
    ) -> bool:
        """
        Download a single asset into the output directory.

        Args:
            session: Open aiohttp session used for the request
            asset: Asset metadata as returned by the list endpoint
            semaphore: Limits the number of concurrent downloads

        Returns:
            True if the asset was downloaded (or skipped), False on failure.
            Failures are logged and counted in ``self.stats``, never raised.
        """
        async with semaphore:
            # Stays None until a target path is known, so the error handler
            # can report the path that was actually used.
            filepath = None
            try:
                download_url = self.get_download_url(asset)
                filename = self.get_filename(asset, download_url)
                filepath = self.output_dir / filename

                # NOTE(review): get_filename() only returns names that do not
                # exist yet, so this branch is not expected to trigger.
                if filepath.exists() and not self.asset_tracker:
                    self.logger.info(f"Skipping {filename} (already exists)")
                    self.stats["skipped"] += 1
                    return True

                # The query string carries the API key; keep it out of logs.
                loggable_url = download_url.split("?", 1)[0]
                self.logger.info(f"Downloading {filename} from {loggable_url}")

                headers = dict(self.config.get("headers", {}))
                async with session.get(
                    download_url, headers=headers, timeout=self._request_timeout()
                ) as response:
                    response.raise_for_status()

                    # Get content type to verify it's an image
                    content_type = response.headers.get("content-type", "")
                    if not content_type.startswith("image/"):
                        self.logger.warning(
                            f"Content type is not an image: {content_type}"
                        )

                    # Stream the body to disk in chunks
                    async with aiofiles.open(filepath, "wb") as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)

                # Set file modification time to match the updated timestamp
                self._apply_updated_mtime(filepath, asset)

                # Mark asset as downloaded in tracker
                if self.asset_tracker:
                    self.asset_tracker.mark_asset_downloaded(asset, filepath, True)

                self.logger.info(f"Successfully downloaded {filename}")
                self.stats["successful"] += 1
                return True

            except Exception as e:
                # Mark asset as failed in tracker, using the path chosen above
                # rather than recomputing a (possibly different) one.
                if self.asset_tracker and filepath is not None:
                    self.asset_tracker.mark_asset_downloaded(asset, filepath, False)

                asset_id = (
                    asset.get("id", "unknown") if isinstance(asset, dict) else "unknown"
                )
                self.logger.error(f"Failed to download asset {asset_id}: {e}")
                self.stats["failed"] += 1
                return False

    async def download_all_assets(self, force_redownload: bool = False):
        """
        Download all assets from the API.

        Args:
            force_redownload: If True, download all assets regardless of tracking

        Raises:
            Exception: Errors from authentication or from fetching the asset
                list are logged and re-raised. Individual download failures
                are only counted in ``self.stats``.
        """
        start_time = time.time()

        # Create aiohttp session with connection pooling
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)

        async with aiohttp.ClientSession(
            connector=connector, timeout=self._request_timeout()
        ) as session:
            try:
                # Perform authentication if needed
                await self.authenticate()

                # Get asset list
                all_assets = await self.get_asset_list(session)
                self.logger.info(f"Retrieved {len(all_assets)} total assets from API")

                if not all_assets:
                    self.logger.warning("No assets found to download")
                    return

                # Filter for new/modified assets if tracking is enabled
                if self.asset_tracker and not force_redownload:
                    assets = self.asset_tracker.get_new_assets(all_assets)
                    self.logger.info(
                        f"Found {len(assets)} new/modified assets to download"
                    )
                    if not assets:
                        self.logger.info("All assets are up to date!")
                        return
                else:
                    assets = all_assets
                    if force_redownload:
                        self.logger.info(
                            "Force redownload enabled - downloading all assets"
                        )

                self.stats["total"] = len(assets)

                # Create semaphore to limit concurrent downloads
                semaphore = asyncio.Semaphore(self.config["max_concurrent"])

                # Create tasks for all downloads
                tasks = [
                    self.download_asset(session, asset, semaphore)
                    for asset in assets
                ]

                # Download all assets with progress bar
                with tqdm(total=len(tasks), desc="Downloading assets") as pbar:
                    for finished in asyncio.as_completed(tasks):
                        await finished
                        pbar.update(1)
                        pbar.set_postfix(
                            {
                                "Success": self.stats["successful"],
                                "Failed": self.stats["failed"],
                                "Skipped": self.stats["skipped"],
                            }
                        )

            except Exception as e:
                self.logger.error(f"Error during download process: {e}")
                raise

        # Print final statistics
        elapsed_time = time.time() - start_time
        self.logger.info(f"Download completed in {elapsed_time:.2f} seconds")
        self.logger.info(f"Statistics: {self.stats}")


def main():
    """
    Command-line entry point.

    Returns:
        Process exit status: 0 on success (or user interrupt), 1 on error.
    """
    parser = argparse.ArgumentParser(
        description="Download images using configuration file",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python config_downloader.py --config config.json

  # Create a config file first:
  cp config_example.json my_config.json
  # Edit my_config.json with your API details
  python config_downloader.py --config my_config.json
        """,
    )

    parser.add_argument(
        "--config", required=True, help="Path to the JSON configuration file"
    )
    parser.add_argument(
        "--force-redownload",
        action="store_true",
        help="Force re-download of all assets, even if already tracked",
    )
    parser.add_argument(
        "--show-stats",
        action="store_true",
        help="Show asset tracking statistics and exit",
    )
    parser.add_argument(
        "--cleanup",
        action="store_true",
        help="Clean up metadata for missing files and exit",
    )

    args = parser.parse_args()

    # Handle special commands first; they never start a download.
    if args.show_stats or args.cleanup:
        try:
            downloader = ConfigImageDownloader(args.config)
            if downloader.asset_tracker:
                if args.cleanup:
                    downloader.asset_tracker.cleanup_missing_files()
                if args.show_stats:
                    downloader.asset_tracker.print_stats()
            else:
                print("Asset tracking is not available")
        except Exception as e:
            print(f"Error: {e}")
            return 1
        return 0

    try:
        downloader = ConfigImageDownloader(args.config)
        asyncio.run(
            downloader.download_all_assets(force_redownload=args.force_redownload)
        )
    except KeyboardInterrupt:
        print("\nDownload interrupted by user")
    except Exception as e:
        print(f"Error: {e}")
        return 1

    return 0


if __name__ == "__main__":
    # sys.exit is always available; the bare exit() helper comes from the
    # site module and is absent under "python -S" or in frozen builds.
    sys.exit(main())