Security Best Practices

This guide covers security considerations and best practices when using the Upstox TOTP SDK.

Overview

The Upstox TOTP SDK handles sensitive financial data and credentials. Following security best practices is essential to protect your trading account and API access.

Credential Security

Environment Variables

✅ Do:

# Use environment variables
export UPSTOX_USERNAME=9876543210
export UPSTOX_PASSWORD=secure-password
export UPSTOX_PIN_CODE=1234
export UPSTOX_TOTP_SECRET=JBSWY3DPEHPK3PXP
export UPSTOX_CLIENT_ID=your-client-id
export UPSTOX_CLIENT_SECRET=your-client-secret

❌ Don’t:

# Never hardcode credentials in source code
upx = UpstoxTOTP(
    username="9876543210",
    password=SecretStr("my-password"),  # Don't do this!
    # ...
)

.env Files

✅ Do:

# Create .env file with proper permissions
touch .env
chmod 600 .env  # Owner read/write only

# Add to .gitignore
echo ".env" >> .gitignore
echo ".env.*" >> .gitignore

❌ Don’t:

# Never commit .env files
git add .env  # Don't do this!

File Permissions

# Secure configuration files
chmod 600 .env
chmod 600 config.json
chmod 700 ~/.upstox/

# Check permissions
ls -la .env
# Should show: -rw------- (600)

Version Control Security

# .gitignore

# Environment files
.env
.env.local
.env.development
.env.staging
.env.production
.env.*.local

# Configuration files
config.json
secrets.yaml
credentials.ini

# Token caches
*.token
upstox_token.json
token_cache/

# Logs with sensitive data
debug.log
upstox.log

SecretStr Usage

The SDK uses Pydantic’s SecretStr to protect sensitive data:

from pydantic import SecretStr
from upstox_totp import UpstoxTOTP

# SecretStr automatically masks values in logs
password = SecretStr("my-password")
print(password)  # Output: SecretStr('**********')

# Access actual value only when needed
actual_password = password.get_secret_value()

# Use with UpstoxTOTP
upx = UpstoxTOTP(
    username="9876543210",
    password=password,  # Automatically protected
    # ...
)

Token Security

Token Storage

✅ Secure storage:

import json
import os
from cryptography.fernet import Fernet

class SecureTokenStorage:
    def __init__(self, key_file="token.key", token_file="token.enc"):
        self.key_file = key_file
        self.token_file = token_file
        self.key = self._load_or_create_key()
        self.cipher = Fernet(self.key)

    def _load_or_create_key(self):
        if os.path.exists(self.key_file):
            with open(self.key_file, 'rb') as f:
                return f.read()
        else:
            key = Fernet.generate_key()
            with open(self.key_file, 'wb') as f:
                f.write(key)
            os.chmod(self.key_file, 0o600)
            return key

    def store_token(self, token):
        encrypted_token = self.cipher.encrypt(token.encode())
        with open(self.token_file, 'wb') as f:
            f.write(encrypted_token)
        os.chmod(self.token_file, 0o600)

    def load_token(self):
        if not os.path.exists(self.token_file):
            return None

        with open(self.token_file, 'rb') as f:
            encrypted_token = f.read()

        decrypted_token = self.cipher.decrypt(encrypted_token)
        return decrypted_token.decode()

# Usage
storage = SecureTokenStorage()

# Store token securely
from upstox_totp import UpstoxTOTP
upx = UpstoxTOTP()
response = upx.app_token.get_access_token()
if response.success:
    storage.store_token(response.data.access_token)

❌ Insecure storage:

# Don't store tokens in plain text
with open('token.txt', 'w') as f:
    f.write(token)  # Insecure!

# Don't log tokens
print(f"Token: {token}")  # Don't do this!

Token Expiry

import jwt
from datetime import datetime, timedelta

def check_token_expiry(token):
    """Check if token is close to expiry."""
    try:
        # Decode without verification to check expiry
        decoded = jwt.decode(token, options={"verify_signature": False})

        if 'exp' in decoded:
            exp_time = datetime.fromtimestamp(decoded['exp'])
            now = datetime.now()

            # Check if token expires within 1 hour
            if exp_time - now < timedelta(hours=1):
                return True, "Token expires soon"

            return False, f"Token valid until {exp_time}"

        return None, "No expiry information in token"

    except jwt.DecodeError:
        return None, "Invalid token format"

# Usage
needs_refresh, message = check_token_expiry(token)
if needs_refresh:
    # Refresh token proactively
    new_response = upx.app_token.get_access_token()

Token Cleanup

import gc

def secure_token_cleanup(token_var):
    """Securely clear token from memory."""
    if token_var:
        # Overwrite memory (Python doesn't guarantee this)
        token_var = 'x' * len(token_var)
        del token_var
        gc.collect()

# Usage
token = response.data.access_token
# Use token...

# Clean up when done
secure_token_cleanup(token)
token = None

Network Security

HTTPS Only

from upstox_totp import UpstoxTOTP
import requests

# Verify SSL certificates (default behavior)
upx = UpstoxTOTP()

# Don't disable SSL verification
# upx.session.verify = False  # Never do this!

# Use proper SSL context if needed
import ssl
context = ssl.create_default_context()
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED

Custom Session Security

from upstox_totp import UpstoxTOTP
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

class SecureHTTPAdapter(HTTPAdapter):
    def init_poolmanager(self, *args, **kwargs):
        ctx = create_urllib3_context()
        ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        kwargs['ssl_context'] = ctx
        return super().init_poolmanager(*args, **kwargs)

upx = UpstoxTOTP()
upx.session.mount('https://', SecureHTTPAdapter())

Request Headers Security

from upstox_totp import UpstoxTOTP

upx = UpstoxTOTP()

# Remove potentially identifying headers
upx.session.headers.update({
    'User-Agent': 'TradingApp/1.0',  # Use generic user agent
    'X-Forwarded-For': '',          # Don't expose IP
    'X-Real-IP': ''                 # Don't expose real IP
})

Production Security

Secrets Management

AWS Secrets Manager:

import boto3
import json
from upstox_totp import UpstoxTOTP
from pydantic import SecretStr

def get_upstox_credentials():
    """Get credentials from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='us-east-1')

    try:
        response = client.get_secret_value(SecretId='upstox-credentials')
        secrets = json.loads(response['SecretString'])

        return UpstoxTOTP(
            username=secrets['username'],
            password=SecretStr(secrets['password']),
            pin_code=SecretStr(secrets['pin_code']),
            totp_secret=SecretStr(secrets['totp_secret']),
            client_id=secrets['client_id'],
            client_secret=SecretStr(secrets['client_secret']),
            redirect_uri=secrets['redirect_uri']
        )

    except Exception as e:
        raise Exception(f"Failed to get credentials: {e}")

Azure Key Vault:

from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from upstox_totp import UpstoxTOTP
from pydantic import SecretStr

def get_upstox_from_keyvault():
    """Get credentials from Azure Key Vault."""
    credential = DefaultAzureCredential()
    client = SecretClient(
        vault_url="https://your-vault.vault.azure.net/",
        credential=credential
    )

    return UpstoxTOTP(
        username=client.get_secret("upstox-username").value,
        password=SecretStr(client.get_secret("upstox-password").value),
        pin_code=SecretStr(client.get_secret("upstox-pin").value),
        totp_secret=SecretStr(client.get_secret("upstox-totp-secret").value),
        client_id=client.get_secret("upstox-client-id").value,
        client_secret=SecretStr(client.get_secret("upstox-client-secret").value),
        redirect_uri=client.get_secret("upstox-redirect-uri").value
    )

Environment Isolation

import os
from upstox_totp import UpstoxTOTP

class EnvironmentManager:
    @staticmethod
    def get_client(environment='production'):
        """Get client for specific environment."""
        env_map = {
            'development': '.env.development',
            'staging': '.env.staging',
            'production': '.env.production'
        }

        env_file = env_map.get(environment)
        if not env_file:
            raise ValueError(f"Unknown environment: {environment}")

        if not os.path.exists(env_file):
            raise FileNotFoundError(f"Environment file not found: {env_file}")

        return UpstoxTOTP.from_env_file(env_file)

# Usage
if os.getenv('ENV') == 'production':
    upx = EnvironmentManager.get_client('production')
else:
    upx = EnvironmentManager.get_client('development')

Monitoring and Auditing

Security Logging

import logging
import hashlib
from datetime import datetime
from upstox_totp import UpstoxTOTP

# Configure security logger
security_logger = logging.getLogger('security')
security_handler = logging.FileHandler('security.log')
security_formatter = logging.Formatter(
    '%(asctime)s - SECURITY - %(levelname)s - %(message)s'
)
security_handler.setFormatter(security_formatter)
security_logger.addHandler(security_handler)
security_logger.setLevel(logging.INFO)

class SecureUpstoxClient:
    def __init__(self):
        self.upx = UpstoxTOTP()
        self.session_id = hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8]

    def get_token(self):
        """Get token with security logging."""
        security_logger.info(f"Token generation started - Session: {self.session_id}")

        try:
            response = self.upx.app_token.get_access_token()

            if response.success:
                security_logger.info(f"Token generation successful - Session: {self.session_id}")
                return response.data.access_token
            else:
                security_logger.warning(f"Token generation failed - Session: {self.session_id}")
                return None

        except Exception as e:
            security_logger.error(f"Token generation error - Session: {self.session_id} - Error: {str(e)}")
            raise

Rate Limiting

import time
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_requests=10, time_window=60):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = defaultdict(list)

    def allow_request(self, identifier):
        """Check if request is allowed under rate limit."""
        now = datetime.now()
        user_requests = self.requests[identifier]

        # Remove old requests
        cutoff = now - timedelta(seconds=self.time_window)
        user_requests[:] = [req_time for req_time in user_requests if req_time > cutoff]

        if len(user_requests) >= self.max_requests:
            return False

        user_requests.append(now)
        return True

class RateLimitedUpstoxClient:
    def __init__(self, user_id):
        self.upx = UpstoxTOTP()
        self.user_id = user_id
        self.rate_limiter = RateLimiter(max_requests=5, time_window=300)  # 5 requests per 5 minutes

    def get_token(self):
        """Get token with rate limiting."""
        if not self.rate_limiter.allow_request(self.user_id):
            raise Exception("Rate limit exceeded. Please try again later.")

        return self.upx.app_token.get_access_token()

Access Control

import os
import stat
from pathlib import Path

def secure_file_permissions(filepath):
    """Set secure permissions on sensitive files."""
    path = Path(filepath)

    # Owner read/write only (600)
    path.chmod(stat.S_IRUSR | stat.S_IWUSR)

    # Verify permissions
    file_stat = path.stat()
    mode = stat.filemode(file_stat.st_mode)

    if mode != '-rw-------':
        raise Exception(f"Failed to set secure permissions on {filepath}")

# Usage
secure_file_permissions('.env')
secure_file_permissions('token.json')

Incident Response

Security Breach Detection

import hashlib
import json
from datetime import datetime

class SecurityMonitor:
    def __init__(self):
        self.config_hash = None
        self.last_token_time = None

    def check_config_integrity(self, config_file='.env'):
        """Check if configuration file has been tampered with."""
        try:
            with open(config_file, 'r') as f:
                content = f.read()

            current_hash = hashlib.sha256(content.encode()).hexdigest()

            if self.config_hash is None:
                self.config_hash = current_hash
                return True

            if current_hash != self.config_hash:
                self._log_security_incident("Configuration file modified")
                return False

            return True

        except Exception as e:
            self._log_security_incident(f"Config check failed: {e}")
            return False

    def check_unusual_activity(self):
        """Check for unusual token generation patterns."""
        now = datetime.now()

        if self.last_token_time:
            time_diff = (now - self.last_token_time).total_seconds()

            # Alert if tokens generated too frequently
            if time_diff < 30:  # Less than 30 seconds
                self._log_security_incident("Frequent token generation detected")

        self.last_token_time = now

    def _log_security_incident(self, message):
        """Log security incident."""
        incident = {
            'timestamp': datetime.now().isoformat(),
            'message': message,
            'severity': 'HIGH'
        }

        with open('security_incidents.log', 'a') as f:
            f.write(json.dumps(incident) + '\n')

# Usage
monitor = SecurityMonitor()

# Check before token generation
if monitor.check_config_integrity():
    monitor.check_unusual_activity()
    # Proceed with token generation
else:
    # Handle security incident
    raise Exception("Security check failed")

Credential Rotation

from datetime import datetime, timedelta
import json

class CredentialRotationManager:
    def __init__(self, rotation_file='last_rotation.json'):
        self.rotation_file = rotation_file

    def needs_rotation(self, days=90):
        """Check if credentials need rotation."""
        try:
            with open(self.rotation_file, 'r') as f:
                data = json.load(f)

            last_rotation = datetime.fromisoformat(data['last_rotation'])

            if datetime.now() - last_rotation > timedelta(days=days):
                return True

            return False

        except (FileNotFoundError, KeyError, ValueError):
            # No rotation record, assume needs rotation
            return True

    def record_rotation(self):
        """Record that credentials were rotated."""
        data = {
            'last_rotation': datetime.now().isoformat(),
            'rotated_by': 'automated_system'
        }

        with open(self.rotation_file, 'w') as f:
            json.dump(data, f, indent=2)

    def alert_rotation_needed(self):
        """Alert that credential rotation is needed."""
        print("⚠️  SECURITY ALERT: Credentials need rotation")
        print("Please update your Upstox password and regenerate TOTP secret")

# Usage
rotation_manager = CredentialRotationManager()

if rotation_manager.needs_rotation():
    rotation_manager.alert_rotation_needed()
    # Implement rotation process

Testing Security

Security Test Cases

import pytest
import os
from upstox_totp import UpstoxTOTP, ConfigurationError

def test_no_credentials_in_logs(caplog):
    """Ensure credentials don't appear in logs."""
    upx = UpstoxTOTP(debug=True)

    # Generate token with debug logging
    try:
        response = upx.app_token.get_access_token()
    except:
        pass  # Error is fine, we're testing logging

    # Check logs don't contain sensitive data
    log_output = caplog.text.lower()

    assert 'password' not in log_output
    assert 'secret' not in log_output
    assert 'pin' not in log_output

def test_secretstr_masking():
    """Test that SecretStr properly masks sensitive data."""
    from pydantic import SecretStr

    secret = SecretStr("sensitive-data")
    str_repr = str(secret)

    assert "sensitive-data" not in str_repr
    assert "**********" in str_repr

def test_environment_variable_isolation():
    """Test that environment variables are properly isolated."""
    # Backup original environment
    original_env = dict(os.environ)

    try:
        # Clear sensitive variables
        for key in list(os.environ.keys()):
            if key.startswith('UPSTOX_'):
                del os.environ[key]

        # Should fail without credentials
        with pytest.raises(ConfigurationError):
            UpstoxTOTP()

    finally:
        # Restore environment
        os.environ.clear()
        os.environ.update(original_env)

Penetration Testing

import requests
from upstox_totp import UpstoxTOTP

def test_ssl_security():
    """Test SSL/TLS configuration."""
    upx = UpstoxTOTP()
    session = upx.session

    # Test that SSL verification is enabled
    assert session.verify is True

    # Test that session uses secure protocols
    adapter = session.get_adapter('https://')
    assert hasattr(adapter, 'init_poolmanager')

def test_request_headers():
    """Test that request headers don't leak sensitive information."""
    upx = UpstoxTOTP()

    # Check default headers
    headers = dict(upx.session.headers)

    # Ensure no sensitive data in headers
    sensitive_patterns = ['password', 'secret', 'key', 'token']
    for header_name, header_value in headers.items():
        for pattern in sensitive_patterns:
            assert pattern.lower() not in header_name.lower()
            assert pattern.lower() not in str(header_value).lower()

Security Checklist

Configuration Security

  • [ ] Use environment variables for all credentials

  • [ ] Never hardcode secrets in source code

  • [ ] Set proper file permissions (600) on configuration files

  • [ ] Add all sensitive files to .gitignore

  • [ ] Use different credentials for different environments

  • [ ] Regularly rotate credentials (every 90 days)

Application Security

  • [ ] Enable SSL certificate verification

  • [ ] Use SecretStr for sensitive data

  • [ ] Implement proper error handling (don’t leak sensitive info)

  • [ ] Clear sensitive data from memory when possible

  • [ ] Use secure random number generation for session IDs

  • [ ] Implement rate limiting for token generation

Token Security

  • [ ] Store tokens securely (encrypted if possible)

  • [ ] Monitor token expiry and refresh proactively

  • [ ] Don’t log tokens or include them in error messages

  • [ ] Use HTTPS only for API calls

  • [ ] Implement token validation

  • [ ] Clear tokens from memory after use

Production Security

  • [ ] Use secrets management services (AWS Secrets Manager, Azure Key Vault)

  • [ ] Implement security monitoring and alerting

  • [ ] Regularly audit access logs

  • [ ] Use separate credentials for each environment

  • [ ] Monitor for unusual access patterns

  • [ ] Have an incident response plan

Compliance Considerations

Data Protection

  • Follow data protection regulations (GDPR, CCPA)

  • Minimize data collection and retention

  • Implement proper data encryption

  • Provide data access and deletion capabilities

  • Document data processing activities

Financial Regulations

  • Comply with financial industry standards

  • Implement proper audit trails

  • Ensure data integrity and availability

  • Follow broker-specific security requirements

  • Maintain transaction logs for compliance

Regular Security Reviews

Monthly Tasks

  • Review access logs for unusual activity

  • Check for failed authentication attempts

  • Verify SSL certificate validity

  • Update dependencies with security patches

  • Review and rotate API keys if needed

Quarterly Tasks

  • Rotate all credentials

  • Review and update security policies

  • Conduct security testing

  • Update incident response procedures

  • Review third-party dependencies

Annual Tasks

  • Comprehensive security audit

  • Penetration testing

  • Review and update security architecture

  • Staff security training

  • Compliance assessment

Emergency Procedures

Suspected Compromise

  1. Immediately disable compromised credentials

  2. Generate new API keys and TOTP secrets

  3. Review all recent transactions

  4. Check access logs for unauthorized activity

  5. Notify Upstox of potential security incident

  6. Update all affected systems with new credentials

  7. Document the incident for future reference

Data Breach Response

  1. Contain the breach immediately

  2. Assess the scope of compromised data

  3. Notify relevant authorities if required

  4. Inform affected users

  5. Implement additional security measures

  6. Monitor for further suspicious activity

  7. Conduct post-incident review

Contact Information

See Also