Files
parentzone_downloader/config_downloader.py
Tudor Sitaru ddde67ca62 first commit
2025-10-07 14:52:04 +01:00

477 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Configuration-based Image Downloader
This script reads configuration from a JSON file and downloads images from a REST API.
It's a simplified version of the main downloader for easier use.
Usage:
python config_downloader.py --config config.json
"""
import argparse
import json
import asyncio
import aiohttp
import aiofiles
import os
import logging
from pathlib import Path
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Any, Optional
import time
from tqdm import tqdm
# Import the auth manager and asset tracker
try:
from auth_manager import AuthManager
except ImportError:
AuthManager = None
try:
from asset_tracker import AssetTracker
except ImportError:
AssetTracker = None
class ConfigImageDownloader:
    """Download images from a REST API using settings read from a JSON file.

    The configuration supplies the API base URL, the list/download
    endpoints, the output directory and optional authentication
    (API key and/or email+password login).
    """

    def __init__(self, config_file: str):
        """
        Initialize the downloader with configuration from a JSON file.

        Args:
            config_file: Path to the JSON configuration file
        """
        self.config = self.load_config(config_file)
        # Create the output directory BEFORE configuring logging: the log
        # file lives inside it and logging.FileHandler fails if the
        # directory does not exist yet.
        self.output_dir = Path(self.config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.setup_logging()
        # Download statistics, updated as downloads complete.
        self.stats = {
            'total': 0,
            'successful': 0,
            'failed': 0,
            'skipped': 0
        }
        # Authentication manager (created lazily in authenticate()).
        self.auth_manager = None
        # Initialize asset tracker if enabled and available.
        track_assets = self.config.get('track_assets', True)
        self.asset_tracker = None
        if track_assets and AssetTracker:
            self.asset_tracker = AssetTracker(storage_dir=str(self.output_dir))
            self.logger.info("Asset tracking enabled")
        elif track_assets:
            self.logger.warning("Asset tracking requested but AssetTracker not available")
        else:
            self.logger.info("Asset tracking disabled")

    def load_config(self, config_file: str) -> Dict[str, Any]:
        """Load and validate configuration from a JSON file.

        Args:
            config_file: Path to the JSON configuration file.

        Returns:
            The configuration dict with defaults applied.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the JSON is invalid or a required field is missing.
        """
        try:
            with open(config_file, 'r') as f:
                config = json.load(f)
            # Validate required fields
            required_fields = ['api_url', 'list_endpoint', 'download_endpoint', 'output_dir']
            for field in required_fields:
                if field not in config:
                    raise ValueError(f"Missing required field: {field}")
            # Set defaults for optional fields
            config.setdefault('max_concurrent', 5)
            config.setdefault('timeout', 30)
            config.setdefault('headers', {})
            # Note: API key is now passed as URL parameter, not header.
            # The x-api-key header is only used for the list endpoint.
            if 'api_key' in config and config['api_key']:
                config['headers']['x-api-key'] = config['api_key']
            return config
        except FileNotFoundError:
            raise FileNotFoundError(f"Configuration file not found: {config_file}")
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in configuration file: {e}")

    def setup_logging(self):
        """Configure logging to a file inside the output directory plus the console."""
        log_file = Path(self.config['output_dir']) / 'download.log'
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def authenticate(self):
        """Perform login authentication if credentials are provided in config.

        Raises:
            Exception: If login fails, or only one of email/password is given.
        """
        if 'email' in self.config and 'password' in self.config and AuthManager:
            self.logger.info("Attempting login authentication...")
            self.auth_manager = AuthManager(self.config['api_url'])
            success = await self.auth_manager.login(self.config['email'], self.config['password'])
            if success:
                self.logger.info("Login authentication successful")
            else:
                self.logger.error("Login authentication failed")
                raise Exception("Login authentication failed")
        elif 'email' in self.config or 'password' in self.config:
            self.logger.warning("Both email and password must be provided in config for login authentication")
            raise Exception("Both email and password must be provided in config for login authentication")

    async def get_asset_list(self, session: aiohttp.ClientSession) -> List[Dict[str, Any]]:
        """Fetch the list of assets from the API.

        Args:
            session: Shared aiohttp client session.

        Returns:
            A flat list of asset dicts, unwrapping common envelope keys.

        Raises:
            ValueError: If the response body is neither a list nor a dict.
        """
        url = urljoin(self.config['api_url'], self.config['list_endpoint'])
        self.logger.info(f"Fetching asset list from: {url}")
        # Copy so we never mutate the shared config['headers'] dict.
        headers = dict(self.config.get('headers', {}))
        # Use API key if provided; otherwise fall back to login auth.
        if self.config.get('api_key'):
            headers['x-api-key'] = self.config['api_key']
        elif self.auth_manager and self.auth_manager.is_authenticated():
            headers.update(self.auth_manager.get_auth_headers())
        try:
            # aiohttp expects a ClientTimeout object (plain ints are deprecated).
            timeout = aiohttp.ClientTimeout(total=self.config['timeout'])
            async with session.get(url, headers=headers, timeout=timeout) as response:
                response.raise_for_status()
                data = await response.json()
            # Handle different response formats
            if isinstance(data, list):
                assets = data
            elif isinstance(data, dict):
                # Common envelope keys for API list responses
                for key in ['data', 'results', 'items', 'assets', 'images']:
                    if key in data and isinstance(data[key], list):
                        assets = data[key]
                        break
                else:
                    assets = [data]  # Single asset
            else:
                raise ValueError(f"Unexpected response format: {type(data)}")
            self.logger.info(f"Found {len(assets)} assets")
            return assets
        except Exception as e:
            self.logger.error(f"Failed to fetch asset list: {e}")
            raise

    def get_download_url(self, asset: Dict[str, Any]) -> str:
        """Generate the download URL for an asset.

        The asset identifier is probed from common field names; the API
        key and 'updated' timestamp travel as URL query parameters.

        Args:
            asset: Asset metadata dict from the list endpoint.

        Returns:
            Absolute download URL for the asset's full-size media.
        """
        from urllib.parse import urlencode
        # Try different common patterns for asset IDs
        asset_id = None
        for field in ['id', 'asset_id', 'image_id', 'file_id', 'uuid', 'key']:
            if field in asset:
                asset_id = asset[field]
                break
        if asset_id is None:
            # Last resort: use the stringified asset itself as the ID
            asset_id = str(asset)
        # Build download URL with required parameters
        params = {
            'key': self.config.get('api_key', ''),
            'u': asset.get('updated', '')
        }
        return urljoin(self.config['api_url'], f"/v1/media/{asset_id}/full?{urlencode(params)}")

    def get_filename(self, asset: Dict[str, Any], url: str) -> str:
        """Generate a unique, sanitized filename for the downloaded asset.

        Args:
            asset: Asset metadata dict.
            url: Download URL (used as a filename fallback).

        Returns:
            A filename that does not collide with existing files in the
            output directory.
        """
        # Try common metadata fields first, then fall back to the URL path.
        for field in ('fileName', 'filename', 'name', 'title'):
            if field in asset:
                filename = asset[field]
                break
        else:
            parsed_url = urlparse(url)
            filename = os.path.basename(parsed_url.path)
        # If no extension, derive one from the MIME type or default to .jpg
        if '.' not in filename:
            if 'mimeType' in asset:
                ext = self._get_extension_from_mime(asset['mimeType'])
            elif 'content_type' in asset:
                ext = self._get_extension_from_mime(asset['content_type'])
            else:
                ext = '.jpg'  # Default extension
            filename += ext
        filename = self._sanitize_filename(filename)
        # Ensure unique filename by appending a numeric suffix.
        counter = 1
        original_filename = filename
        while (self.output_dir / filename).exists():
            name, ext = os.path.splitext(original_filename)
            filename = f"{name}_{counter}{ext}"
            counter += 1
        return filename

    def _get_extension_from_mime(self, mime_type: str) -> str:
        """Map a MIME type to a file extension, defaulting to '.jpg'."""
        mime_to_ext = {
            'image/jpeg': '.jpg',
            'image/jpg': '.jpg',
            'image/png': '.png',
            'image/gif': '.gif',
            'image/webp': '.webp',
            'image/bmp': '.bmp',
            'image/tiff': '.tiff',
            'image/svg+xml': '.svg'
        }
        return mime_to_ext.get(mime_type.lower(), '.jpg')

    def _sanitize_filename(self, filename: str) -> str:
        """Sanitize a filename by replacing invalid characters with '_'."""
        invalid_chars = '<>:"/\\|?*'
        for char in invalid_chars:
            filename = filename.replace(char, '_')
        # Remove leading/trailing spaces and dots
        filename = filename.strip('. ')
        # Ensure filename is not empty
        if not filename:
            filename = 'image'
        return filename

    async def download_asset(self, session: aiohttp.ClientSession, asset: Dict[str, Any],
                             semaphore: asyncio.Semaphore) -> bool:
        """Download a single asset, bounded by the concurrency semaphore.

        Args:
            session: Shared aiohttp client session.
            asset: Asset metadata dict.
            semaphore: Limits the number of concurrent downloads.

        Returns:
            True on success or skip, False on failure.
        """
        async with semaphore:
            try:
                download_url = self.get_download_url(asset)
                filename = self.get_filename(asset, download_url)
                filepath = self.output_dir / filename
                # Skip existing files when no tracker decides freshness.
                if filepath.exists() and not self.asset_tracker:
                    self.logger.info(f"Skipping {filename} (already exists)")
                    self.stats['skipped'] += 1
                    return True
                self.logger.info(f"Downloading {filename} from {download_url}")
                # Copy so we never mutate the shared config['headers'] dict.
                headers = dict(self.config.get('headers', {}))
                timeout = aiohttp.ClientTimeout(total=self.config['timeout'])
                async with session.get(download_url, headers=headers, timeout=timeout) as response:
                    response.raise_for_status()
                    # Warn (but still save) when the payload isn't an image.
                    content_type = response.headers.get('content-type', '')
                    if not content_type.startswith('image/'):
                        self.logger.warning(f"Content type is not an image: {content_type}")
                    # Stream the body to disk in chunks.
                    async with aiofiles.open(filepath, 'wb') as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)
                self._apply_updated_timestamp(asset, filepath)
                # Mark asset as downloaded in tracker
                if self.asset_tracker:
                    self.asset_tracker.mark_asset_downloaded(asset, filepath, True)
                self.logger.info(f"Successfully downloaded {filename}")
                self.stats['successful'] += 1
                return True
            except Exception as e:
                # Record the failure in the tracker so it can be retried.
                if self.asset_tracker:
                    download_url = self.get_download_url(asset)
                    filename = self.get_filename(asset, download_url)
                    filepath = self.output_dir / filename
                    self.asset_tracker.mark_asset_downloaded(asset, filepath, False)
                self.logger.error(f"Failed to download asset {asset.get('id', 'unknown')}: {e}")
                self.stats['failed'] += 1
                return False

    def _apply_updated_timestamp(self, asset: Dict[str, Any], filepath: Path) -> None:
        """Best-effort: set the file's mtime/atime to the asset's 'updated' time."""
        if 'updated' not in asset:
            return
        try:
            from datetime import datetime
            # Parse the ISO timestamp ('Z' suffix is not fromisoformat-compatible pre-3.11)
            updated_time = datetime.fromisoformat(asset['updated'].replace('Z', '+00:00'))
            os.utime(filepath, (updated_time.timestamp(), updated_time.timestamp()))
            self.logger.info(f"Set file modification time to {asset['updated']}")
        except Exception as e:
            self.logger.warning(f"Failed to set file modification time: {e}")

    async def download_all_assets(self, force_redownload: bool = False):
        """
        Download all assets from the API.

        Args:
            force_redownload: If True, download all assets regardless of tracking
        """
        start_time = time.time()
        # Create aiohttp session with connection pooling
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
        timeout = aiohttp.ClientTimeout(total=self.config['timeout'])
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            try:
                # Perform authentication if needed
                await self.authenticate()
                # Get asset list
                all_assets = await self.get_asset_list(session)
                self.logger.info(f"Retrieved {len(all_assets)} total assets from API")
                if not all_assets:
                    self.logger.warning("No assets found to download")
                    return
                # Filter for new/modified assets if tracking is enabled
                if self.asset_tracker and not force_redownload:
                    assets = self.asset_tracker.get_new_assets(all_assets)
                    self.logger.info(f"Found {len(assets)} new/modified assets to download")
                    if len(assets) == 0:
                        self.logger.info("All assets are up to date!")
                        return
                else:
                    assets = all_assets
                    if force_redownload:
                        self.logger.info("Force redownload enabled - downloading all assets")
                self.stats['total'] = len(assets)
                # Semaphore bounds concurrent downloads
                semaphore = asyncio.Semaphore(self.config['max_concurrent'])
                tasks = [
                    self.download_asset(session, asset, semaphore)
                    for asset in assets
                ]
                # Download all assets with progress bar
                with tqdm(total=len(tasks), desc="Downloading assets") as pbar:
                    for coro in asyncio.as_completed(tasks):
                        await coro
                        pbar.update(1)
                        pbar.set_postfix({
                            'Success': self.stats['successful'],
                            'Failed': self.stats['failed'],
                            'Skipped': self.stats['skipped']
                        })
            except Exception as e:
                self.logger.error(f"Error during download process: {e}")
                raise
        # Print final statistics
        elapsed_time = time.time() - start_time
        self.logger.info(f"Download completed in {elapsed_time:.2f} seconds")
        self.logger.info(f"Statistics: {self.stats}")
def main():
    """Parse command-line arguments and run the configured downloader.

    Returns:
        Process exit code: 0 on success, 1 on any error.
    """
    parser = argparse.ArgumentParser(
        description="Download images using configuration file",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python config_downloader.py --config config.json
# Create a config file first:
cp config_example.json my_config.json
# Edit my_config.json with your API details
python config_downloader.py --config my_config.json
"""
    )
    parser.add_argument(
        '--config',
        required=True,
        help='Path to the JSON configuration file'
    )
    parser.add_argument(
        '--force-redownload',
        action='store_true',
        help='Force re-download of all assets, even if already tracked'
    )
    parser.add_argument(
        '--show-stats',
        action='store_true',
        help='Show asset tracking statistics and exit'
    )
    parser.add_argument(
        '--cleanup',
        action='store_true',
        help='Clean up metadata for missing files and exit'
    )
    args = parser.parse_args()
    # Maintenance-only commands: run against the tracker and exit without
    # downloading anything.
    if args.show_stats or args.cleanup:
        try:
            downloader = ConfigImageDownloader(args.config)
            if downloader.asset_tracker:
                if args.cleanup:
                    downloader.asset_tracker.cleanup_missing_files()
                if args.show_stats:
                    downloader.asset_tracker.print_stats()
            else:
                print("Asset tracking is not available")
        except Exception as e:
            print(f"Error: {e}")
            return 1
        return 0
    try:
        downloader = ConfigImageDownloader(args.config)
        asyncio.run(downloader.download_all_assets(force_redownload=args.force_redownload))
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; not treated as an error exit.
        print("\nDownload interrupted by user")
    except Exception as e:
        print(f"Error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    # raise SystemExit instead of the site-injected exit() builtin, which is
    # only guaranteed to exist in interactive sessions.
    raise SystemExit(main())