Source code for upstox_totp._api.app_token

"""App token API."""

from __future__ import annotations

import base64
from typing import Any
from urllib.parse import parse_qs, urlparse

from curl_cffi.requests.models import Response

from upstox_totp._api.base import BaseAPI
from upstox_totp.errors import UpstoxError, ValidationError
from upstox_totp.logging import logger
from upstox_totp.models import (
    AccessTokenResponse,
    OAuthAuthorizationResponse,
    OTPGenerationResponse,
    OTPValidationResponse,
    ResponseBase,
    TwoFactorAuthenticationResponse,
    UserIdAndUserType,
)


[docs] class AppTokenAPI(BaseAPI): """App token API.""" client_id: str | None = None
[docs] def get_user_id_and_user_type(self) -> UserIdAndUserType: """Get user id and user type.""" url: str = self.client.base_domain.format(stage="api") + "/v2/login/authorization/dialog" params: dict[str, Any] = { "response_type": "code", "client_id": self.client.config.client_id, "redirect_uri": self.client.config.redirect_uri, } logger.debug(f"Making authorization dialog request to: {url}") logger.debug(f"Request params: {params}") response = self._request( method="GET", url=url, params=params, allow_redirects=True, ) if isinstance(response, Response): redirect_url = response.url # Raw Response object else: if isinstance(response, ResponseBase): response_data = {"success": response.success, "data": response.data, "error": response.error} if not response.success or (response.data and isinstance(response.data, dict) and response.data.get("status") == "error"): raise UpstoxError.from_response(response_data) else: raise ValidationError(f"API returned JSON response instead of redirect. This suggests authentication failure or incorrect configuration. Response details: {response_data}") else: raise ValidationError(f"Unexpected response type: {type(response)}. Expected raw Response with redirect URL.") parsed_url = urlparse(redirect_url) params = parse_qs(parsed_url.query) # Extract required parameters with proper error handling user_id_list = params.get("user_id") client_id_list = params.get("client_id") user_type_list = params.get("user_type") if not user_id_list or not client_id_list or not user_type_list: raise ValidationError(f"Missing required parameters in redirect URL. Got params: {params}") payload = { "user_id": user_id_list[0], "client_id": client_id_list[0], "user_type": user_type_list[0], } self.client_id = client_id_list[0] return UserIdAndUserType.model_validate(payload)
[docs] def generate_otp(self) -> OTPGenerationResponse: """Generate OTP.""" url: str = self.client.base_domain.format(stage="service") + "/login/open/v6/auth/1fa/otp/generate" user_id_and_user_type: UserIdAndUserType = self.get_user_id_and_user_type() json_payload: dict[str, Any] = { "data": { "mobileNumber": self.client.config.username, "userId": user_id_and_user_type.user_id, } } response: OTPGenerationResponse = self._request( # pyright: ignore[reportAssignmentType] method="POST", url=url, json=json_payload, model=OTPGenerationResponse, ) return response
[docs] def validate_otp(self) -> OTPValidationResponse: """Validate OTP.""" otp_generation_response: OTPGenerationResponse = self.generate_otp() totp_secret: str = self.client.generate_totp_secret() url: str = self.client.base_domain.format(stage="service") + "/login/open/v4/auth/1fa/otp-totp/verify" if otp_generation_response.data is None: raise ValidationError("Failed to generate OTP - response data is None") json_payload: dict[str, Any] = { "data": { "otp": totp_secret, "validateOtpToken": otp_generation_response.data.validateOTPToken, } } response: OTPValidationResponse = self._request( # pyright: ignore[reportAssignmentType] method="POST", url=url, json=json_payload, model=OTPValidationResponse, ) return response
[docs] def submit_pin(self) -> TwoFactorAuthenticationResponse: """Submit PIN for 2FA.""" url: str = self.client.base_domain.format(stage="service") + "/login/open/v3/auth/2fa" _ = self.validate_otp() pin_encoded = base64.b64encode(self.client.config.pin_code.get_secret_value().encode()).decode() params: dict[str, Any] = { "client_id": self.client_id, "redirect_uri": self.client.config.redirect_uri_upstox, } json_payload: dict[str, Any] = { "data": { "twoFAMethod": "SECRET_PIN", "inputText": pin_encoded, } } response: TwoFactorAuthenticationResponse = self._request( # pyright: ignore[reportAssignmentType] method="POST", url=url, params=params, json=json_payload, model=TwoFactorAuthenticationResponse, allow_redirects=True, ) return response
[docs] def oauth_authorization(self) -> OAuthAuthorizationResponse: """Two factor authentication.""" _ = self.submit_pin() url: str = self.client.base_domain.format(stage="service") + "/login/v2/oauth/authorize" params: dict[str, Any] = { "client_id": self.client_id, "redirect_uri": self.client.config.redirect_uri_upstox, "requestId": self.client.generate_request_id(), "response_type": "code", } json_payload: dict[str, Any] = { "data": { "userOAuthApproval": True, } } response: OAuthAuthorizationResponse = self._request( # pyright: ignore[reportAssignmentType] method="POST", url=url, params=params, json=json_payload, model=OAuthAuthorizationResponse, allow_redirects=True, ) return response
[docs] def get_access_token(self) -> AccessTokenResponse: """Get access token.""" url: str = self.client.base_domain.format(stage="api") + "/v2/login/authorization/token" oauth_response: OAuthAuthorizationResponse = self.oauth_authorization() if oauth_response.data is None: raise ValidationError("OAuth response missing data") parsed = urlparse(oauth_response.data.redirectUri) params = parse_qs(parsed.query) code_list = params.get("code") if not code_list: raise ValidationError(f"Authorization code not found in redirect URI. Got params: {params}") code = code_list[0] data = f"code={code}&client_id={self.client.config.client_id}&client_secret={self.client.config.client_secret.get_secret_value()}&redirect_uri={self.client.config.redirect_uri}&grant_type=authorization_code" self.client.reset_session() headers: dict[str, str] = { "accept": "application/json", "content-type": "application/x-www-form-urlencoded", } response: AccessTokenResponse = self._request( # pyright: ignore[reportAssignmentType] method="POST", url=url, data=data, headers=headers, model=AccessTokenResponse, ) return response