diff --git a/config_downloader.py b/config_downloader.py index 8addbdd..e550b80 100644 --- a/config_downloader.py +++ b/config_downloader.py @@ -46,54 +46,56 @@ class ConfigImageDownloader: self.setup_logging() # Create output directory - self.output_dir = Path(self.config['output_dir']) + self.output_dir = Path(self.config["output_dir"]) self.output_dir.mkdir(parents=True, exist_ok=True) # Track download statistics - self.stats = { - 'total': 0, - 'successful': 0, - 'failed': 0, - 'skipped': 0 - } + self.stats = {"total": 0, "successful": 0, "failed": 0, "skipped": 0} # Authentication manager self.auth_manager = None # Initialize asset tracker if enabled and available - track_assets = self.config.get('track_assets', True) + track_assets = self.config.get("track_assets", True) self.asset_tracker = None if track_assets and AssetTracker: self.asset_tracker = AssetTracker(storage_dir=str(self.output_dir)) self.logger.info("Asset tracking enabled") elif track_assets: - self.logger.warning("Asset tracking requested but AssetTracker not available") + self.logger.warning( + "Asset tracking requested but AssetTracker not available" + ) else: self.logger.info("Asset tracking disabled") def load_config(self, config_file: str) -> Dict[str, Any]: """Load configuration from JSON file.""" try: - with open(config_file, 'r') as f: + with open(config_file, "r") as f: config = json.load(f) # Validate required fields - required_fields = ['api_url', 'list_endpoint', 'download_endpoint', 'output_dir'] + required_fields = [ + "api_url", + "list_endpoint", + "download_endpoint", + "output_dir", + ] for field in required_fields: if field not in config: raise ValueError(f"Missing required field: {field}") # Set defaults for optional fields - config.setdefault('max_concurrent', 5) - config.setdefault('timeout', 30) - config.setdefault('headers', {}) + config.setdefault("max_concurrent", 5) + config.setdefault("timeout", 30) + config.setdefault("headers", {}) # Note: API key is now passed as URL parameter, not header # The x-api-key header is only used for the list endpoint # Add API key to headers for list endpoint authentication - if 'api_key' in config and config['api_key']: - config['headers']['x-api-key'] = config['api_key'] + if "api_key" in config and config["api_key"]: + config["headers"]["x-api-key"] = config["api_key"] return config @@ -104,51 +106,64 @@ class ConfigImageDownloader: def setup_logging(self): """Setup logging configuration.""" - log_file = Path(self.config['output_dir']) / 'download.log' + log_file = Path(self.config["output_dir"]) / "download.log" + + # Create output directory if it doesn't exist + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Create log file if it doesn't exist + log_file.touch(exist_ok=True) logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_file), - logging.StreamHandler() - ] + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.FileHandler(log_file), logging.StreamHandler()], ) self.logger = logging.getLogger(__name__) async def authenticate(self): """Perform login authentication if credentials are provided in config.""" - if 'email' in self.config and 'password' in self.config and AuthManager: + if "email" in self.config and "password" in self.config and AuthManager: self.logger.info("Attempting login authentication...") - self.auth_manager = AuthManager(self.config['api_url']) - success = await self.auth_manager.login(self.config['email'], self.config['password']) + self.auth_manager = AuthManager(self.config["api_url"]) + success = await self.auth_manager.login( + self.config["email"], self.config["password"] + ) if success: self.logger.info("Login authentication successful") else: self.logger.error("Login authentication failed") raise Exception("Login authentication failed") - elif 'email' in self.config or 'password' in self.config: - self.logger.warning("Both email and password must be provided in config for login authentication") - raise Exception("Both email and password must be provided in config for login authentication") + elif "email" in self.config or "password" in self.config: + self.logger.warning( + "Both email and password must be provided in config for login authentication" + ) + raise Exception( + "Both email and password must be provided in config for login authentication" + ) - async def get_asset_list(self, session: aiohttp.ClientSession) -> List[Dict[str, Any]]: + async def get_asset_list( + self, session: aiohttp.ClientSession + ) -> List[Dict[str, Any]]: """Fetch the list of assets from the API.""" - url = urljoin(self.config['api_url'], self.config['list_endpoint']) + url = urljoin(self.config["api_url"], self.config["list_endpoint"]) self.logger.info(f"Fetching asset list from: {url}") - headers = self.config.get('headers', {}) + headers = self.config.get("headers", {}) # Use API key if provided - if 'api_key' in self.config and self.config['api_key']: - headers['x-api-key'] = self.config['api_key'] + if "api_key" in self.config and self.config["api_key"]: + headers["x-api-key"] = self.config["api_key"] # Use login authentication if available elif self.auth_manager and self.auth_manager.is_authenticated(): headers.update(self.auth_manager.get_auth_headers()) try: - async with session.get(url, headers=headers, timeout=self.config['timeout']) as response: + async with session.get( + url, headers=headers, timeout=self.config["timeout"] + ) as response: response.raise_for_status() data = await response.json() @@ -157,7 +172,7 @@ class ConfigImageDownloader: assets = data elif isinstance(data, dict): # Common patterns for API responses - for key in ['data', 'results', 'items', 'assets', 'images']: + for key in ["data", "results", "items", "assets", "images"]: if key in data and isinstance(data[key], list): assets = data[key] break @@ -179,7 +194,7 @@ class ConfigImageDownloader: asset_id = None # Common field names for asset identifiers - id_fields = ['id', 'asset_id', 'image_id', 'file_id', 'uuid', 'key'] + id_fields = ["id", "asset_id", "image_id", "file_id", "uuid", "key"] for field in id_fields: if field in asset: asset_id = asset[field] @@ -192,38 +207,37 @@ class ConfigImageDownloader: # Build download URL with required parameters from urllib.parse import urlencode - params = { - 'key': self.config.get('api_key', ''), - 'u': asset.get('updated', '') - } + params = {"key": self.config.get("api_key", ""), "u": asset.get("updated", "")} - download_url = urljoin(self.config['api_url'], f"/v1/media/{asset_id}/full?{urlencode(params)}") + download_url = urljoin( + self.config["api_url"], f"/v1/media/{asset_id}/full?{urlencode(params)}" + ) return download_url def get_filename(self, asset: Dict[str, Any], url: str) -> str: """Generate a filename for the downloaded asset.""" # Try to get filename from asset metadata - if 'fileName' in asset: - filename = asset['fileName'] - elif 'filename' in asset: - filename = asset['filename'] - elif 'name' in asset: - filename = asset['name'] - elif 'title' in asset: - filename = asset['title'] + if "fileName" in asset: + filename = asset["fileName"] + elif "filename" in asset: + filename = asset["filename"] + elif "name" in asset: + filename = asset["name"] + elif "title" in asset: + filename = asset["title"] else: # Extract filename from URL parsed_url = urlparse(url) filename = os.path.basename(parsed_url.path) # If no extension, try to get it from content-type or add default - if '.' not in filename: - if 'mimeType' in asset: - ext = self._get_extension_from_mime(asset['mimeType']) - elif 'content_type' in asset: - ext = self._get_extension_from_mime(asset['content_type']) + if "." not in filename: + if "mimeType" in asset: + ext = self._get_extension_from_mime(asset["mimeType"]) + elif "content_type" in asset: + ext = self._get_extension_from_mime(asset["content_type"]) else: - ext = '.jpg' # Default extension + ext = ".jpg" # Default extension filename += ext # Sanitize filename @@ -242,35 +256,39 @@ class ConfigImageDownloader: def _get_extension_from_mime(self, mime_type: str) -> str: """Get file extension from MIME type.""" mime_to_ext = { - 'image/jpeg': '.jpg', - 'image/jpg': '.jpg', - 'image/png': '.png', - 'image/gif': '.gif', - 'image/webp': '.webp', - 'image/bmp': '.bmp', - 'image/tiff': '.tiff', - 'image/svg+xml': '.svg' + "image/jpeg": ".jpg", + "image/jpg": ".jpg", + "image/png": ".png", + "image/gif": ".gif", + "image/webp": ".webp", + "image/bmp": ".bmp", + "image/tiff": ".tiff", + "image/svg+xml": ".svg", } - return mime_to_ext.get(mime_type.lower(), '.jpg') + return mime_to_ext.get(mime_type.lower(), ".jpg") def _sanitize_filename(self, filename: str) -> str: """Sanitize filename by removing invalid characters.""" # Remove or replace invalid characters invalid_chars = '<>:"/\\|?*' for char in invalid_chars: - filename = filename.replace(char, '_') + filename = filename.replace(char, "_") # Remove leading/trailing spaces and dots - filename = filename.strip('. ') + filename = filename.strip(". ") # Ensure filename is not empty if not filename: - filename = 'image' + filename = "image" return filename - async def download_asset(self, session: aiohttp.ClientSession, asset: Dict[str, Any], - semaphore: asyncio.Semaphore) -> bool: + async def download_asset( + self, + session: aiohttp.ClientSession, + asset: Dict[str, Any], + semaphore: asyncio.Semaphore, + ) -> bool: """Download a single asset.""" async with semaphore: try: @@ -281,44 +299,58 @@ class ConfigImageDownloader: # Check if file already exists and we're not tracking assets if filepath.exists() and not self.asset_tracker: self.logger.info(f"Skipping {filename} (already exists)") - self.stats['skipped'] += 1 + self.stats["skipped"] += 1 return True self.logger.info(f"Downloading {filename} from {download_url}") - headers = self.config.get('headers', {}) - async with session.get(download_url, headers=headers, timeout=self.config['timeout']) as response: + headers = self.config.get("headers", {}) + async with session.get( + download_url, headers=headers, timeout=self.config["timeout"] + ) as response: response.raise_for_status() # Get content type to verify it's an image - content_type = response.headers.get('content-type', '') - if not content_type.startswith('image/'): - self.logger.warning(f"Content type is not an image: {content_type}") + content_type = response.headers.get("content-type", "") + if not content_type.startswith("image/"): + self.logger.warning( + f"Content type is not an image: {content_type}" + ) # Download the file - async with aiofiles.open(filepath, 'wb') as f: + async with aiofiles.open(filepath, "wb") as f: async for chunk in response.content.iter_chunked(8192): await f.write(chunk) # Set file modification time to match the updated timestamp - if 'updated' in asset: + if "updated" in asset: try: from datetime import datetime import os + # Parse the ISO timestamp - updated_time = datetime.fromisoformat(asset['updated'].replace('Z', '+00:00')) + updated_time = datetime.fromisoformat( + asset["updated"].replace("Z", "+00:00") + ) # Set file modification time - os.utime(filepath, (updated_time.timestamp(), updated_time.timestamp())) - self.logger.info(f"Set file modification time to {asset['updated']}") + os.utime( + filepath, + (updated_time.timestamp(), updated_time.timestamp()), + ) + self.logger.info( + f"Set file modification time to {asset['updated']}" + ) except Exception as e: - self.logger.warning(f"Failed to set file modification time: {e}") + self.logger.warning( + f"Failed to set file modification time: {e}" + ) # Mark asset as downloaded in tracker if self.asset_tracker: self.asset_tracker.mark_asset_downloaded(asset, filepath, True) self.logger.info(f"Successfully downloaded {filename}") - self.stats['successful'] += 1 + self.stats["successful"] += 1 return True except Exception as e: @@ -329,8 +361,10 @@ class ConfigImageDownloader: filepath = self.output_dir / filename self.asset_tracker.mark_asset_downloaded(asset, filepath, False) - self.logger.error(f"Failed to download asset {asset.get('id', 'unknown')}: {e}") - self.stats['failed'] += 1 + self.logger.error( + f"Failed to download asset {asset.get('id', 'unknown')}: {e}" + ) + self.stats["failed"] += 1 return False async def download_all_assets(self, force_redownload: bool = False): @@ -344,9 +378,11 @@ class ConfigImageDownloader: # Create aiohttp session with connection pooling connector = aiohttp.TCPConnector(limit=100, limit_per_host=30) - timeout = aiohttp.ClientTimeout(total=self.config['timeout']) + timeout = aiohttp.ClientTimeout(total=self.config["timeout"]) - async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + async with aiohttp.ClientSession( + connector=connector, timeout=timeout + ) as session: try: # Perform authentication if needed await self.authenticate() @@ -362,24 +398,27 @@ class ConfigImageDownloader: # Filter for new/modified assets if tracking is enabled if self.asset_tracker and not force_redownload: assets = self.asset_tracker.get_new_assets(all_assets) - self.logger.info(f"Found {len(assets)} new/modified assets to download") + self.logger.info( + f"Found {len(assets)} new/modified assets to download" + ) if len(assets) == 0: self.logger.info("All assets are up to date!") return else: assets = all_assets if force_redownload: - self.logger.info("Force redownload enabled - downloading all assets") + self.logger.info( + "Force redownload enabled - downloading all assets" + ) - self.stats['total'] = len(assets) + self.stats["total"] = len(assets) # Create semaphore to limit concurrent downloads - semaphore = asyncio.Semaphore(self.config['max_concurrent']) + semaphore = asyncio.Semaphore(self.config["max_concurrent"]) # Create tasks for all downloads tasks = [ - self.download_asset(session, asset, semaphore) - for asset in assets + self.download_asset(session, asset, semaphore) for asset in assets ] # Download all assets with progress bar @@ -387,11 +426,13 @@ class ConfigImageDownloader: for coro in asyncio.as_completed(tasks): result = await coro pbar.update(1) - pbar.set_postfix({ - 'Success': self.stats['successful'], - 'Failed': self.stats['failed'], - 'Skipped': self.stats['skipped'] - }) + pbar.set_postfix( + { + "Success": self.stats["successful"], + "Failed": self.stats["failed"], + "Skipped": self.stats["skipped"], + } + ) except Exception as e: self.logger.error(f"Error during download process: {e}") @@ -415,31 +456,29 @@ Examples: cp config_example.json my_config.json # Edit my_config.json with your API details python config_downloader.py --config my_config.json - """ + """, ) parser.add_argument( - '--config', - required=True, - help='Path to the JSON configuration file' + "--config", required=True, help="Path to the JSON configuration file" ) parser.add_argument( - '--force-redownload', - action='store_true', - help='Force re-download of all assets, even if already tracked' + "--force-redownload", + action="store_true", + help="Force re-download of all assets, even if already tracked", ) parser.add_argument( - '--show-stats', - action='store_true', - help='Show asset tracking statistics and exit' + "--show-stats", + action="store_true", + help="Show asset tracking statistics and exit", ) parser.add_argument( - '--cleanup', - action='store_true', - help='Clean up metadata for missing files and exit' + "--cleanup", + action="store_true", + help="Clean up metadata for missing files and exit", ) args = parser.parse_args() @@ -462,7 +501,9 @@ Examples: try: downloader = ConfigImageDownloader(args.config) - asyncio.run(downloader.download_all_assets(force_redownload=args.force_redownload)) + asyncio.run( + downloader.download_all_assets(force_redownload=args.force_redownload) + ) except KeyboardInterrupt: print("\nDownload interrupted by user") except Exception as e: