#!/usr/bin/env python3
"""
Authentication Manager for ParentZone API

This module handles authentication against the ParentZone login API
and manages session tokens for API requests.
"""

import asyncio
import logging
import os
from typing import Optional, Dict, Any
from urllib.parse import urljoin

import aiohttp


class AuthManager:
    """Two-step authentication client for the ParentZone API.

    ``login`` posts the e-mail/password pair to the login endpoint, selects
    the first account in the returned list, then posts that account's ID and
    the password to the create-session endpoint to obtain an API key.

    Attributes:
        api_url: Base URL with any trailing slash removed.
        login_url: Endpoint for step 1 (credentials -> list of accounts).
        create_session_url: Endpoint for step 2 (account ID -> API key).
        session_token: Never populated by this class; only reset by ``logout``.
        api_key: ``key`` value from the create-session response, or None.
        user_id: ``id`` of the selected account, or None.
        user_name: ``name`` of the selected account, or None.
        provider_name: ``providerName`` of the selected account, or None.
        headers: Base request headers used for login; copied (never mutated)
            for session creation and for authenticated requests.
    """

    # Value sent in the ``x-api-product`` header on session creation and on
    # authenticated requests.
    API_PRODUCT = "iConnect"

    # Total timeout, in seconds, applied to each HTTP session.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, api_url: str = "https://api.parentzone.me"):
        """
        Initialize the authentication manager.

        Args:
            api_url: Base URL of the API. A trailing slash is ignored and a
                path prefix (e.g. ``https://host/gateway``) is preserved.
        """
        self.api_url = api_url.rstrip("/")

        # Join *relative* paths onto a base that ends in "/". Joining an
        # absolute path ("/v1/...") would discard any path prefix in api_url.
        base = self.api_url + "/"
        self.login_url = urljoin(base, "v1/auth/login")
        self.create_session_url = urljoin(base, "v1/auth/create-session")

        self.session_token: Optional[str] = None
        self.api_key: Optional[str] = None
        self.user_id: Optional[str] = None
        self.user_name: Optional[str] = None
        self.provider_name: Optional[str] = None
        self.logger = logging.getLogger(__name__)

        # Standard headers for login requests
        self.headers = {
            "accept": "application/json, text/plain, */*",
            "accept-language": "en-GB,en-US;q=0.9,en;q=0.8,ro;q=0.7",
            "content-type": "application/json;charset=UTF-8",
            "origin": "https://www.parentzone.me",
            "priority": "u=1, i",
            "sec-ch-ua": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-site",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
        }

    @staticmethod
    def _redact_key(data: Any) -> Any:
        """Return ``data`` with a top-level ``key`` value masked, for logging."""
        if isinstance(data, dict) and "key" in data:
            return {**data, "key": "***"}
        return data

    async def login(self, email: str, password: str) -> bool:
        """
        Login to the ParentZone API using two-step authentication.

        Step 1: Login with email/password to get user accounts
        Step 2: Create session with first account ID and password to get API key

        Args:
            email: User email
            password: User password

        Returns:
            True if login successful, False otherwise. Request errors are
            logged and reported as False rather than raised.
        """
        self.logger.info(f"Attempting login for {email}")

        # Step 1: Login to get user accounts
        login_data = {"email": email, "password": password}
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)

        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    self.login_url, headers=self.headers, json=login_data
                ) as response:
                    self.logger.info(f"Login response status: {response.status}")

                    if response.status != 200:
                        error_text = await response.text()
                        self.logger.error(
                            f"Login failed with status {response.status}: {error_text}"
                        )
                        return False

                    data = await response.json()
                    self.logger.info("Login successful")
                    self.logger.debug(f"Response data type: {type(data)}")
                    self.logger.debug(f"Full response data: {data}")
        except Exception as e:
            # Boundary of the bool-returning API: report any failure as False.
            self.logger.error(f"Login request failed: {e}")
            return False

        # Handle list response with user accounts; the first entry must be a
        # mapping so the account fields can be read from it.
        if not (isinstance(data, list) and data and isinstance(data[0], dict)):
            self.logger.error(f"Unexpected login response format: {data}")
            return False

        # Use the first account
        first_account = data[0]
        self.user_id = first_account.get("id")
        self.user_name = first_account.get("name")
        self.provider_name = first_account.get("providerName")
        self.logger.info(
            f"Selected account: {self.user_name} at {self.provider_name} (ID: {self.user_id})"
        )

        # Step 2: Create session with the account ID. The login HTTP session
        # has already been closed at this point.
        return await self._create_session(password)

    async def _create_session(self, password: str) -> bool:
        """
        Create a session using the user ID from login.

        On success the API key from the response is stored in ``self.api_key``.

        Args:
            password: User password

        Returns:
            True if session creation successful, False otherwise
        """
        if not self.user_id:
            self.logger.error("No user ID available for session creation")
            return False

        self.logger.info(f"Creating session for user ID: {self.user_id}")

        session_data = {"id": self.user_id, "password": password}

        # Add x-api-product header for session creation
        session_headers = {**self.headers, "x-api-product": self.API_PRODUCT}

        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)

        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    self.create_session_url, headers=session_headers, json=session_data
                ) as response:
                    self.logger.info(
                        f"Create session response status: {response.status}"
                    )

                    if response.status != 200:
                        error_text = await response.text()
                        self.logger.error(
                            f"Session creation failed with status {response.status}: {error_text}"
                        )
                        return False

                    data = await response.json()
        except Exception as e:
            # Boundary of the bool-returning API: report any failure as False.
            self.logger.error(f"Session creation request failed: {e}")
            return False

        self.logger.info("Session creation successful")
        # The response carries the API key; never write it to the log verbatim.
        redacted = self._redact_key(data)
        self.logger.debug(f"Session response data: {redacted}")

        # Extract API key from response. Only a non-empty string counts, so a
        # True result always implies is_authenticated() is True.
        key = data.get("key") if isinstance(data, dict) else None
        if not (isinstance(key, str) and key):
            self.logger.error(
                f"No usable 'key' field in session response: {redacted}"
            )
            return False

        self.api_key = key
        self.logger.info("API key obtained successfully")
        return True

    def get_auth_headers(self) -> Dict[str, str]:
        """
        Get headers with authentication token.

        Returns:
            A new dictionary of headers; includes ``x-api-key`` and
            ``x-api-product`` only when an API key is held.
        """
        headers = self.headers.copy()

        if self.api_key:
            # Use x-api-key header for authenticated requests
            headers["x-api-key"] = self.api_key
            headers["x-api-product"] = self.API_PRODUCT

        return headers

    def is_authenticated(self) -> bool:
        """
        Check if currently authenticated.

        Returns:
            True if an API key is held, False otherwise
        """
        return self.api_key is not None

    def logout(self) -> None:
        """Clear the session data."""
        self.api_key = None
        self.session_token = None
        self.user_id = None
        self.user_name = None
        self.provider_name = None
        self.logger.info("Logged out - session data cleared")


async def test_login():
    """Manually exercise the login flow.

    Credentials are read from the ``PARENTZONE_EMAIL`` and
    ``PARENTZONE_PASSWORD`` environment variables so that no secrets live in
    source control. If either is missing, a hint is printed and nothing is
    sent to the API.
    """
    # NOTE(review): credentials were previously hard-coded here; any password
    # that was committed should be treated as exposed and rotated.
    email = os.environ.get("PARENTZONE_EMAIL")
    password = os.environ.get("PARENTZONE_PASSWORD")
    if not email or not password:
        print("Set PARENTZONE_EMAIL and PARENTZONE_PASSWORD to run the login test.")
        return

    auth_manager = AuthManager()

    print("Testing ParentZone Login...")
    success = await auth_manager.login(email, password)

    if not success:
        print("❌ Login failed!")
        return

    print("✅ Login successful!")
    print(f"User: {auth_manager.user_name} at {auth_manager.provider_name}")
    print(f"User ID: {auth_manager.user_id}")
    # Show only a prefix of the key so the full secret is not echoed.
    print(
        f"API Key: {auth_manager.api_key[:20]}..."
        if auth_manager.api_key
        else "No API key found"
    )

    # Test getting auth headers (names only, not values)
    headers = auth_manager.get_auth_headers()
    print(f"Auth headers: {list(headers.keys())}")


# Run the manual login check only when this file is executed as a script;
# importing the module has no side effects.
if __name__ == "__main__":
    asyncio.run(test_login())