Security Best Practices
This guide covers security considerations and best practices when using the Upstox TOTP SDK.
Overview
The Upstox TOTP SDK handles sensitive financial data and credentials. Following security best practices is essential to protect your trading account and API access.
Credential Security
Environment Variables
✅ Do:
# Use environment variables
export UPSTOX_USERNAME=9876543210
export UPSTOX_PASSWORD=secure-password
export UPSTOX_PIN_CODE=1234
export UPSTOX_TOTP_SECRET=JBSWY3DPEHPK3PXP
export UPSTOX_CLIENT_ID=your-client-id
export UPSTOX_CLIENT_SECRET=your-client-secret
❌ Don’t:
# Never hardcode credentials in source code
upx = UpstoxTOTP(
    username="9876543210",
    password=SecretStr("my-password"),  # Don't do this!
    # ...
)
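Once the variables are exported, a script can confirm they are all present before it constructs the client. The following is a minimal sketch using only the standard library; the helper names `missing_credentials` and `require_credentials` are hypothetical and not part of the SDK, and the variable list simply mirrors the export example in this section.

```python
import os

# Variable names taken from the export example in this guide.
REQUIRED_VARS = (
    "UPSTOX_USERNAME",
    "UPSTOX_PASSWORD",
    "UPSTOX_PIN_CODE",
    "UPSTOX_TOTP_SECRET",
    "UPSTOX_CLIENT_ID",
    "UPSTOX_CLIENT_SECRET",
)


def missing_credentials(environ=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


def require_credentials(environ=None):
    """Raise if anything is missing. Only names are reported, never values."""
    missing = missing_credentials(environ)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `require_credentials()` at startup fails fast with a message that names the missing variables without echoing any secret value.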
.env Files
✅ Do:
# Create .env file with proper permissions
touch .env
chmod 600 .env # Owner read/write only
# Add to .gitignore
echo ".env" >> .gitignore
echo ".env.*" >> .gitignore
❌ Don’t:
# Never commit .env files
git add .env # Don't do this!
File Permissions
# Secure configuration files
chmod 600 .env
chmod 600 config.json
chmod 700 ~/.upstox/
# Check permissions
ls -la .env
# Should show: -rw------- (600)
Version Control Security
# .gitignore
# Environment files
.env
.env.local
.env.development
.env.staging
.env.production
.env.*.local
# Configuration files
config.json
secrets.yaml
credentials.ini
# Token caches
*.token
upstox_token.json
token_cache/
# Logs with sensitive data
debug.log
upstox.log
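A .gitignore entry does not help if a file was force-added or tracked before the entry existed. As a complement, a pre-commit check can scan staged paths for the same patterns. This is a sketch under stated assumptions: `find_sensitive` and `staged_files` are hypothetical helpers, the pattern list is copied from the entries above, and `staged_files` expects `git` to be on the PATH.

```python
import fnmatch
import subprocess

# File-name patterns mirroring the .gitignore entries above.
SENSITIVE_PATTERNS = (
    ".env", ".env.*", "config.json", "secrets.yaml", "credentials.ini",
    "*.token", "upstox_token.json", "debug.log", "upstox.log",
)


def find_sensitive(paths, patterns=SENSITIVE_PATTERNS):
    """Return paths that match a sensitive pattern or sit under token_cache/."""
    hits = []
    for path in paths:
        parts = path.split("/")  # git reports paths with forward slashes
        in_cache_dir = "token_cache" in parts[:-1]
        name_matches = any(fnmatch.fnmatch(parts[-1], pat) for pat in patterns)
        if in_cache_dir or name_matches:
            hits.append(path)
    return hits


def staged_files():
    """List files staged for the next commit."""
    result = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True, text=True, check=True,
    )
    return [line for line in result.stdout.splitlines() if line]
```

A hook could call `find_sensitive(staged_files())` and exit with a non-zero status when the returned list is not empty.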
SecretStr Usage
The SDK uses Pydantic’s SecretStr to protect sensitive data:
from pydantic import SecretStr
from upstox_totp import UpstoxTOTP
# SecretStr masks the value in both str() and repr(), which keeps it out of logs
password = SecretStr("my-password")
print(password)        # Output: **********
print(repr(password))  # Output: SecretStr('**********')
# Access actual value only when needed
actual_password = password.get_secret_value()
# Use with UpstoxTOTP
upx = UpstoxTOTP(
    username="9876543210",
    password=password,  # Automatically protected
    # ...
)
Token Security
Token Storage
✅ Secure storage:
import json
import os
from cryptography.fernet import Fernet

class SecureTokenStorage:
    def __init__(self, key_file="token.key", token_file="token.enc"):
        self.key_file = key_file
        self.token_file = token_file
        self.key = self._load_or_create_key()
        self.cipher = Fernet(self.key)

    def _load_or_create_key(self):
        if os.path.exists(self.key_file):
            with open(self.key_file, 'rb') as f:
                return f.read()
        else:
            key = Fernet.generate_key()
            with open(self.key_file, 'wb') as f:
                f.write(key)
            os.chmod(self.key_file, 0o600)
            return key

    def store_token(self, token):
        encrypted_token = self.cipher.encrypt(token.encode())
        with open(self.token_file, 'wb') as f:
            f.write(encrypted_token)
        os.chmod(self.token_file, 0o600)

    def load_token(self):
        if not os.path.exists(self.token_file):
            return None
        with open(self.token_file, 'rb') as f:
            encrypted_token = f.read()
        decrypted_token = self.cipher.decrypt(encrypted_token)
        return decrypted_token.decode()

# Usage
storage = SecureTokenStorage()
# Store token securely
from upstox_totp import UpstoxTOTP
upx = UpstoxTOTP()
response = upx.app_token.get_access_token()
if response.success:
    storage.store_token(response.data.access_token)
❌ Insecure storage:
# Don't store tokens in plain text
with open('token.txt', 'w') as f:
    f.write(token)  # Insecure!
# Don't log tokens
print(f"Token: {token}") # Don't do this!
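One way to guard against accidental token logging is a `logging.Filter` that masks token-shaped strings before a record is emitted. The sketch below assumes the access token is a JWT (as the expiry check later in this guide implies) and relies on the common heuristic that an encoded JWT header starts with `eyJ`. The class name `RedactTokensFilter` and the logger name `upstox_app` are hypothetical.

```python
import logging
import re

# Heuristic: three base64url segments separated by dots, the first starting
# with "eyJ" (the base64url encoding of '{"'). Assumes the token is a JWT.
_JWT_RE = re.compile(r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+")


class RedactTokensFilter(logging.Filter):
    """Replace JWT-looking substrings in log messages with a placeholder."""

    def filter(self, record):
        message = record.getMessage()  # merge msg and args first
        record.msg = _JWT_RE.sub("[REDACTED]", message)
        record.args = ()               # args are already merged into msg
        return True                    # always keep the record


logger = logging.getLogger("upstox_app")  # hypothetical logger name
logger.addFilter(RedactTokensFilter())
```

A filter attached to a logger applies only to records created through that logger. To cover records that propagate from other loggers, attach the filter to the handler instead. This is a safety net and does not replace the rule above.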
Token Expiry
import jwt
from datetime import datetime, timedelta, timezone

def check_token_expiry(token):
    """Check if token is close to expiry."""
    try:
        # Decode without verification, only to read the expiry claim.
        # Do not trust any other claim from an unverified token.
        decoded = jwt.decode(token, options={"verify_signature": False})
        if 'exp' in decoded:
            # 'exp' is seconds since the epoch, so compare in UTC
            exp_time = datetime.fromtimestamp(decoded['exp'], tz=timezone.utc)
            now = datetime.now(timezone.utc)
            # Check if token expires within 1 hour (or has already expired)
            if exp_time - now < timedelta(hours=1):
                return True, "Token expires soon"
            return False, f"Token valid until {exp_time}"
        return None, "No expiry information in token"
    except jwt.DecodeError:
        return None, "Invalid token format"

# Usage
needs_refresh, message = check_token_expiry(token)
if needs_refresh:
    # Refresh token proactively
    new_response = upx.app_token.get_access_token()
Token Cleanup
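def secure_token_cleanup(token_buf):
    """Best-effort wipe of a token held in a mutable bytearray.

    Python str objects are immutable and cannot be overwritten in place;
    rebinding a name only drops a reference. Copies held elsewhere (for
    example the original str in the response object) are not affected.
    """
    for i in range(len(token_buf)):
        token_buf[i] = 0

# Usage
token = bytearray(response.data.access_token, 'utf-8')
# Use token.decode() where a str is required...
# Clean up when done
secure_token_cleanup(token)
token = None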
Network Security
HTTPS Only
from upstox_totp import UpstoxTOTP
import requests
# Verify SSL certificates (default behavior)
upx = UpstoxTOTP()
# Don't disable SSL verification
# upx.session.verify = False # Never do this!
# Use proper SSL context if needed
import ssl
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
Custom Session Security
from upstox_totp import UpstoxTOTP
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

class SecureHTTPAdapter(HTTPAdapter):
    def init_poolmanager(self, *args, **kwargs):
        # Restrict the connection to forward-secret AEAD cipher suites
        ctx = create_urllib3_context()
        ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        kwargs['ssl_context'] = ctx
        return super().init_poolmanager(*args, **kwargs)

upx = UpstoxTOTP()
upx.session.mount('https://', SecureHTTPAdapter())
Request Headers Security
from upstox_totp import UpstoxTOTP
upx = UpstoxTOTP()
# Use a generic user agent that does not reveal library or OS versions
upx.session.headers.update({
    'User-Agent': 'TradingApp/1.0',
})
# Note: X-Forwarded-For and X-Real-IP are set by proxies, not by clients.
# Sending them empty does not hide your IP address; the server always sees
# the address the connection comes from.
Production Security
Secrets Management
AWS Secrets Manager:
import boto3
import json
from upstox_totp import UpstoxTOTP
from pydantic import SecretStr

def get_upstox_credentials():
    """Get credentials from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='us-east-1')
    try:
        response = client.get_secret_value(SecretId='upstox-credentials')
        secrets = json.loads(response['SecretString'])
        return UpstoxTOTP(
            username=secrets['username'],
            password=SecretStr(secrets['password']),
            pin_code=SecretStr(secrets['pin_code']),
            totp_secret=SecretStr(secrets['totp_secret']),
            client_id=secrets['client_id'],
            client_secret=SecretStr(secrets['client_secret']),
            redirect_uri=secrets['redirect_uri']
        )
    except Exception as e:
        # Chain the original exception so the traceback is preserved
        raise RuntimeError(f"Failed to get credentials: {e}") from e
Azure Key Vault:
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from upstox_totp import UpstoxTOTP
from pydantic import SecretStr

def get_upstox_from_keyvault():
    """Get credentials from Azure Key Vault."""
    credential = DefaultAzureCredential()
    client = SecretClient(
        vault_url="https://your-vault.vault.azure.net/",
        credential=credential
    )
    return UpstoxTOTP(
        username=client.get_secret("upstox-username").value,
        password=SecretStr(client.get_secret("upstox-password").value),
        pin_code=SecretStr(client.get_secret("upstox-pin").value),
        totp_secret=SecretStr(client.get_secret("upstox-totp-secret").value),
        client_id=client.get_secret("upstox-client-id").value,
        client_secret=SecretStr(client.get_secret("upstox-client-secret").value),
        redirect_uri=client.get_secret("upstox-redirect-uri").value
    )
Environment Isolation
import os
from upstox_totp import UpstoxTOTP

class EnvironmentManager:
    @staticmethod
    def get_client(environment='production'):
        """Get client for specific environment."""
        env_map = {
            'development': '.env.development',
            'staging': '.env.staging',
            'production': '.env.production'
        }
        env_file = env_map.get(environment)
        if not env_file:
            raise ValueError(f"Unknown environment: {environment}")
        if not os.path.exists(env_file):
            raise FileNotFoundError(f"Environment file not found: {env_file}")
        return UpstoxTOTP.from_env_file(env_file)

# Usage
if os.getenv('ENV') == 'production':
    upx = EnvironmentManager.get_client('production')
else:
    upx = EnvironmentManager.get_client('development')
Monitoring and Auditing
Security Logging
import logging
import secrets
from upstox_totp import UpstoxTOTP

# Configure security logger
security_logger = logging.getLogger('security')
security_handler = logging.FileHandler('security.log')
security_formatter = logging.Formatter(
    '%(asctime)s - SECURITY - %(levelname)s - %(message)s'
)
security_handler.setFormatter(security_formatter)
security_logger.addHandler(security_handler)
security_logger.setLevel(logging.INFO)

class SecureUpstoxClient:
    def __init__(self):
        self.upx = UpstoxTOTP()
        # Random session ID from a CSPRNG; a hash of the current time is guessable
        self.session_id = secrets.token_hex(4)

    def get_token(self):
        """Get token with security logging."""
        security_logger.info(f"Token generation started - Session: {self.session_id}")
        try:
            response = self.upx.app_token.get_access_token()
            if response.success:
                security_logger.info(f"Token generation successful - Session: {self.session_id}")
                return response.data.access_token
            else:
                security_logger.warning(f"Token generation failed - Session: {self.session_id}")
                return None
        except Exception as e:
            security_logger.error(f"Token generation error - Session: {self.session_id} - Error: {str(e)}")
            raise
Rate Limiting
from collections import defaultdict
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class RateLimiter:
    def __init__(self, max_requests=10, time_window=60):
        self.max_requests = max_requests
        self.time_window = time_window  # seconds
        self.requests = defaultdict(list)

    def allow_request(self, identifier):
        """Check if request is allowed under rate limit."""
        now = datetime.now()
        user_requests = self.requests[identifier]
        # Remove old requests
        cutoff = now - timedelta(seconds=self.time_window)
        user_requests[:] = [req_time for req_time in user_requests if req_time > cutoff]
        if len(user_requests) >= self.max_requests:
            return False
        user_requests.append(now)
        return True

class RateLimitedUpstoxClient:
    def __init__(self, user_id):
        self.upx = UpstoxTOTP()
        self.user_id = user_id
        self.rate_limiter = RateLimiter(max_requests=5, time_window=300)  # 5 requests per 5 minutes

    def get_token(self):
        """Get token with rate limiting."""
        if not self.rate_limiter.allow_request(self.user_id):
            raise Exception("Rate limit exceeded. Please try again later.")
        return self.upx.app_token.get_access_token()
Access Control
import os
import stat
from pathlib import Path

def secure_file_permissions(filepath):
    """Set secure permissions on sensitive files."""
    path = Path(filepath)
    # Owner read/write only (600)
    path.chmod(stat.S_IRUSR | stat.S_IWUSR)
    # Verify permissions
    file_stat = path.stat()
    mode = stat.filemode(file_stat.st_mode)
    if mode != '-rw-------':
        raise Exception(f"Failed to set secure permissions on {filepath}")

# Usage
secure_file_permissions('.env')
secure_file_permissions('token.json')
Incident Response
Security Breach Detection
import hashlib
import json
from datetime import datetime

class SecurityMonitor:
    def __init__(self):
        self.config_hash = None
        self.last_token_time = None

    def check_config_integrity(self, config_file='.env'):
        """Check if configuration file has been tampered with."""
        try:
            with open(config_file, 'r') as f:
                content = f.read()
            current_hash = hashlib.sha256(content.encode()).hexdigest()
            if self.config_hash is None:
                # First call: record the baseline hash
                self.config_hash = current_hash
                return True
            if current_hash != self.config_hash:
                self._log_security_incident("Configuration file modified")
                return False
            return True
        except Exception as e:
            self._log_security_incident(f"Config check failed: {e}")
            return False

    def check_unusual_activity(self):
        """Check for unusual token generation patterns."""
        now = datetime.now()
        if self.last_token_time:
            time_diff = (now - self.last_token_time).total_seconds()
            # Alert if tokens generated too frequently
            if time_diff < 30:  # Less than 30 seconds
                self._log_security_incident("Frequent token generation detected")
        self.last_token_time = now

    def _log_security_incident(self, message):
        """Log security incident."""
        incident = {
            'timestamp': datetime.now().isoformat(),
            'message': message,
            'severity': 'HIGH'
        }
        with open('security_incidents.log', 'a') as f:
            f.write(json.dumps(incident) + '\n')

# Usage
monitor = SecurityMonitor()
# Check before token generation
if monitor.check_config_integrity():
    monitor.check_unusual_activity()
    # Proceed with token generation
else:
    # Handle security incident
    raise Exception("Security check failed")
Credential Rotation
from datetime import datetime, timedelta
import json

class CredentialRotationManager:
    def __init__(self, rotation_file='last_rotation.json'):
        self.rotation_file = rotation_file

    def needs_rotation(self, days=90):
        """Check if credentials need rotation."""
        try:
            with open(self.rotation_file, 'r') as f:
                data = json.load(f)
            last_rotation = datetime.fromisoformat(data['last_rotation'])
            if datetime.now() - last_rotation > timedelta(days=days):
                return True
            return False
        except (FileNotFoundError, KeyError, ValueError):
            # No rotation record, assume needs rotation
            return True

    def record_rotation(self):
        """Record that credentials were rotated."""
        data = {
            'last_rotation': datetime.now().isoformat(),
            'rotated_by': 'automated_system'
        }
        with open(self.rotation_file, 'w') as f:
            json.dump(data, f, indent=2)

    def alert_rotation_needed(self):
        """Alert that credential rotation is needed."""
        print("⚠️ SECURITY ALERT: Credentials need rotation")
        print("Please update your Upstox password and regenerate TOTP secret")

# Usage
rotation_manager = CredentialRotationManager()
if rotation_manager.needs_rotation():
    rotation_manager.alert_rotation_needed()
    # Implement rotation process
Testing Security
Security Test Cases
import pytest
import os
from upstox_totp import UpstoxTOTP, ConfigurationError

def test_no_credentials_in_logs(caplog):
    """Ensure credentials don't appear in logs."""
    upx = UpstoxTOTP(debug=True)
    # Generate token with debug logging
    try:
        response = upx.app_token.get_access_token()
    except Exception:
        pass  # Error is fine, we're testing logging
    # Check logs don't contain sensitive data
    log_output = caplog.text.lower()
    assert 'password' not in log_output
    assert 'secret' not in log_output
    assert 'pin' not in log_output

def test_secretstr_masking():
    """Test that SecretStr properly masks sensitive data."""
    from pydantic import SecretStr
    secret = SecretStr("sensitive-data")
    str_repr = str(secret)
    assert "sensitive-data" not in str_repr
    assert "**********" in str_repr

def test_environment_variable_isolation():
    """Test that environment variables are properly isolated."""
    # Backup original environment
    original_env = dict(os.environ)
    try:
        # Clear sensitive variables
        for key in list(os.environ.keys()):
            if key.startswith('UPSTOX_'):
                del os.environ[key]
        # Should fail without credentials
        with pytest.raises(ConfigurationError):
            UpstoxTOTP()
    finally:
        # Restore environment
        os.environ.clear()
        os.environ.update(original_env)
Penetration Testing
import requests
from upstox_totp import UpstoxTOTP

def test_ssl_security():
    """Test SSL/TLS configuration."""
    upx = UpstoxTOTP()
    session = upx.session
    # Test that SSL verification is enabled
    assert session.verify is True
    # Test that session uses secure protocols
    adapter = session.get_adapter('https://')
    assert hasattr(adapter, 'init_poolmanager')

def test_request_headers():
    """Test that request headers don't leak sensitive information."""
    upx = UpstoxTOTP()
    # Check default headers
    headers = dict(upx.session.headers)
    # Ensure no sensitive data in headers
    sensitive_patterns = ['password', 'secret', 'key', 'token']
    for header_name, header_value in headers.items():
        for pattern in sensitive_patterns:
            assert pattern.lower() not in header_name.lower()
            assert pattern.lower() not in str(header_value).lower()
Security Checklist
Configuration Security
[ ] Use environment variables for all credentials
[ ] Never hardcode secrets in source code
[ ] Set proper file permissions (600) on configuration files
[ ] Add all sensitive files to .gitignore
[ ] Use different credentials for different environments
[ ] Regularly rotate credentials (every 90 days)
Application Security
[ ] Enable SSL certificate verification
[ ] Use SecretStr for sensitive data
[ ] Implement proper error handling (don’t leak sensitive info)
[ ] Clear sensitive data from memory when possible
[ ] Use secure random number generation for session IDs
[ ] Implement rate limiting for token generation
Token Security
[ ] Store tokens securely (encrypted if possible)
[ ] Monitor token expiry and refresh proactively
[ ] Don’t log tokens or include them in error messages
[ ] Use HTTPS only for API calls
[ ] Implement token validation
[ ] Clear tokens from memory after use
Production Security
[ ] Use secrets management services (AWS Secrets Manager, Azure Key Vault)
[ ] Implement security monitoring and alerting
[ ] Regularly audit access logs
[ ] Use separate credentials for each environment
[ ] Monitor for unusual access patterns
[ ] Have an incident response plan
Compliance Considerations
Data Protection
Follow data protection regulations (GDPR, CCPA)
Minimize data collection and retention
Implement proper data encryption
Provide data access and deletion capabilities
Document data processing activities
Financial Regulations
Comply with financial industry standards
Implement proper audit trails
Ensure data integrity and availability
Follow broker-specific security requirements
Maintain transaction logs for compliance
Regular Security Reviews
Monthly Tasks
Review access logs for unusual activity
Check for failed authentication attempts
Verify SSL certificate validity
Update dependencies with security patches
Review and rotate API keys if needed
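The log review in the monthly tasks can be partly automated. As a rough sketch, the function below counts WARNING and ERROR entries per day in a log written with the formatter from the Security Logging section (`%(asctime)s - SECURITY - %(levelname)s - %(message)s`). It assumes the default `asctime` layout, which begins with `YYYY-MM-DD`; the name `summarize_failures` is hypothetical.

```python
import re
from collections import Counter

# Matches "<asctime> - SECURITY - <LEVEL> - <message>"
_LINE_RE = re.compile(r"^(?P<ts>.+?) - SECURITY - (?P<level>[A-Z]+) - (?P<msg>.*)$")


def summarize_failures(lines):
    """Count WARNING and ERROR entries per calendar day (YYYY-MM-DD)."""
    per_day = Counter()
    for line in lines:
        match = _LINE_RE.match(line.strip())
        if match and match.group("level") in ("WARNING", "ERROR"):
            # Default asctime starts with the date, e.g. "2024-05-01 10:00:01,456"
            per_day[match.group("ts")[:10]] += 1
    return dict(per_day)
```

A review script could open `security.log`, pass the file object to `summarize_failures`, and flag any day whose count exceeds a threshold chosen for the deployment.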
Quarterly Tasks
Rotate all credentials
Review and update security policies
Conduct security testing
Update incident response procedures
Review third-party dependencies
Annual Tasks
Comprehensive security audit
Penetration testing
Review and update security architecture
Staff security training
Compliance assessment
Emergency Procedures
Suspected Compromise
Immediately disable compromised credentials
Generate new API keys and TOTP secrets
Review all recent transactions
Check access logs for unauthorized activity
Notify Upstox of potential security incident
Update all affected systems with new credentials
Document the incident for future reference
Data Breach Response
Contain the breach immediately
Assess the scope of compromised data
Notify relevant authorities if required
Inform affected users
Implement additional security measures
Monitor for further suspicious activity
Conduct post-incident review
Contact Information
SDK Issues: https://github.com/batpool/upstox-totp/issues
Security Vulnerabilities: Report privately via GitHub Security tab
See Also
Configuration Guide - Configuration guide
Troubleshooting Guide - Troubleshooting guide
Error Handling Reference - Error handling
Token Caching Examples - Secure token caching