Integration Examples
This guide shows how to integrate the Upstox TOTP SDK with various frameworks and services.
Web Framework Integration
Flask Integration
from flask import Flask, jsonify, request
from upstox_totp import UpstoxTOTP, UpstoxError
import logging
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
class UpstoxService:
def __init__(self):
self.client = UpstoxTOTP()
def get_access_token(self):
try:
response = self.client.app_token.get_access_token()
if response.success:
return {
'success': True,
'token': response.data.access_token,
'user_id': response.data.user_id,
'user_name': response.data.user_name
}
else:
return {
'success': False,
'error': str(response.error)
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
upstox_service = UpstoxService()
@app.route('/health')
def health_check():
return jsonify({'status': 'healthy', 'service': 'upstox-totp'})
@app.route('/api/token', methods=['POST'])
def generate_token():
try:
result = upstox_service.get_access_token()
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
app.logger.error(f"Token generation failed: {e}")
return jsonify({'success': False, 'error': 'Internal server error'}), 500
@app.route('/api/user/profile')
def get_user_profile():
# Generate token first
token_result = upstox_service.get_access_token()
if not token_result['success']:
return jsonify(token_result), 400
# Use token to fetch profile
import requests
headers = {
'Authorization': f"Bearer {token_result['token']}",
'Content-Type': 'application/json'
}
try:
response = requests.get(
'https://api.upstox.com/v2/user/profile',
headers=headers
)
return jsonify(response.json())
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
FastAPI Integration
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from upstox_totp import UpstoxTOTP, UpstoxError
import logging
from typing import Optional
app = FastAPI(title="Upstox TOTP API", version="1.0.0")
logging.basicConfig(level=logging.INFO)
class TokenResponse(BaseModel):
success: bool
token: Optional[str] = None
user_id: Optional[str] = None
user_name: Optional[str] = None
error: Optional[str] = None
class UpstoxService:
def __init__(self):
self._client = None
@property
def client(self):
if self._client is None:
self._client = UpstoxTOTP()
return self._client
async def get_token(self) -> TokenResponse:
try:
response = self.client.app_token.get_access_token()
if response.success:
return TokenResponse(
success=True,
token=response.data.access_token,
user_id=response.data.user_id,
user_name=response.data.user_name
)
else:
return TokenResponse(
success=False,
error=str(response.error)
)
except Exception as e:
return TokenResponse(
success=False,
error=str(e)
)
# Dependency injection
def get_upstox_service() -> UpstoxService:
return UpstoxService()
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "upstox-totp"}
@app.post("/api/token", response_model=TokenResponse)
async def generate_token(service: UpstoxService = Depends(get_upstox_service)):
result = await service.get_token()
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
return result
@app.get("/api/user/profile")
async def get_user_profile(service: UpstoxService = Depends(get_upstox_service)):
# Generate token
token_result = await service.get_token()
if not token_result.success:
raise HTTPException(status_code=400, detail=token_result.error)
# Fetch profile
import httpx
headers = {
'Authorization': f'Bearer {token_result.token}',
'Content-Type': 'application/json'
}
async with httpx.AsyncClient() as client:
response = await client.get(
'https://api.upstox.com/v2/user/profile',
headers=headers
)
return response.json()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Django Integration
# views.py
from django.http import JsonResponse
from django.views import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from upstox_totp import UpstoxTOTP, UpstoxError
import json
import logging
logger = logging.getLogger(__name__)
class UpstoxTokenView(View):
def __init__(self):
super().__init__()
self.upstox_client = UpstoxTOTP()
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
def post(self, request):
try:
response = self.upstox_client.app_token.get_access_token()
if response.success:
return JsonResponse({
'success': True,
'data': {
'access_token': response.data.access_token,
'user_id': response.data.user_id,
'user_name': response.data.user_name,
'email': response.data.email
}
})
else:
return JsonResponse({
'success': False,
'error': str(response.error)
}, status=400)
except Exception as e:
logger.error(f"Token generation failed: {e}")
return JsonResponse({
'success': False,
'error': 'Internal server error'
}, status=500)
# urls.py
from django.urls import path
from .views import UpstoxTokenView
urlpatterns = [
path('api/token/', UpstoxTokenView.as_view(), name='upstox_token'),
]
# settings.py addition
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': 'upstox.log',
},
},
'loggers': {
'upstox_totp': {
'handlers': ['file'],
'level': 'INFO',
'propagate': True,
},
},
}
Database Integration
SQLAlchemy Integration
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
import json
Base = declarative_base()
class UpstoxToken(Base):
__tablename__ = 'upstox_tokens'
id = Column(Integer, primary_key=True)
user_id = Column(String(50), nullable=False)
access_token = Column(Text, nullable=False)
user_name = Column(String(100))
email = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
expires_at = Column(DateTime, nullable=False)
is_active = Column(Boolean, default=True)
class UpstoxTokenManager:
def __init__(self, database_url):
self.engine = create_engine(database_url)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self.upstox = UpstoxTOTP()
def get_valid_token(self, user_identifier=None):
"""Get valid token from database or generate new one."""
session = self.Session()
try:
# Try to find valid token
now = datetime.utcnow()
token_record = session.query(UpstoxToken).filter(
UpstoxToken.is_active == True,
UpstoxToken.expires_at > now + timedelta(hours=1) # 1 hour buffer
).first()
if token_record:
return token_record.access_token
# Generate new token
response = self.upstox.app_token.get_access_token()
if response.success:
# Deactivate old tokens
session.query(UpstoxToken).filter(
UpstoxToken.is_active == True
).update({'is_active': False})
# Store new token
new_token = UpstoxToken(
user_id=response.data.user_id,
access_token=response.data.access_token,
user_name=response.data.user_name,
email=response.data.email,
expires_at=datetime.utcnow() + timedelta(hours=24)
)
session.add(new_token)
session.commit()
return response.data.access_token
else:
raise Exception(f"Token generation failed: {response.error}")
finally:
session.close()
def cleanup_expired_tokens(self):
"""Remove expired tokens from database."""
session = self.Session()
try:
deleted = session.query(UpstoxToken).filter(
UpstoxToken.expires_at < datetime.utcnow()
).delete()
session.commit()
return deleted
finally:
session.close()
# Usage
token_manager = UpstoxTokenManager('sqlite:///upstox_tokens.db')
token = token_manager.get_valid_token()
MongoDB Integration
from pymongo import MongoClient
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
import logging
class MongoUpstoxTokenManager:
def __init__(self, connection_string, database_name='upstox_db'):
self.client = MongoClient(connection_string)
self.db = self.client[database_name]
self.tokens = self.db.tokens
self.upstox = UpstoxTOTP()
self.logger = logging.getLogger(__name__)
def get_valid_token(self):
"""Get valid token from MongoDB or generate new one."""
# Check for valid token
now = datetime.utcnow()
buffer_time = now + timedelta(hours=1)
token_doc = self.tokens.find_one({
'is_active': True,
'expires_at': {'$gt': buffer_time}
})
if token_doc:
self.logger.info("Using cached token from MongoDB")
return token_doc['access_token']
# Generate new token
self.logger.info("Generating new token")
response = self.upstox.app_token.get_access_token()
if response.success:
# Deactivate old tokens
self.tokens.update_many(
{'is_active': True},
{'$set': {'is_active': False}}
)
# Insert new token
token_doc = {
'user_id': response.data.user_id,
'access_token': response.data.access_token,
'user_name': response.data.user_name,
'email': response.data.email,
'created_at': now,
'expires_at': now + timedelta(hours=24),
'is_active': True,
'metadata': {
'broker': response.data.broker,
'products': response.data.products,
'exchanges': response.data.exchanges
}
}
self.tokens.insert_one(token_doc)
self.logger.info("New token stored in MongoDB")
return response.data.access_token
else:
raise Exception(f"Token generation failed: {response.error}")
def get_token_history(self, limit=10):
"""Get token generation history."""
return list(self.tokens.find(
{},
{'access_token': 0} # Exclude sensitive token data
).sort('created_at', -1).limit(limit))
# Usage
mongo_manager = MongoUpstoxTokenManager('mongodb://localhost:27017/')
token = mongo_manager.get_valid_token()
Redis Integration
import redis
import json
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class RedisUpstoxTokenManager:
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.upstox = UpstoxTOTP()
self.token_key = 'upstox:token'
self.metadata_key = 'upstox:metadata'
def get_valid_token(self):
"""Get valid token from Redis cache or generate new one."""
# Check cache
cached_token = self.redis_client.get(self.token_key)
if cached_token:
return cached_token
# Generate new token
response = self.upstox.app_token.get_access_token()
if response.success:
token = response.data.access_token
# Cache token with expiration (23 hours to be safe)
self.redis_client.setex(
self.token_key,
timedelta(hours=23),
token
)
# Store metadata separately
metadata = {
'user_id': response.data.user_id,
'user_name': response.data.user_name,
'email': response.data.email,
'generated_at': datetime.utcnow().isoformat()
}
self.redis_client.setex(
self.metadata_key,
timedelta(hours=23),
json.dumps(metadata)
)
return token
else:
raise Exception(f"Token generation failed: {response.error}")
def get_token_metadata(self):
"""Get token metadata from cache."""
metadata_json = self.redis_client.get(self.metadata_key)
if metadata_json:
return json.loads(metadata_json)
return None
def invalidate_token(self):
"""Manually invalidate cached token."""
self.redis_client.delete(self.token_key, self.metadata_key)
# Usage
redis_manager = RedisUpstoxTokenManager()
token = redis_manager.get_valid_token()
Background Job Integration
Celery Integration
from celery import Celery
from upstox_totp import UpstoxTOTP, UpstoxError
import logging
from datetime import datetime
# Configure Celery
app = Celery('upstox_tasks', broker='redis://localhost:6379/0')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.task(bind=True, max_retries=3)
def generate_upstox_token(self):
"""Background task to generate Upstox token."""
try:
upstox = UpstoxTOTP()
response = upstox.app_token.get_access_token()
if response.success:
result = {
'success': True,
'token': response.data.access_token,
'user_id': response.data.user_id,
'generated_at': datetime.utcnow().isoformat()
}
logger.info(f"Token generated successfully: {result['user_id']}")
return result
else:
raise Exception(f"Token generation failed: {response.error}")
except Exception as e:
logger.error(f"Token generation task failed: {e}")
# Retry with exponential backoff
countdown = 2 ** self.request.retries
raise self.retry(exc=e, countdown=countdown)
@app.task
def refresh_all_user_tokens():
"""Background task to refresh tokens for all users."""
# This would integrate with your user management system
results = []
# Example: get all users who need token refresh
users_needing_refresh = get_users_needing_refresh() # Implement this
for user in users_needing_refresh:
try:
# Generate token for each user
result = generate_upstox_token.delay()
results.append({
'user_id': user['id'],
'task_id': result.id,
'status': 'pending'
})
except Exception as e:
logger.error(f"Failed to queue token refresh for user {user['id']}: {e}")
return results
# Periodic task
from celery.schedules import crontab
app.conf.beat_schedule = {
'refresh-tokens-daily': {
'task': 'refresh_all_user_tokens',
'schedule': crontab(hour=2, minute=0), # Run daily at 2 AM
},
}
APScheduler Integration
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from upstox_totp import UpstoxTOTP
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class UpstoxTokenScheduler:
def __init__(self):
self.scheduler = BackgroundScheduler()
self.upstox = UpstoxTOTP()
self.current_token = None
def generate_token(self):
"""Job function to generate token."""
try:
logger.info("Starting scheduled token generation")
response = self.upstox.app_token.get_access_token()
if response.success:
self.current_token = response.data.access_token
logger.info(f"Token generated successfully at {datetime.now()}")
else:
logger.error(f"Token generation failed: {response.error}")
except Exception as e:
logger.error(f"Token generation exception: {e}")
def start_scheduler(self):
"""Start the token refresh scheduler."""
# Generate token immediately
self.generate_token()
# Schedule token refresh every 23 hours
self.scheduler.add_job(
self.generate_token,
'interval',
hours=23,
id='token_refresh',
replace_existing=True
)
self.scheduler.start()
logger.info("Token refresh scheduler started")
def stop_scheduler(self):
"""Stop the scheduler."""
self.scheduler.shutdown()
logger.info("Token refresh scheduler stopped")
def get_current_token(self):
"""Get the current valid token."""
if not self.current_token:
self.generate_token()
return self.current_token
# Usage
token_scheduler = UpstoxTokenScheduler()
token_scheduler.start_scheduler()
# Get token when needed
token = token_scheduler.get_current_token()
Cloud Service Integration
AWS Lambda Integration
import json
import boto3
from upstox_totp import UpstoxTOTP, UpstoxError
import logging
# Configure logging for Lambda
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""AWS Lambda function to generate Upstox token."""
try:
# Initialize Upstox client
upstox = UpstoxTOTP()
# Generate token
response = upstox.app_token.get_access_token()
if response.success:
# Store token in Parameter Store
ssm = boto3.client('ssm')
ssm.put_parameter(
Name='/upstox/access_token',
Value=response.data.access_token,
Type='SecureString',
Overwrite=True
)
# Return success response
return {
'statusCode': 200,
'body': json.dumps({
'success': True,
'user_id': response.data.user_id,
'message': 'Token generated and stored successfully'
})
}
else:
logger.error(f"Token generation failed: {response.error}")
return {
'statusCode': 400,
'body': json.dumps({
'success': False,
'error': str(response.error)
})
}
except Exception as e:
logger.error(f"Lambda function error: {e}")
return {
'statusCode': 500,
'body': json.dumps({
'success': False,
'error': 'Internal server error'
})
}
# CloudWatch Events integration
def scheduled_lambda_handler(event, context):
"""Lambda function triggered by CloudWatch Events for periodic token refresh."""
logger.info("Scheduled token refresh triggered")
try:
upstox = UpstoxTOTP()
response = upstox.app_token.get_access_token()
if response.success:
# Store in Parameter Store
ssm = boto3.client('ssm')
ssm.put_parameter(
Name='/upstox/access_token',
Value=response.data.access_token,
Type='SecureString',
Overwrite=True
)
# Send notification
sns = boto3.client('sns')
sns.publish(
TopicArn='arn:aws:sns:region:account:upstox-notifications',
Message=f'Token refreshed successfully for user {response.data.user_id}',
Subject='Upstox Token Refresh Success'
)
logger.info("Scheduled token refresh completed successfully")
else:
# Send error notification
sns = boto3.client('sns')
sns.publish(
TopicArn='arn:aws:sns:region:account:upstox-notifications',
Message=f'Token refresh failed: {response.error}',
Subject='Upstox Token Refresh Failed'
)
logger.error(f"Scheduled token refresh failed: {response.error}")
except Exception as e:
logger.error(f"Scheduled token refresh error: {e}")
# Send error notification
sns = boto3.client('sns')
sns.publish(
TopicArn='arn:aws:sns:region:account:upstox-notifications',
Message=f'Token refresh error: {str(e)}',
Subject='Upstox Token Refresh Error'
)
Google Cloud Functions Integration
import functions_framework
from google.cloud import secretmanager
from upstox_totp import UpstoxTOTP, UpstoxError
import json
import logging
@functions_framework.http
def generate_upstox_token(request):
"""HTTP Cloud Function to generate Upstox token."""
# Set CORS headers
headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Allow-Headers': 'Content-Type'
}
if request.method == 'OPTIONS':
return ('', 204, headers)
try:
# Initialize Upstox client
upstox = UpstoxTOTP()
# Generate token
response = upstox.app_token.get_access_token()
if response.success:
# Store token in Secret Manager
client = secretmanager.SecretManagerServiceClient()
project_id = "your-project-id"
secret_id = "upstox-access-token"
# Create or update secret
parent = f"projects/{project_id}"
secret_path = f"{parent}/secrets/{secret_id}"
try:
# Add secret version
client.add_secret_version(
parent=secret_path,
payload={'data': response.data.access_token.encode('utf-8')}
)
except Exception:
# Secret might not exist, create it
client.create_secret(
parent=parent,
secret_id=secret_id,
secret={'replication': {'automatic': {}}}
)
client.add_secret_version(
parent=secret_path,
payload={'data': response.data.access_token.encode('utf-8')}
)
return (json.dumps({
'success': True,
'user_id': response.data.user_id,
'message': 'Token generated successfully'
}), 200, headers)
else:
return (json.dumps({
'success': False,
'error': str(response.error)
}), 400, headers)
except Exception as e:
logging.error(f"Cloud Function error: {e}")
return (json.dumps({
'success': False,
'error': 'Internal server error'
}), 500, headers)
Docker Integration
Production Docker Setup
# Dockerfile
FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash upstox
RUN chown -R upstox:upstox /app
USER upstox
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'
services:
upstox-api:
build: .
ports:
- "8000:8000"
environment:
- UPSTOX_USERNAME=${UPSTOX_USERNAME}
- UPSTOX_PASSWORD=${UPSTOX_PASSWORD}
- UPSTOX_PIN_CODE=${UPSTOX_PIN_CODE}
- UPSTOX_TOTP_SECRET=${UPSTOX_TOTP_SECRET}
- UPSTOX_CLIENT_ID=${UPSTOX_CLIENT_ID}
- UPSTOX_CLIENT_SECRET=${UPSTOX_CLIENT_SECRET}
- UPSTOX_REDIRECT_URI=${UPSTOX_REDIRECT_URI}
- REDIS_URL=redis://redis:6379/0
depends_on:
- redis
restart: unless-stopped
volumes:
- ./logs:/app/logs
redis:
image: redis:7-alpine
ports:
- "6379:6379"
restart: unless-stopped
volumes:
- redis_data:/data
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- upstox-api
restart: unless-stopped
volumes:
redis_data:
Kubernetes Integration
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: upstox-totp-api
labels:
app: upstox-totp-api
spec:
replicas: 3
selector:
matchLabels:
app: upstox-totp-api
template:
metadata:
labels:
app: upstox-totp-api
spec:
containers:
- name: upstox-api
image: your-registry/upstox-totp-api:latest
ports:
- containerPort: 8000
env:
- name: UPSTOX_USERNAME
valueFrom:
secretKeyRef:
name: upstox-credentials
key: username
- name: UPSTOX_PASSWORD
valueFrom:
secretKeyRef:
name: upstox-credentials
key: password
# ... other environment variables
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: upstox-totp-service
spec:
selector:
app: upstox-totp-api
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
---
apiVersion: v1
kind: Secret
metadata:
name: upstox-credentials
type: Opaque
data:
username: <base64-encoded-username>
password: <base64-encoded-password>
# ... other credentials
Testing Integration
pytest Integration
# test_integration.py
import pytest
from unittest.mock import patch, Mock
from your_app import app, UpstoxService
from upstox_totp.models import AccessTokenResponse, AccessTokenData
@pytest.fixture
def client():
app.config['TESTING'] = True
with app.test_client() as client:
yield client
@pytest.fixture
def mock_upstox_response():
return AccessTokenResponse(
success=True,
data=AccessTokenData(
access_token="test-token-123",
user_id="TEST123",
user_name="Test User",
email="test@example.com",
broker="UPSTOX",
user_type="individual",
products=["D", "I"],
exchanges=["NSE_EQ", "BSE_EQ"],
is_active=True
),
error=None
)
@patch('your_app.UpstoxTOTP')
def test_token_generation_success(mock_upstox_class, client, mock_upstox_response):
# Setup mock
mock_upstox_instance = Mock()
mock_upstox_instance.app_token.get_access_token.return_value = mock_upstox_response
mock_upstox_class.return_value = mock_upstox_instance
# Make request
response = client.post('/api/token')
# Assert response
assert response.status_code == 200
data = response.get_json()
assert data['success'] is True
assert data['token'] == "test-token-123"
@patch('your_app.UpstoxTOTP')
def test_token_generation_failure(mock_upstox_class, client):
# Setup mock for failure
mock_upstox_instance = Mock()
mock_upstox_instance.app_token.get_access_token.return_value = AccessTokenResponse(
success=False,
data=None,
error={'message': 'Invalid credentials'}
)
mock_upstox_class.return_value = mock_upstox_instance
# Make request
response = client.post('/api/token')
# Assert response
assert response.status_code == 400
data = response.get_json()
assert data['success'] is False
Monitoring Integration
Prometheus Integration
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from upstox_totp import UpstoxTOTP
import time
# Metrics
token_requests_total = Counter('upstox_token_requests_total', 'Total token requests')
token_requests_duration = Histogram('upstox_token_requests_duration_seconds', 'Token request duration')
token_generation_failures = Counter('upstox_token_generation_failures_total', 'Token generation failures')
active_tokens = Gauge('upstox_active_tokens', 'Number of active tokens')
class MonitoredUpstoxClient:
def __init__(self):
self.upstox = UpstoxTOTP()
@token_requests_duration.time()
def get_token(self):
token_requests_total.inc()
try:
response = self.upstox.app_token.get_access_token()
if response.success:
active_tokens.inc()
return response.data.access_token
else:
token_generation_failures.inc()
raise Exception(f"Token generation failed: {response.error}")
except Exception as e:
token_generation_failures.inc()
raise
# Start metrics server
start_http_server(8001)
# Usage
client = MonitoredUpstoxClient()
Best Practices
Environment Configuration: Use environment variables for all sensitive data
Error Handling: Implement comprehensive error handling and logging
Token Caching: Cache tokens appropriately to avoid unnecessary API calls
Security: Follow security best practices for token storage and transmission
Monitoring: Implement monitoring and alerting for production deployments
Testing: Write comprehensive tests for all integration points
Documentation: Document your integration for future maintenance
See Also
Basic Usage Examples - Basic usage examples
Token Caching Examples - Token caching strategies
Security Best Practices - Security best practices
Configuration Guide - Configuration guide