230 lines
8.3 KiB
Python
230 lines
8.3 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
Authentication Manager for ParentZone API

This module handles authentication against the ParentZone login API
and manages the session credentials (API key) used for API requests.
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import aiohttp
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
from typing import Optional, Dict, Any
|
||
|
|
from urllib.parse import urljoin
|
||
|
|
|
||
|
|
|
||
|
|
class AuthManager:
    """Authenticate against the ParentZone API and hold the resulting credentials.

    Login is a two-step exchange: the login endpoint returns the accounts
    tied to an email/password pair, then the create-session endpoint trades
    the first account's ID (plus the same password) for an API key.  The key
    is attached to later requests through :meth:`get_auth_headers`.
    """

    # Total per-request timeout, in seconds, applied to both auth calls.
    _REQUEST_TIMEOUT_SECONDS = 30
    # Value sent in the ``x-api-product`` header.
    _API_PRODUCT = 'iConnect'

    def __init__(self, api_url: str = "https://api.parentzone.me"):
        """
        Initialize the authentication manager.

        Args:
            api_url: Base URL of the API. A trailing slash is ignored and any
                path prefix (e.g. ``https://host/api``) is preserved.
        """
        self.api_url = api_url.rstrip('/')
        # Plain concatenation instead of urljoin(): joining with an absolute
        # path ("/v1/...") would discard any path prefix in the base URL.
        self.login_url = f"{self.api_url}/v1/auth/login"
        self.create_session_url = f"{self.api_url}/v1/auth/create-session"
        # No method in this class assigns session_token; logout() resets it.
        self.session_token: Optional[str] = None
        self.api_key: Optional[str] = None
        self.user_id: Optional[str] = None
        self.user_name: Optional[str] = None
        self.provider_name: Optional[str] = None
        self.logger = logging.getLogger(__name__)

        # Standard headers for login requests
        self.headers = {
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8,ro;q=0.7',
            'content-type': 'application/json;charset=UTF-8',
            'origin': 'https://www.parentzone.me',
            'priority': 'u=1, i',
            'sec-ch-ua': '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36',
        }

    async def login(self, email: str, password: str) -> bool:
        """
        Login to the ParentZone API using two-step authentication.
        Step 1: Login with email/password to get user accounts
        Step 2: Create session with first account ID and password to get API key

        Args:
            email: User email
            password: User password

        Returns:
            True if login successful, False otherwise. Failures are logged
            rather than raised.
        """
        self.logger.info(f"Attempting login for {email}")

        # Step 1: Login to get user accounts
        login_data = {
            "email": email,
            "password": password,
        }

        timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.post(
                    self.login_url,
                    headers=self.headers,
                    json=login_data,
                ) as response:
                    self.logger.info(f"Login response status: {response.status}")

                    if response.status != 200:
                        error_text = await response.text()
                        self.logger.error(f"Login failed with status {response.status}: {error_text}")
                        return False

                    data = await response.json()
                    self.logger.info("Login successful")
                    self.logger.debug(f"Response data type: {type(data)}")
                    self.logger.debug(f"Full response data: {data}")

                    # Expect a non-empty list whose first entry is an account
                    # object; anything else is reported as a format problem
                    # instead of surfacing later as an attribute error.
                    if not (isinstance(data, list) and data and isinstance(data[0], dict)):
                        self.logger.error(f"Unexpected login response format: {data}")
                        return False

                    # Use the first account
                    first_account = data[0]
                    self.user_id = first_account.get('id')
                    self.user_name = first_account.get('name')
                    self.provider_name = first_account.get('providerName')

                    self.logger.info(f"Selected account: {self.user_name} at {self.provider_name} (ID: {self.user_id})")

                    # Step 2: Create session with the account ID
                    return await self._create_session(password)

            except Exception as e:
                # Deliberately broad: callers rely on a bool result, so
                # network, timeout and decoding errors all map to False.
                self.logger.error(f"Login request failed: {e}")
                return False

    async def _create_session(self, password: str) -> bool:
        """
        Create a session using the user ID from login.

        On success the API key from the response is stored in ``self.api_key``.

        Args:
            password: User password

        Returns:
            True if session creation successful, False otherwise
        """
        if not self.user_id:
            self.logger.error("No user ID available for session creation")
            return False

        self.logger.info(f"Creating session for user ID: {self.user_id}")

        session_data = {
            "id": self.user_id,
            "password": password,
        }

        # Add x-api-product header for session creation
        session_headers = self.headers.copy()
        session_headers['x-api-product'] = self._API_PRODUCT

        timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            try:
                async with session.post(
                    self.create_session_url,
                    headers=session_headers,
                    json=session_data,
                ) as response:
                    self.logger.info(f"Create session response status: {response.status}")

                    if response.status != 200:
                        error_text = await response.text()
                        self.logger.error(f"Session creation failed with status {response.status}: {error_text}")
                        return False

                    data = await response.json()
                    self.logger.info("Session creation successful")
                    # Log only the field names: the payload carries the API
                    # key, which must not end up in log files.
                    self.logger.debug(
                        "Session response fields: %s",
                        sorted(data) if isinstance(data, dict) else type(data).__name__,
                    )

                    # Extract API key from response; a missing, null or empty
                    # key is a failure so that a True result always implies
                    # is_authenticated().
                    key = data.get('key') if isinstance(data, dict) else None
                    if not key:
                        self.logger.error(f"No 'key' field in session response: {data}")
                        return False

                    self.api_key = key
                    self.logger.info("API key obtained successfully")
                    return True

            except Exception as e:
                # Deliberately broad, for the same reason as in login().
                self.logger.error(f"Session creation request failed: {e}")
                return False

    def get_auth_headers(self) -> Dict[str, str]:
        """
        Get headers with authentication token.

        Returns:
            A copy of the standard headers; when an API key is held it also
            carries ``x-api-key`` and ``x-api-product``.
        """
        headers = self.headers.copy()

        if self.api_key:
            # Use x-api-key header for authenticated requests
            headers['x-api-key'] = self.api_key
            headers['x-api-product'] = self._API_PRODUCT

        return headers

    def is_authenticated(self) -> bool:
        """
        Check if currently authenticated.

        Returns:
            True if an API key is held, False otherwise
        """
        return self.api_key is not None

    def logout(self):
        """Clear the session data."""
        self.api_key = None
        self.session_token = None
        self.user_id = None
        self.user_name = None
        self.provider_name = None
        self.logger.info("Logged out - session data cleared")
|
||
|
|
|
||
|
|
|
||
|
|
async def test_login():
    """Manually exercise the login flow and print the outcome.

    Credentials are read from the ``PARENTZONE_EMAIL`` and
    ``PARENTZONE_PASSWORD`` environment variables so that no secret is
    stored in the source file. If either is missing, a hint is printed and
    no request is made.
    """
    # Function-scope import keeps this block self-contained.
    import os

    # NOTE(review): a real email/password pair used to be hard-coded here;
    # treat that password as exposed and rotate it.
    email = os.environ.get("PARENTZONE_EMAIL")
    password = os.environ.get("PARENTZONE_PASSWORD")
    if not email or not password:
        print("Set PARENTZONE_EMAIL and PARENTZONE_PASSWORD to run the login test.")
        return

    auth_manager = AuthManager()

    print("Testing ParentZone Login...")
    success = await auth_manager.login(email, password)

    if success:
        print("✅ Login successful!")
        print(f"User: {auth_manager.user_name} at {auth_manager.provider_name}")
        print(f"User ID: {auth_manager.user_id}")
        print(f"API Key: {auth_manager.api_key[:20]}..." if auth_manager.api_key else "No API key found")

        # Test getting auth headers
        headers = auth_manager.get_auth_headers()
        print(f"Auth headers: {list(headers.keys())}")
    else:
        print("❌ Login failed!")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Executed as a script: build the login-check coroutine and drive it to
    # completion on a fresh event loop.
    login_check = test_login()
    asyncio.run(login_check)
|