first commit

This commit is contained in:
Tudor Sitaru
2025-10-07 14:52:04 +01:00
commit ddde67ca62
73 changed files with 14025 additions and 0 deletions

229
auth_manager.py Normal file
View File

@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
Authentication Manager for ParentZone API
This module handles authentication against the ParentZone login API
and manages session tokens for API requests.
"""
import asyncio
import aiohttp
import json
import logging
from typing import Optional, Dict, Any
from urllib.parse import urljoin
class AuthManager:
def __init__(self, api_url: str = "https://api.parentzone.me"):
"""
Initialize the authentication manager.
Args:
api_url: Base URL of the API
"""
self.api_url = api_url.rstrip('/')
self.login_url = urljoin(self.api_url, "/v1/auth/login")
self.create_session_url = urljoin(self.api_url, "/v1/auth/create-session")
self.session_token: Optional[str] = None
self.api_key: Optional[str] = None
self.user_id: Optional[str] = None
self.user_name: Optional[str] = None
self.provider_name: Optional[str] = None
self.logger = logging.getLogger(__name__)
# Standard headers for login requests
self.headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8,ro;q=0.7',
'content-type': 'application/json;charset=UTF-8',
'origin': 'https://www.parentzone.me',
'priority': 'u=1, i',
'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36'
}
async def login(self, email: str, password: str) -> bool:
"""
Login to the ParentZone API using two-step authentication.
Step 1: Login with email/password to get user accounts
Step 2: Create session with first account ID and password to get API key
Args:
email: User email
password: User password
Returns:
True if login successful, False otherwise
"""
self.logger.info(f"Attempting login for {email}")
# Step 1: Login to get user accounts
login_data = {
"email": email,
"password": password
}
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.post(
self.login_url,
headers=self.headers,
json=login_data
) as response:
self.logger.info(f"Login response status: {response.status}")
if response.status == 200:
data = await response.json()
self.logger.info("Login successful")
self.logger.debug(f"Response data type: {type(data)}")
self.logger.debug(f"Full response data: {data}")
# Handle list response with user accounts
if isinstance(data, list) and len(data) > 0:
# Use the first account
first_account = data[0]
self.user_id = first_account.get('id')
self.user_name = first_account.get('name')
self.provider_name = first_account.get('providerName')
self.logger.info(f"Selected account: {self.user_name} at {self.provider_name} (ID: {self.user_id})")
# Step 2: Create session with the account ID
return await self._create_session(password)
else:
self.logger.error(f"Unexpected login response format: {data}")
return False
else:
error_text = await response.text()
self.logger.error(f"Login failed with status {response.status}: {error_text}")
return False
except Exception as e:
self.logger.error(f"Login request failed: {e}")
return False
async def _create_session(self, password: str) -> bool:
"""
Create a session using the user ID from login.
Args:
password: User password
Returns:
True if session creation successful, False otherwise
"""
if not self.user_id:
self.logger.error("No user ID available for session creation")
return False
self.logger.info(f"Creating session for user ID: {self.user_id}")
session_data = {
"id": self.user_id,
"password": password
}
# Add x-api-product header for session creation
session_headers = self.headers.copy()
session_headers['x-api-product'] = 'iConnect'
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.post(
self.create_session_url,
headers=session_headers,
json=session_data
) as response:
self.logger.info(f"Create session response status: {response.status}")
if response.status == 200:
data = await response.json()
self.logger.info("Session creation successful")
self.logger.debug(f"Session response data: {data}")
# Extract API key from response
if isinstance(data, dict) and 'key' in data:
self.api_key = data['key']
self.logger.info("API key obtained successfully")
return True
else:
self.logger.error(f"No 'key' field in session response: {data}")
return False
else:
error_text = await response.text()
self.logger.error(f"Session creation failed with status {response.status}: {error_text}")
return False
except Exception as e:
self.logger.error(f"Session creation request failed: {e}")
return False
def get_auth_headers(self) -> Dict[str, str]:
"""
Get headers with authentication token.
Returns:
Dictionary of headers including authentication
"""
headers = self.headers.copy()
if self.api_key:
# Use x-api-key header for authenticated requests
headers['x-api-key'] = self.api_key
headers['x-api-product'] = 'iConnect'
return headers
def is_authenticated(self) -> bool:
"""
Check if currently authenticated.
Returns:
True if authenticated, False otherwise
"""
return self.api_key is not None
def logout(self):
"""Clear the session data."""
self.api_key = None
self.session_token = None
self.user_id = None
self.user_name = None
self.provider_name = None
self.logger.info("Logged out - session data cleared")
async def test_login():
"""Test the login functionality."""
auth_manager = AuthManager()
# Test credentials (replace with actual credentials)
email = "tudor.sitaru@gmail.com"
password = "mTVq8uNUvY7R39EPGVAm@"
print("Testing ParentZone Login...")
success = await auth_manager.login(email, password)
if success:
print("✅ Login successful!")
print(f"User: {auth_manager.user_name} at {auth_manager.provider_name}")
print(f"User ID: {auth_manager.user_id}")
print(f"API Key: {auth_manager.api_key[:20]}..." if auth_manager.api_key else "No API key found")
# Test getting auth headers
headers = auth_manager.get_auth_headers()
print(f"Auth headers: {list(headers.keys())}")
else:
print("❌ Login failed!")
if __name__ == "__main__":
asyncio.run(test_login())