This commit is contained in:
@@ -25,6 +25,7 @@ try:
|
|||||||
from .image_downloader import ImageDownloader
|
from .image_downloader import ImageDownloader
|
||||||
from .snapshot_downloader import SnapshotDownloader
|
from .snapshot_downloader import SnapshotDownloader
|
||||||
from .webserver import SnapshotsWebServer
|
from .webserver import SnapshotsWebServer
|
||||||
|
from .utils import sanitize_filename, get_extension_from_mime
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"AssetTracker",
|
"AssetTracker",
|
||||||
@@ -34,6 +35,8 @@ try:
|
|||||||
"ImageDownloader",
|
"ImageDownloader",
|
||||||
"SnapshotDownloader",
|
"SnapshotDownloader",
|
||||||
"SnapshotsWebServer",
|
"SnapshotsWebServer",
|
||||||
|
"sanitize_filename",
|
||||||
|
"get_extension_from_mime",
|
||||||
]
|
]
|
||||||
|
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import logging
|
|||||||
import os
|
import os
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Set, Any, Optional
|
from typing import Dict, List, Any
|
||||||
import hashlib
|
import hashlib
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ from typing import List, Dict, Any, Optional
|
|||||||
import time
|
import time
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
# Import the auth manager and asset tracker
|
# Import the auth manager, asset tracker, and utilities
|
||||||
try:
|
try:
|
||||||
from src.auth_manager import AuthManager
|
from src.auth_manager import AuthManager
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -33,6 +33,24 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
AssetTracker = None
|
AssetTracker = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
from src.utils import sanitize_filename, get_extension_from_mime
|
||||||
|
except ImportError:
|
||||||
|
# Fallback implementations if utils not available
|
||||||
|
def sanitize_filename(filename: str) -> str:
|
||||||
|
invalid_chars = '<>:"/\\|?*'
|
||||||
|
for char in invalid_chars:
|
||||||
|
filename = filename.replace(char, "_")
|
||||||
|
filename = filename.strip(". ")
|
||||||
|
return filename if filename else "file"
|
||||||
|
|
||||||
|
def get_extension_from_mime(mime_type: str) -> str:
|
||||||
|
mime_to_ext = {
|
||||||
|
"image/jpeg": ".jpg", "image/jpg": ".jpg", "image/png": ".png",
|
||||||
|
"image/gif": ".gif", "image/webp": ".webp",
|
||||||
|
}
|
||||||
|
return mime_to_ext.get(mime_type.lower(), ".jpg")
|
||||||
|
|
||||||
|
|
||||||
class ConfigImageDownloader:
|
class ConfigImageDownloader:
|
||||||
def __init__(self, config_file: str):
|
def __init__(self, config_file: str):
|
||||||
@@ -233,15 +251,15 @@ class ConfigImageDownloader:
|
|||||||
# If no extension, try to get it from content-type or add default
|
# If no extension, try to get it from content-type or add default
|
||||||
if "." not in filename:
|
if "." not in filename:
|
||||||
if "mimeType" in asset:
|
if "mimeType" in asset:
|
||||||
ext = self._get_extension_from_mime(asset["mimeType"])
|
ext = get_extension_from_mime(asset["mimeType"])
|
||||||
elif "content_type" in asset:
|
elif "content_type" in asset:
|
||||||
ext = self._get_extension_from_mime(asset["content_type"])
|
ext = get_extension_from_mime(asset["content_type"])
|
||||||
else:
|
else:
|
||||||
ext = ".jpg" # Default extension
|
ext = ".jpg" # Default extension
|
||||||
filename += ext
|
filename += ext
|
||||||
|
|
||||||
# Sanitize filename
|
# Sanitize filename
|
||||||
filename = self._sanitize_filename(filename)
|
filename = sanitize_filename(filename)
|
||||||
|
|
||||||
# Ensure unique filename
|
# Ensure unique filename
|
||||||
counter = 1
|
counter = 1
|
||||||
@@ -253,35 +271,6 @@ class ConfigImageDownloader:
|
|||||||
|
|
||||||
return filename
|
return filename
|
||||||
|
|
||||||
def _get_extension_from_mime(self, mime_type: str) -> str:
|
|
||||||
"""Get file extension from MIME type."""
|
|
||||||
mime_to_ext = {
|
|
||||||
"image/jpeg": ".jpg",
|
|
||||||
"image/jpg": ".jpg",
|
|
||||||
"image/png": ".png",
|
|
||||||
"image/gif": ".gif",
|
|
||||||
"image/webp": ".webp",
|
|
||||||
"image/bmp": ".bmp",
|
|
||||||
"image/tiff": ".tiff",
|
|
||||||
"image/svg+xml": ".svg",
|
|
||||||
}
|
|
||||||
return mime_to_ext.get(mime_type.lower(), ".jpg")
|
|
||||||
|
|
||||||
def _sanitize_filename(self, filename: str) -> str:
|
|
||||||
"""Sanitize filename by removing invalid characters."""
|
|
||||||
# Remove or replace invalid characters
|
|
||||||
invalid_chars = '<>:"/\\|?*'
|
|
||||||
for char in invalid_chars:
|
|
||||||
filename = filename.replace(char, "_")
|
|
||||||
|
|
||||||
# Remove leading/trailing spaces and dots
|
|
||||||
filename = filename.strip(". ")
|
|
||||||
|
|
||||||
# Ensure filename is not empty
|
|
||||||
if not filename:
|
|
||||||
filename = "image"
|
|
||||||
|
|
||||||
return filename
|
|
||||||
|
|
||||||
async def download_asset(
|
async def download_asset(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import argparse
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ import aiofiles
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
from tqdm import tqdm
|
from tqdm import tqdm
|
||||||
|
|
||||||
# Import the auth manager and asset tracker
|
# Import the auth manager, asset tracker, and utilities
|
||||||
try:
|
try:
|
||||||
from src.auth_manager import AuthManager
|
from src.auth_manager import AuthManager
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -34,6 +34,24 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
AssetTracker = None
|
AssetTracker = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
from src.utils import sanitize_filename, get_extension_from_mime
|
||||||
|
except ImportError:
|
||||||
|
# Fallback implementations if utils not available
|
||||||
|
def sanitize_filename(filename: str) -> str:
|
||||||
|
invalid_chars = '<>:"/\\|?*'
|
||||||
|
for char in invalid_chars:
|
||||||
|
filename = filename.replace(char, "_")
|
||||||
|
filename = filename.strip(". ")
|
||||||
|
return filename if filename else "file"
|
||||||
|
|
||||||
|
def get_extension_from_mime(mime_type: str) -> str:
|
||||||
|
mime_to_ext = {
|
||||||
|
"image/jpeg": ".jpg", "image/jpg": ".jpg", "image/png": ".png",
|
||||||
|
"image/gif": ".gif", "image/webp": ".webp",
|
||||||
|
}
|
||||||
|
return mime_to_ext.get(mime_type.lower(), ".jpg")
|
||||||
|
|
||||||
|
|
||||||
class ImageDownloader:
|
class ImageDownloader:
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -241,15 +259,15 @@ class ImageDownloader:
|
|||||||
# If no extension, try to get it from content-type or add default
|
# If no extension, try to get it from content-type or add default
|
||||||
if "." not in filename:
|
if "." not in filename:
|
||||||
if "mimeType" in asset:
|
if "mimeType" in asset:
|
||||||
ext = self._get_extension_from_mime(asset["mimeType"])
|
ext = get_extension_from_mime(asset["mimeType"])
|
||||||
elif "content_type" in asset:
|
elif "content_type" in asset:
|
||||||
ext = self._get_extension_from_mime(asset["content_type"])
|
ext = get_extension_from_mime(asset["content_type"])
|
||||||
else:
|
else:
|
||||||
ext = ".jpg" # Default extension
|
ext = ".jpg" # Default extension
|
||||||
filename += ext
|
filename += ext
|
||||||
|
|
||||||
# Sanitize filename
|
# Sanitize filename
|
||||||
filename = self._sanitize_filename(filename)
|
filename = sanitize_filename(filename)
|
||||||
|
|
||||||
# Ensure unique filename
|
# Ensure unique filename
|
||||||
counter = 1
|
counter = 1
|
||||||
@@ -261,35 +279,6 @@ class ImageDownloader:
|
|||||||
|
|
||||||
return filename
|
return filename
|
||||||
|
|
||||||
def _get_extension_from_mime(self, mime_type: str) -> str:
|
|
||||||
"""Get file extension from MIME type."""
|
|
||||||
mime_to_ext = {
|
|
||||||
"image/jpeg": ".jpg",
|
|
||||||
"image/jpg": ".jpg",
|
|
||||||
"image/png": ".png",
|
|
||||||
"image/gif": ".gif",
|
|
||||||
"image/webp": ".webp",
|
|
||||||
"image/bmp": ".bmp",
|
|
||||||
"image/tiff": ".tiff",
|
|
||||||
"image/svg+xml": ".svg",
|
|
||||||
}
|
|
||||||
return mime_to_ext.get(mime_type.lower(), ".jpg")
|
|
||||||
|
|
||||||
def _sanitize_filename(self, filename: str) -> str:
|
|
||||||
"""Sanitize filename by removing invalid characters."""
|
|
||||||
# Remove or replace invalid characters
|
|
||||||
invalid_chars = '<>:"/\\|?*'
|
|
||||||
for char in invalid_chars:
|
|
||||||
filename = filename.replace(char, "_")
|
|
||||||
|
|
||||||
# Remove leading/trailing spaces and dots
|
|
||||||
filename = filename.strip(". ")
|
|
||||||
|
|
||||||
# Ensure filename is not empty
|
|
||||||
if not filename:
|
|
||||||
filename = "image"
|
|
||||||
|
|
||||||
return filename
|
|
||||||
|
|
||||||
async def download_asset(
|
async def download_asset(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -19,12 +19,23 @@ from urllib.parse import urlencode
|
|||||||
import aiofiles
|
import aiofiles
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
# Import the auth manager
|
# Import the auth manager and utilities
|
||||||
try:
|
try:
|
||||||
from src.auth_manager import AuthManager
|
from src.auth_manager import AuthManager
|
||||||
except ImportError:
|
except ImportError:
|
||||||
AuthManager = None
|
AuthManager = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
from src.utils import sanitize_filename
|
||||||
|
except ImportError:
|
||||||
|
# Fallback implementation if utils not available
|
||||||
|
def sanitize_filename(filename: str) -> str:
|
||||||
|
invalid_chars = '<>:"/\\|?*'
|
||||||
|
for char in invalid_chars:
|
||||||
|
filename = filename.replace(char, "_")
|
||||||
|
filename = filename.strip(". ")
|
||||||
|
return filename if filename else "file"
|
||||||
|
|
||||||
|
|
||||||
class SnapshotDownloader:
|
class SnapshotDownloader:
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -509,7 +520,7 @@ class SnapshotDownloader:
|
|||||||
|
|
||||||
filename = media.get("fileName", f"media_{media_id}")
|
filename = media.get("fileName", f"media_{media_id}")
|
||||||
# Sanitize filename
|
# Sanitize filename
|
||||||
filename = self._sanitize_filename(filename)
|
filename = sanitize_filename(filename)
|
||||||
|
|
||||||
# Check if file already exists
|
# Check if file already exists
|
||||||
filepath = self.assets_dir / filename
|
filepath = self.assets_dir / filename
|
||||||
@@ -543,21 +554,6 @@ class SnapshotDownloader:
|
|||||||
self.logger.error(f"Failed to download media {filename}: {e}")
|
self.logger.error(f"Failed to download media {filename}: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _sanitize_filename(self, filename: str) -> str:
|
|
||||||
"""Sanitize filename by removing invalid characters."""
|
|
||||||
# Remove or replace invalid characters
|
|
||||||
invalid_chars = '<>:"/\\|?*'
|
|
||||||
for char in invalid_chars:
|
|
||||||
filename = filename.replace(char, "_")
|
|
||||||
|
|
||||||
# Remove leading/trailing spaces and dots
|
|
||||||
filename = filename.strip(". ")
|
|
||||||
|
|
||||||
# Ensure filename is not empty
|
|
||||||
if not filename:
|
|
||||||
filename = "media_file"
|
|
||||||
|
|
||||||
return filename
|
|
||||||
|
|
||||||
async def generate_html_file(
|
async def generate_html_file(
|
||||||
self, snapshots: List[Dict[str, Any]], date_from: str, date_to: str
|
self, snapshots: List[Dict[str, Any]], date_from: str, date_to: str
|
||||||
|
|||||||
55
src/utils.py
Normal file
55
src/utils.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Utility functions for ParentZone Downloader
|
||||||
|
|
||||||
|
This module contains shared utility functions used across multiple modules.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_filename(filename: str) -> str:
|
||||||
|
"""
|
||||||
|
Sanitize filename by removing invalid characters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: The filename to sanitize
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Sanitized filename safe for filesystem use
|
||||||
|
"""
|
||||||
|
# Remove or replace invalid characters
|
||||||
|
invalid_chars = '<>:"/\\|?*'
|
||||||
|
for char in invalid_chars:
|
||||||
|
filename = filename.replace(char, "_")
|
||||||
|
|
||||||
|
# Remove leading/trailing spaces and dots
|
||||||
|
filename = filename.strip(". ")
|
||||||
|
|
||||||
|
# Ensure filename is not empty
|
||||||
|
if not filename:
|
||||||
|
filename = "file"
|
||||||
|
|
||||||
|
return filename
|
||||||
|
|
||||||
|
|
||||||
|
def get_extension_from_mime(mime_type: str) -> str:
|
||||||
|
"""
|
||||||
|
Get file extension from MIME type.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mime_type: The MIME type string (e.g., 'image/jpeg')
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
File extension including the dot (e.g., '.jpg')
|
||||||
|
"""
|
||||||
|
mime_to_ext = {
|
||||||
|
"image/jpeg": ".jpg",
|
||||||
|
"image/jpg": ".jpg",
|
||||||
|
"image/png": ".png",
|
||||||
|
"image/gif": ".gif",
|
||||||
|
"image/webp": ".webp",
|
||||||
|
"image/bmp": ".bmp",
|
||||||
|
"image/tiff": ".tiff",
|
||||||
|
"image/svg+xml": ".svg",
|
||||||
|
}
|
||||||
|
return mime_to_ext.get(mime_type.lower(), ".jpg")
|
||||||
|
|
||||||
@@ -14,9 +14,7 @@ from pathlib import Path
|
|||||||
from urllib.parse import unquote
|
from urllib.parse import unquote
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import aiohttp
|
from aiohttp import web
|
||||||
from aiohttp import web, hdrs
|
|
||||||
from aiohttp.web_response import Response
|
|
||||||
|
|
||||||
|
|
||||||
class SnapshotsWebServer:
|
class SnapshotsWebServer:
|
||||||
|
|||||||
Reference in New Issue
Block a user