Files
parentzone_downloader/webserver.py
Tudor Sitaru e17d69c308
All checks were successful
Build Docker Image / build (push) Successful in 1m20s
adding webserver
2025-10-10 16:55:12 +01:00

504 lines
15 KiB
Python

#!/usr/bin/env python3
"""
ParentZone Snapshots Web Server
A simple web server that serves HTML snapshot files and their assets.
Provides a directory listing and serves static files from the snapshots folder.
"""
import os
import asyncio
import argparse
import logging
from pathlib import Path
from urllib.parse import unquote
from datetime import datetime
import aiohttp
from aiohttp import web, hdrs
from aiohttp.web_response import Response
class SnapshotsWebServer:
def __init__(
self,
snapshots_dir: str = "./snapshots",
port: int = 8080,
host: str = "0.0.0.0",
):
self.snapshots_dir = Path(snapshots_dir).resolve()
self.port = port
self.host = host
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
self.logger = logging.getLogger(__name__)
# Ensure snapshots directory exists
self.snapshots_dir.mkdir(parents=True, exist_ok=True)
self.logger.info(f"Serving snapshots from: {self.snapshots_dir}")
async def index_handler(self, request):
"""Serve the main directory listing page."""
try:
html_files = []
# Find all HTML files in the snapshots directory
for file_path in self.snapshots_dir.glob("*.html"):
stat = file_path.stat()
html_files.append(
{
"name": file_path.name,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime),
"path": file_path.name,
}
)
# Sort by modification time (newest first)
html_files.sort(key=lambda x: x["modified"], reverse=True)
# Generate HTML page
html_content = self._generate_index_html(html_files)
return web.Response(text=html_content, content_type="text/html")
except Exception as e:
self.logger.error(f"Error generating index: {e}")
return web.Response(
text=f"<h1>Error</h1><p>Could not generate directory listing: {e}</p>",
status=500,
content_type="text/html",
)
def _generate_index_html(self, html_files):
"""Generate the HTML directory listing page."""
files_list = ""
if not html_files:
files_list = "<p class='no-files'>No snapshot files found.</p>"
else:
for file_info in html_files:
size_mb = file_info["size"] / (1024 * 1024)
files_list += f"""
<div class="file-item">
<div class="file-info">
<h3><a href="/{file_info["path"]}" class="file-link">{file_info["name"]}</a></h3>
<div class="file-meta">
<span class="file-size">{size_mb:.2f} MB</span>
<span class="file-date">{file_info["modified"].strftime("%Y-%m-%d %H:%M:%S")}</span>
</div>
</div>
</div>
"""
return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ParentZone Snapshots</title>
<style>
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.6;
color: #333;
background-color: #f5f5f5;
padding: 20px;
}}
.container {{
max-width: 1000px;
margin: 0 auto;
}}
.header {{
background: white;
padding: 30px;
border-radius: 10px;
margin-bottom: 30px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
text-align: center;
}}
.header h1 {{
color: #2c3e50;
margin-bottom: 10px;
font-size: 2.5em;
}}
.header p {{
color: #7f8c8d;
font-size: 1.1em;
}}
.files-container {{
background: white;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
overflow: hidden;
}}
.files-header {{
background: #3498db;
color: white;
padding: 20px;
font-size: 1.2em;
font-weight: bold;
}}
.file-item {{
border-bottom: 1px solid #ecf0f1;
padding: 20px;
transition: background-color 0.2s;
}}
.file-item:last-child {{
border-bottom: none;
}}
.file-item:hover {{
background-color: #f8f9fa;
}}
.file-link {{
color: #2c3e50;
text-decoration: none;
font-size: 1.1em;
font-weight: 500;
}}
.file-link:hover {{
color: #3498db;
text-decoration: underline;
}}
.file-meta {{
margin-top: 8px;
display: flex;
gap: 20px;
color: #7f8c8d;
font-size: 0.9em;
}}
.no-files {{
padding: 40px;
text-align: center;
color: #7f8c8d;
font-size: 1.1em;
}}
.footer {{
margin-top: 30px;
text-align: center;
color: #7f8c8d;
font-size: 0.9em;
}}
@media (max-width: 600px) {{
.file-meta {{
flex-direction: column;
gap: 5px;
}}
.header h1 {{
font-size: 2em;
}}
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>📸 ParentZone Snapshots</h1>
<p>Browse and view your downloaded snapshot files</p>
</div>
<div class="files-container">
<div class="files-header">
📁 Available Snapshot Files ({len(html_files)} files)
</div>
{files_list}
</div>
<div class="footer">
<p>Served from: {self.snapshots_dir}</p>
<p>Server running on {self.host}:{self.port}</p>
</div>
</div>
</body>
</html>
"""
async def file_handler(self, request):
"""Serve individual HTML files and their assets."""
try:
# Get the requested file path
file_path = unquote(request.match_info["filename"])
requested_file = self.snapshots_dir / file_path
# Security check: ensure the file is within the snapshots directory
try:
requested_file.resolve().relative_to(self.snapshots_dir.resolve())
except ValueError:
self.logger.warning(f"Attempted path traversal: {file_path}")
return web.Response(
text="<h1>403 Forbidden</h1><p>Access denied.</p>",
status=403,
content_type="text/html",
)
# Check if file exists
if not requested_file.exists():
return web.Response(
text="<h1>404 Not Found</h1><p>The requested file was not found.</p>",
status=404,
content_type="text/html",
)
# Determine content type
content_type = self._get_content_type(requested_file)
# Read and serve the file
with open(requested_file, "rb") as f:
content = f.read()
return web.Response(
body=content,
content_type=content_type,
headers={
"Cache-Control": "public, max-age=3600",
"Last-Modified": datetime.fromtimestamp(
requested_file.stat().st_mtime
).strftime("%a, %d %b %Y %H:%M:%S GMT"),
},
)
except Exception as e:
self.logger.error(
f"Error serving file {request.match_info.get('filename', 'unknown')}: {e}"
)
return web.Response(
text=f"<h1>500 Internal Server Error</h1><p>Could not serve file: {e}</p>",
status=500,
content_type="text/html",
)
async def assets_handler(self, request):
"""Serve asset files (images, CSS, JS, etc.) from assets subdirectories."""
try:
# Get the requested asset path
asset_path = unquote(request.match_info["path"])
requested_file = self.snapshots_dir / "assets" / asset_path
# Security check
try:
requested_file.resolve().relative_to(self.snapshots_dir.resolve())
except ValueError:
self.logger.warning(f"Attempted path traversal in assets: {asset_path}")
return web.Response(text="403 Forbidden", status=403)
# Check if file exists
if not requested_file.exists():
return web.Response(text="404 Not Found", status=404)
# Determine content type
content_type = self._get_content_type(requested_file)
# Read and serve the file
with open(requested_file, "rb") as f:
content = f.read()
return web.Response(
body=content,
content_type=content_type,
headers={
"Cache-Control": "public, max-age=86400", # Cache assets for 24 hours
"Last-Modified": datetime.fromtimestamp(
requested_file.stat().st_mtime
).strftime("%a, %d %b %Y %H:%M:%S GMT"),
},
)
except Exception as e:
self.logger.error(
f"Error serving asset {request.match_info.get('path', 'unknown')}: {e}"
)
return web.Response(text="500 Internal Server Error", status=500)
def _get_content_type(self, file_path: Path) -> str:
"""Determine the content type based on file extension."""
suffix = file_path.suffix.lower()
content_types = {
".html": "text/html; charset=utf-8",
".css": "text/css; charset=utf-8",
".js": "application/javascript; charset=utf-8",
".json": "application/json; charset=utf-8",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp",
".svg": "image/svg+xml",
".ico": "image/x-icon",
".pdf": "application/pdf",
".txt": "text/plain; charset=utf-8",
".log": "text/plain; charset=utf-8",
}
return content_types.get(suffix, "application/octet-stream")
def setup_routes(self, app):
"""Configure the web application routes."""
# Main index page
app.router.add_get("/", self.index_handler)
# Serve HTML files directly
app.router.add_get("/{filename:.+\.html}", self.file_handler)
# Serve assets (images, CSS, JS, etc.)
app.router.add_get("/assets/{path:.+}", self.assets_handler)
# Serve other static files (logs, etc.)
app.router.add_get(
"/{filename:.+\.(css|js|json|txt|log|ico)}", self.file_handler
)
async def create_app(self):
"""Create and configure the web application."""
app = web.Application()
# Setup routes
self.setup_routes(app)
# Add middleware for logging
async def logging_middleware(request, handler):
start_time = datetime.now()
try:
response = await handler(request)
# Log the request
duration = (datetime.now() - start_time).total_seconds()
self.logger.info(
f"{request.remote} - {request.method} {request.path} - "
f"{response.status} - {duration:.3f}s"
)
return response
except Exception as e:
duration = (datetime.now() - start_time).total_seconds()
self.logger.error(
f"{request.remote} - {request.method} {request.path} - "
f"ERROR: {e} - {duration:.3f}s"
)
raise
app.middlewares.append(logging_middleware)
return app
async def start_server(self):
"""Start the web server."""
app = await self.create_app()
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self.host, self.port)
await site.start()
self.logger.info(f"🚀 ParentZone Snapshots Web Server started!")
self.logger.info(f"📂 Serving files from: {self.snapshots_dir}")
self.logger.info(f"🌐 Server running at: http://{self.host}:{self.port}")
self.logger.info(f"🔗 Open in browser: http://localhost:{self.port}")
self.logger.info("Press Ctrl+C to stop the server")
return runner
def main():
parser = argparse.ArgumentParser(
description="ParentZone Snapshots Web Server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Start server with default settings
python webserver.py
# Start server on custom port
python webserver.py --port 3000
# Serve from custom directory
python webserver.py --snapshots-dir /path/to/snapshots
# Start server on all interfaces
python webserver.py --host 0.0.0.0 --port 8080
""",
)
parser.add_argument(
"--snapshots-dir",
default="./snapshots",
help="Directory containing snapshot files (default: ./snapshots)",
)
parser.add_argument(
"--port",
type=int,
default=8080,
help="Port to run the server on (default: 8080)",
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="Host to bind the server to (default: 0.0.0.0)",
)
args = parser.parse_args()
# Create and start the server
server = SnapshotsWebServer(
snapshots_dir=args.snapshots_dir, port=args.port, host=args.host
)
async def run_server():
runner = None
try:
runner = await server.start_server()
# Keep the server running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n👋 Shutting down server...")
except Exception as e:
print(f"❌ Server error: {e}")
finally:
if runner:
await runner.cleanup()
try:
asyncio.run(run_server())
except KeyboardInterrupt:
print("\n✅ Server stopped")
if __name__ == "__main__":
main()