Token Caching Examples

This guide shows various strategies for caching Upstox access tokens to improve performance and reduce API calls.

Why Cache Tokens?

Upstox access tokens:

  • Expire after 24 hours - Need periodic refresh

  • Take 2-5 seconds to generate - Network overhead

  • Have rate limits - Avoid unnecessary calls

  • Are reusable - Same token works for multiple API calls

Caching tokens reduces latency and improves application performance.

Simple File-Based Caching

Basic File Cache

import json
import os
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class SimpleTokenCache:
    """Persist an Upstox access token in a JSON file between runs.

    The file stores ``token``, ``expiry`` and ``created``; both timestamps are
    naive local-time ISO-8601 strings produced by ``datetime.isoformat()``.
    """

    def __init__(self, cache_file="upstox_token.json"):
        """Store the cache path and create the Upstox client.

        Args:
            cache_file: Path of the JSON file that holds the cached token.
        """
        self.cache_file = cache_file
        self.upstox = UpstoxTOTP()

    def _discard_cache_file(self):
        """Delete the cache file, ignoring the case where it is already gone."""
        try:
            os.remove(self.cache_file)
        except FileNotFoundError:
            pass

    def get_cached_token(self):
        """Get token from cache if still valid.

        Returns:
            The cached token when its recorded expiry is more than one hour
            away, otherwise ``None``. Expired or unreadable cache files are
            deleted so the next call starts clean.
        """
        try:
            # Open directly instead of checking os.path.exists() first: the
            # file may disappear between the check and the open.
            with open(self.cache_file, 'r') as f:
                data = json.load(f)

            token = data['token']
            expiry = datetime.fromisoformat(data['expiry'])
            # Check expiry (with 1 hour buffer)
            is_fresh = expiry > datetime.now() + timedelta(hours=1)
        except FileNotFoundError:
            return None
        except (KeyError, TypeError, ValueError):
            # ValueError covers json.JSONDecodeError and bad ISO strings;
            # TypeError covers JSON that is not an object, a non-string
            # expiry, and a timezone-aware expiry compared with naive now().
            print("⚠️ Invalid cache file")
            self._discard_cache_file()
            return None

        if is_fresh:
            print("✅ Using cached token")
            return token

        print("⚠️ Cached token expired")
        self._discard_cache_file()
        return None

    def cache_token(self, token):
        """Cache token with 24-hour expiry.

        Args:
            token: Access token string to persist.
        """
        # One clock reading keeps 'created' and 'expiry' exactly 24 h apart.
        now = datetime.now()
        data = {
            'token': token,
            'expiry': (now + timedelta(hours=24)).isoformat(),
            'created': now.isoformat()
        }

        # Create the file owner-only (0o600): it holds a live credential.
        fd = os.open(self.cache_file,
                     os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(data, f, indent=2)

        print("💾 Token cached successfully")

    def get_fresh_token(self):
        """Get token from cache or generate new one.

        Returns:
            A cached token if one is still valid, otherwise a newly generated
            token (which is cached before being returned).

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        # Try cache first
        token = self.get_cached_token()
        if token:
            return token

        # Generate new token
        print("🔄 Generating new token...")
        response = self.upstox.app_token.get_access_token()

        if response.success and response.data:
            token = response.data.access_token
            self.cache_token(token)
            return token
        else:
            raise Exception(f"Token generation failed: {response.error}")

# Usage: returns the token stored in upstox_token.json when it is still
# valid, otherwise generates a new one and writes it to that file.
cache = SimpleTokenCache()
token = cache.get_fresh_token()

Secure File Cache

import json
import os
import stat
from datetime import datetime, timedelta
from cryptography.fernet import Fernet
from upstox_totp import UpstoxTOTP

class SecureTokenCache:
    """Persist an Upstox access token in a Fernet-encrypted file.

    The encryption key lives in ``key_file`` and the encrypted JSON payload
    (``token``, ``expiry``, ``created``) in ``cache_file``. Both files are
    created with owner-only permissions.
    """

    def __init__(self, cache_file="upstox_token.enc", key_file="token.key"):
        """Store file locations, create the Upstox client and load the key.

        Args:
            cache_file: Path of the encrypted token file.
            key_file: Path of the Fernet key file (created if missing).
        """
        self.cache_file = cache_file
        self.key_file = key_file
        self.upstox = UpstoxTOTP()
        self.cipher = self._get_cipher()

    @staticmethod
    def _write_private(path, payload):
        """Write ``payload`` (bytes) to ``path`` with mode 0o600.

        The file is created owner-only from the start, so there is no window
        in which a freshly written secret is readable under the default umask.
        """
        owner_only = stat.S_IRUSR | stat.S_IWUSR  # 600
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
        with os.fdopen(fd, 'wb') as f:
            f.write(payload)
        # The mode passed to os.open() only applies on creation; tighten a
        # file that already existed with looser permissions.
        os.chmod(path, owner_only)

    def _discard_cache_file(self):
        """Delete the cache file, ignoring the case where it is already gone."""
        try:
            os.remove(self.cache_file)
        except FileNotFoundError:
            pass

    def _get_cipher(self):
        """Get or create encryption cipher.

        Returns:
            A ``Fernet`` instance built from the key in ``key_file``; a new
            key is generated and stored when the file does not exist.
        """
        try:
            with open(self.key_file, 'rb') as f:
                key = f.read()
        except FileNotFoundError:
            key = Fernet.generate_key()
            self._write_private(self.key_file, key)

        return Fernet(key)

    def get_cached_token(self):
        """Get encrypted token from cache.

        Returns:
            The decrypted token when its recorded expiry is more than one
            hour away, otherwise ``None``. Expired or unreadable cache files
            are deleted.
        """
        try:
            with open(self.cache_file, 'rb') as f:
                encrypted_data = f.read()

            # Decrypt data
            decrypted_data = self.cipher.decrypt(encrypted_data)
            data = json.loads(decrypted_data.decode())

            token = data['token']
            # Check expiry (with 1 hour buffer)
            expiry = datetime.fromisoformat(data['expiry'])
            is_fresh = expiry > datetime.now() + timedelta(hours=1)
        except FileNotFoundError:
            return None
        except Exception:
            # Deliberately broad: a failed decryption, malformed JSON or a
            # missing field all mean the cache is unusable, and the caller
            # can always fall back to generating a new token.
            self._discard_cache_file()
            return None

        if is_fresh:
            return token

        self._discard_cache_file()
        return None

    def cache_token(self, token):
        """Cache token with encryption.

        Args:
            token: Access token string to encrypt and persist with a
                24-hour expiry.
        """
        now = datetime.now()
        data = {
            'token': token,
            'expiry': (now + timedelta(hours=24)).isoformat(),
            'created': now.isoformat()
        }

        # Encrypt data
        json_data = json.dumps(data).encode()
        encrypted_data = self.cipher.encrypt(json_data)

        self._write_private(self.cache_file, encrypted_data)

    def get_fresh_token(self):
        """Get token from secure cache or generate new one.

        Returns:
            A cached token if one is still valid, otherwise a newly generated
            token (which is cached before being returned).

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        token = self.get_cached_token()
        if token:
            return token

        response = self.upstox.app_token.get_access_token()
        if response.success and response.data:
            token = response.data.access_token
            self.cache_token(token)
            return token
        else:
            raise Exception(f"Token generation failed: {response.error}")

# Usage: with the default arguments the key is read from (or created in)
# token.key and the encrypted token is stored in upstox_token.enc.
secure_cache = SecureTokenCache()
token = secure_cache.get_fresh_token()

Memory-Based Caching

In-Memory Cache

import threading
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class MemoryTokenCache:
    """Keep a single Upstox access token in process memory.

    All reads and writes of the cached token happen under one lock.
    """

    def __init__(self):
        """Create the Upstox client and start with an empty cache."""
        self.upstox = UpstoxTOTP()
        self._token = None
        self._expiry = None
        self._lock = threading.Lock()

    def get_fresh_token(self):
        """Return the cached token, generating a new one when needed.

        A cached token is reused only while its expiry is more than one hour
        away. A newly generated token is stored with a 24-hour expiry.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        with self._lock:
            have_entry = self._token and self._expiry
            if have_entry and self._expiry > datetime.now() + timedelta(hours=1):
                print("✅ Using cached token from memory")
                return self._token

            # Nothing usable in memory: ask Upstox for a new token.
            print("🔄 Generating new token...")
            response = self.upstox.app_token.get_access_token()

            if not (response.success and response.data):
                raise Exception(f"Token generation failed: {response.error}")

            self._token = response.data.access_token
            self._expiry = datetime.now() + timedelta(hours=24)
            print("💾 Token cached in memory")
            return self._token

    def invalidate_cache(self):
        """Drop the cached token so the next request generates a new one."""
        with self._lock:
            self._token, self._expiry = None, None
            print("🗑️ Memory cache invalidated")

# Usage: the token is held only on this MemoryTokenCache instance, so it is
# reused for as long as the same object is kept around.
memory_cache = MemoryTokenCache()
token = memory_cache.get_fresh_token()

TTL Cache Decorator

import time
import threading
from functools import wraps
from upstox_totp import UpstoxTOTP

class TTLCache:
    """Thread-safe key/value store whose entries expire after a fixed TTL.

    Entries are kept in ``self.cache`` as ``(value, stored_at)`` tuples, where
    ``stored_at`` is a ``time.monotonic()`` reading. A monotonic clock is used
    so that system clock adjustments cannot expire or extend entries.
    """

    def __init__(self, ttl_seconds=3600):  # 1 hour default
        """Create an empty cache.

        Args:
            ttl_seconds: Lifetime of each entry, in seconds.
        """
        self.ttl_seconds = ttl_seconds
        self.cache = {}
        self.lock = threading.Lock()

    def get(self, key):
        """Return the value stored under ``key``, or ``None``.

        ``None`` is returned both for a missing key and for an entry whose
        age has reached ``ttl_seconds``; an expired entry is removed.
        """
        with self.lock:
            entry = self.cache.get(key)
            if entry is None:
                return None

            value, stored_at = entry
            if time.monotonic() - stored_at < self.ttl_seconds:
                return value

            del self.cache[key]
            return None

    def set(self, key, value):
        """Store ``value`` under ``key`` and restart its TTL."""
        with self.lock:
            self.cache[key] = (value, time.monotonic())

    def delete(self, key):
        """Remove ``key`` if present; a missing key is not an error."""
        with self.lock:
            self.cache.pop(key, None)

def cached_token(ttl_seconds=23*3600):  # 23 hours
    """Build a decorator that caches a function's result in a TTLCache.

    One TTLCache is created per call to ``cached_token`` and shared by every
    function the returned decorator wraps. The cache key combines the
    function name with a hash of the stringified positional and keyword
    arguments. A falsy cached value is treated as a miss.

    Args:
        ttl_seconds: Lifetime of a cached result, in seconds.
    """
    store = TTLCache(ttl_seconds)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Key is derived from the textual form of the call arguments.
            arg_signature = str(args) + str(sorted(kwargs.items()))
            cache_key = f"{func.__name__}_{hash(arg_signature)}"

            hit = store.get(cache_key)
            if not hit:
                # Miss: call through and remember the result.
                print("🔄 Generating new token...")
                fresh = func(*args, **kwargs)
                store.set(cache_key, fresh)
                print("💾 Token cached")
                return fresh

            print("✅ Using cached token")
            return hit

        return wrapper

    return decorator

class CachedUpstoxClient:
    """Upstox client whose access-token lookup goes through ``cached_token``."""

    def __init__(self):
        """Create the underlying Upstox client."""
        self.upstox = UpstoxTOTP()

    @cached_token(ttl_seconds=23*3600)  # 23 hours
    def get_access_token(self):
        """Request an access token from Upstox.

        Returns:
            The access token from the response data.

        Raises:
            Exception: If the response is unsuccessful or carries no data.
        """
        response = self.upstox.app_token.get_access_token()
        if not (response.success and response.data):
            raise Exception(f"Token generation failed: {response.error}")
        return response.data.access_token

# Usage: both calls are made on the same client object, so they share one
# cache entry.
client = CachedUpstoxClient()
token = client.get_access_token()  # First call - generates token
token = client.get_access_token()  # Second call - uses cache

Redis Caching

Basic Redis Cache

import redis
import json
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class RedisTokenCache:
    """Share one Upstox access token through a Redis key with a 23-hour TTL."""

    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        """Connect to Redis and create the Upstox client.

        Args:
            redis_host: Redis server host name.
            redis_port: Redis server port.
            redis_db: Redis database index.
        """
        connection_options = {
            'host': redis_host,
            'port': redis_port,
            'db': redis_db,
            'decode_responses': True,
        }
        self.redis_client = redis.Redis(**connection_options)
        self.upstox = UpstoxTOTP()
        self.cache_key = 'upstox:access_token'

    def get_fresh_token(self):
        """Return the token stored in Redis, generating one on a miss.

        A newly generated token is written to Redis with a 23-hour expiry.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        token = self.redis_client.get(self.cache_key)
        if token:
            print("✅ Using cached token from Redis")
            return token

        # Redis had nothing: ask Upstox for a new token.
        print("🔄 Generating new token...")
        response = self.upstox.app_token.get_access_token()

        if not (response.success and response.data):
            raise Exception(f"Token generation failed: {response.error}")

        token = response.data.access_token
        ttl_seconds = 23 * 3600  # 23 hours
        self.redis_client.setex(self.cache_key, ttl_seconds, token)

        print("💾 Token cached in Redis")
        return token

    def invalidate_cache(self):
        """Delete the cached token key from Redis."""
        self.redis_client.delete(self.cache_key)
        print("🗑️ Redis cache invalidated")

# Usage: with the default arguments this connects to Redis at
# localhost:6379, database 0.
redis_cache = RedisTokenCache()
token = redis_cache.get_fresh_token()

Redis with Metadata

import redis
import json
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class AdvancedRedisTokenCache:
    """Redis token cache that also records token metadata and hit/miss stats.

    The token and its JSON metadata are stored under separate keys, each with
    a 23-hour expiry; counters live in a Redis hash with the fields ``hits``,
    ``misses`` and ``generations``.
    """

    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        """Connect to Redis, create the Upstox client and set the key names.

        Args:
            redis_host: Redis server host name.
            redis_port: Redis server port.
            redis_db: Redis database index.
        """
        connection_options = {
            'host': redis_host,
            'port': redis_port,
            'db': redis_db,
            'decode_responses': True,
        }
        self.redis_client = redis.Redis(**connection_options)
        self.upstox = UpstoxTOTP()
        self.token_key = 'upstox:token'
        self.metadata_key = 'upstox:token:metadata'
        self.stats_key = 'upstox:token:stats'

    def get_fresh_token(self):
        """Return the cached token, or generate, store and return a new one.

        A hit increments the ``hits`` counter. A miss stores the token and
        its metadata, then increments ``misses`` and ``generations``.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        token = self.redis_client.get(self.token_key)
        if token:
            self._increment_cache_hits()
            print("✅ Using cached token from Redis")
            return token

        print("🔄 Generating new token...")
        response = self.upstox.app_token.get_access_token()

        if not (response.success and response.data):
            raise Exception(f"Token generation failed: {response.error}")

        account = response.data
        token = account.access_token
        ttl_seconds = 23 * 3600

        # Token first, then the descriptive record that accompanies it.
        self.redis_client.setex(self.token_key, ttl_seconds, token)

        metadata = {
            'user_id': account.user_id,
            'user_name': account.user_name,
            'email': account.email,
            'generated_at': datetime.now().isoformat(),
            'expires_at': (datetime.now() + timedelta(hours=24)).isoformat()
        }
        self.redis_client.setex(
            self.metadata_key, ttl_seconds, json.dumps(metadata)
        )

        self._increment_cache_misses()
        self._increment_total_generations()

        print("💾 Token and metadata cached in Redis")
        return token

    def get_token_metadata(self):
        """Return the stored metadata dict, or ``None`` when none is stored."""
        raw = self.redis_client.get(self.metadata_key)
        return json.loads(raw) if raw else None

    def get_cache_stats(self):
        """Return hit, miss and generation counts plus the hit ratio (%)."""
        stats = self.redis_client.hgetall(self.stats_key)
        summary = {}
        summary['cache_hits'] = int(stats.get('hits', 0))
        summary['cache_misses'] = int(stats.get('misses', 0))
        summary['total_generations'] = int(stats.get('generations', 0))
        summary['hit_ratio'] = self._calculate_hit_ratio(stats)
        return summary

    def _bump(self, field):
        """Add one to ``field`` in the stats hash."""
        self.redis_client.hincrby(self.stats_key, field, 1)

    def _increment_cache_hits(self):
        """Count one cache hit."""
        self._bump('hits')

    def _increment_cache_misses(self):
        """Count one cache miss."""
        self._bump('misses')

    def _increment_total_generations(self):
        """Count one token generation."""
        self._bump('generations')

    def _calculate_hit_ratio(self, stats):
        """Return hits as a percentage of hits + misses, or 0 with no data."""
        hits = int(stats.get('hits', 0))
        total = hits + int(stats.get('misses', 0))
        if total > 0:
            return hits / total * 100
        return 0

# Usage
advanced_cache = AdvancedRedisTokenCache()
token = advanced_cache.get_fresh_token()

# Get metadata and stats
# metadata is None when no metadata key is stored; hit_ratio is a percentage.
metadata = advanced_cache.get_token_metadata()
stats = advanced_cache.get_cache_stats()
print(f"Cache hit ratio: {stats['hit_ratio']:.1f}%")

Database Caching

SQLite Cache

import sqlite3
import json
from datetime import datetime, timedelta
from contextlib import contextmanager
from upstox_totp import UpstoxTOTP

class SQLiteTokenCache:
    """Persist Upstox access tokens and cache statistics in SQLite.

    Tokens live in ``token_cache`` and counters in the single row (id = 1) of
    ``cache_stats``. All timestamps are produced by SQLite itself
    (``CURRENT_TIMESTAMP`` / ``datetime('now', ...)``), so values that are
    stored and values they are compared against are both in UTC.
    """

    def __init__(self, db_path="upstox_cache.db"):
        """Store the database path, create the client and ensure the schema.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.upstox = UpstoxTOTP()
        self.init_database()

    def init_database(self):
        """Initialize SQLite database.

        Creates both tables if they do not exist and seeds the statistics row.
        """
        with self.get_connection() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS token_cache (
                    id INTEGER PRIMARY KEY,
                    token TEXT NOT NULL,
                    user_id TEXT,
                    user_name TEXT,
                    email TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP NOT NULL,
                    is_active BOOLEAN DEFAULT TRUE
                )
            ''')

            conn.execute('''
                CREATE TABLE IF NOT EXISTS cache_stats (
                    id INTEGER PRIMARY KEY,
                    cache_hits INTEGER DEFAULT 0,
                    cache_misses INTEGER DEFAULT 0,
                    total_generations INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Initialize stats if not exists
            conn.execute('''
                INSERT OR IGNORE INTO cache_stats (id, cache_hits, cache_misses, total_generations)
                VALUES (1, 0, 0, 0)
            ''')

            conn.commit()

    @contextmanager
    def get_connection(self):
        """Get database connection with context management.

        Yields a connection whose rows support access by column name. The
        connection is always closed on exit and is NOT committed
        automatically; callers must call ``commit()`` to keep their writes.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def get_fresh_token(self):
        """Get token from SQLite cache or generate new one.

        Returns:
            The newest active token that expires more than one hour from now,
            or a newly generated token stored with a 24-hour expiry.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        with self.get_connection() as conn:
            # Check for valid token
            cursor = conn.execute('''
                SELECT token FROM token_cache
                WHERE is_active = TRUE
                  AND expires_at > datetime('now', '+1 hour')
                ORDER BY created_at DESC
                LIMIT 1
            ''')

            row = cursor.fetchone()
            if row:
                self._update_cache_hits(conn)
                # Without this commit the counter update is discarded when
                # the connection is closed.
                conn.commit()
                print("✅ Using cached token from SQLite")
                return row['token']

        # Generate new token
        print("🔄 Generating new token...")
        response = self.upstox.app_token.get_access_token()

        if response.success and response.data:
            token = response.data.access_token

            with self.get_connection() as conn:
                # Deactivate old tokens
                conn.execute('''
                    UPDATE token_cache
                    SET is_active = FALSE
                    WHERE is_active = TRUE
                ''')

                # Insert new token. The expiry is computed by SQLite so it is
                # in UTC, matching the datetime('now') comparisons used by
                # the lookup above and by cleanup_expired_tokens().
                conn.execute('''
                    INSERT INTO token_cache (token, user_id, user_name, email, expires_at)
                    VALUES (?, ?, ?, ?, datetime('now', '+24 hours'))
                ''', (
                    token,
                    response.data.user_id,
                    response.data.user_name,
                    response.data.email
                ))

                self._update_cache_misses(conn)
                self._update_total_generations(conn)
                conn.commit()

            print("💾 Token cached in SQLite")
            return token
        else:
            raise Exception(f"Token generation failed: {response.error}")

    def get_cache_stats(self):
        """Get cache statistics.

        Returns:
            A dict with ``cache_hits``, ``cache_misses``,
            ``total_generations`` and ``hit_ratio`` (a percentage), or an
            empty dict when the statistics row is missing.
        """
        with self.get_connection() as conn:
            cursor = conn.execute('''
                SELECT cache_hits, cache_misses, total_generations
                FROM cache_stats WHERE id = 1
            ''')

            row = cursor.fetchone()
            if not row:
                return {}

            hits = row['cache_hits']
            misses = row['cache_misses']
            total = hits + misses
            hit_ratio = (hits / total * 100) if total > 0 else 0

            return {
                'cache_hits': hits,
                'cache_misses': misses,
                'total_generations': row['total_generations'],
                'hit_ratio': hit_ratio
            }

    def cleanup_expired_tokens(self):
        """Remove expired tokens.

        Returns:
            The number of rows deleted.
        """
        with self.get_connection() as conn:
            cursor = conn.execute('''
                DELETE FROM token_cache
                WHERE expires_at < datetime('now')
            ''')
            deleted_count = cursor.rowcount
            conn.commit()
            return deleted_count

    def _update_cache_hits(self, conn):
        """Increment the hit counter on ``conn``; the caller must commit."""
        conn.execute('''
            UPDATE cache_stats
            SET cache_hits = cache_hits + 1,
                last_updated = CURRENT_TIMESTAMP
            WHERE id = 1
        ''')

    def _update_cache_misses(self, conn):
        """Increment the miss counter on ``conn``; the caller must commit."""
        conn.execute('''
            UPDATE cache_stats
            SET cache_misses = cache_misses + 1,
                last_updated = CURRENT_TIMESTAMP
            WHERE id = 1
        ''')

    def _update_total_generations(self, conn):
        """Increment the generation counter on ``conn``; the caller must commit."""
        conn.execute('''
            UPDATE cache_stats
            SET total_generations = total_generations + 1,
                last_updated = CURRENT_TIMESTAMP
            WHERE id = 1
        ''')

# Usage: constructing the cache creates the tables in upstox_cache.db if
# they do not exist yet.
sqlite_cache = SQLiteTokenCache()
token = sqlite_cache.get_fresh_token()

# Get stats
stats = sqlite_cache.get_cache_stats()
print(f"Cache performance: {stats}")

# Cleanup: deletes rows whose expires_at is in the past and reports how many.
deleted = sqlite_cache.cleanup_expired_tokens()
print(f"Cleaned up {deleted} expired tokens")

Multi-Level Caching

Memory + Redis Cache

import redis
import threading
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class MultiLevelTokenCache:
    """Two-level token cache: process memory (L1) in front of Redis (L2).

    Tokens are assumed to live 24 hours. Redis entries are written with a
    23-hour TTL, and the memory entry is reused only while its recorded
    expiry is more than one hour away.
    """

    def __init__(self, redis_host='localhost', redis_port=6379):
        """Create the Upstox client, the empty memory cache and the Redis client.

        Args:
            redis_host: Redis server host name.
            redis_port: Redis server port.
        """
        self.upstox = UpstoxTOTP()

        # Level 1: Memory cache
        self._memory_token = None
        self._memory_expiry = None
        self._memory_lock = threading.Lock()

        # Level 2: Redis cache
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.redis_key = 'upstox:token:multilevel'

    def get_fresh_token(self):
        """Get token from multi-level cache or generate new one.

        Lookup order is memory, then Redis, then a new token from Upstox. A
        Redis hit is copied into memory with an expiry derived from the key's
        remaining TTL, so memory never outlives the Redis entry it came from.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """

        # Level 1: Check memory cache
        with self._memory_lock:
            if (self._memory_token and self._memory_expiry and
                self._memory_expiry > datetime.now() + timedelta(hours=1)):
                print("✅ Using token from memory cache (L1)")
                return self._memory_token

        # Level 2: Check Redis cache
        redis_token = self.redis_client.get(self.redis_key)
        if redis_token:
            # TTL is the remaining lifetime in seconds; it is negative when
            # the key has no expiry or vanished after the GET above.
            remaining = self.redis_client.ttl(self.redis_key)
            if remaining is not None and remaining > 0:
                with self._memory_lock:
                    self._memory_token = redis_token
                    # The Redis TTL is one hour shorter than the assumed
                    # token lifetime, so add that hour back. Combined with
                    # the one-hour buffer in the L1 check, the memory entry
                    # stops being used when the Redis key would expire.
                    self._memory_expiry = (
                        datetime.now() + timedelta(seconds=remaining, hours=1)
                    )
                print("✅ Using token from Redis cache (L2), cached in memory")
            else:
                # Remaining lifetime unknown: serve the token but do not
                # guess an expiry for the memory cache.
                print("✅ Using token from Redis cache (L2)")
            return redis_token

        # Level 3: Generate new token
        print("🔄 Generating new token...")
        response = self.upstox.app_token.get_access_token()

        if response.success and response.data:
            token = response.data.access_token

            # Store in Redis (L2)
            self.redis_client.setex(self.redis_key, 23 * 3600, token)

            # Store in memory (L1)
            with self._memory_lock:
                self._memory_token = token
                self._memory_expiry = datetime.now() + timedelta(hours=24)

            print("💾 Token cached in memory (L1) and Redis (L2)")
            return token
        else:
            raise Exception(f"Token generation failed: {response.error}")

    def invalidate_cache(self, level='all'):
        """Invalidate cache at specified level.

        Args:
            level: ``'memory'``, ``'redis'`` or ``'all'``. Any other value
                leaves both levels untouched.
        """
        if level in ('all', 'memory'):
            with self._memory_lock:
                self._memory_token = None
                self._memory_expiry = None
            print("🗑️ Memory cache (L1) invalidated")

        if level in ('all', 'redis'):
            self.redis_client.delete(self.redis_key)
            print("🗑️ Redis cache (L2) invalidated")

# Usage: which level answers each call below depends on what is already
# stored; the inline comments describe a run that starts with both levels
# empty.
multi_cache = MultiLevelTokenCache()

token1 = multi_cache.get_fresh_token()  # Generates new token
token2 = multi_cache.get_fresh_token()  # Uses memory cache

# Invalidate memory cache only
multi_cache.invalidate_cache('memory')
token3 = multi_cache.get_fresh_token()  # Uses Redis cache

Advanced Caching Strategies

Write-Through Cache

import threading
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class WriteThroughTokenCache:
    """Token cache that writes every new token to two stores.

    Both stores are plain dicts here; ``secondary_cache`` stands in for
    persistent storage. Every access happens under one lock.
    """

    def __init__(self):
        """Create the Upstox client, the two empty stores and the lock."""
        self.upstox = UpstoxTOTP()
        self.primary_cache = {}  # Memory
        self.secondary_cache = {}  # Persistent storage simulation
        self.lock = threading.Lock()

    def get_fresh_token(self):
        """Return a valid token, checking primary, then secondary, then Upstox.

        A secondary hit is copied into the primary store. A newly generated
        token is written to both stores with a 24-hour expiry.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        with self.lock:
            if self._is_token_valid(self.primary_cache):
                print("✅ Using token from primary cache")
                return self.primary_cache['token']

            if self._is_token_valid(self.secondary_cache):
                # Copy the entry up so the next lookup stops at the primary.
                self.primary_cache = self.secondary_cache.copy()
                print("✅ Using token from secondary cache, promoted to primary")
                return self.secondary_cache['token']

            print("🔄 Generating new token...")
            response = self.upstox.app_token.get_access_token()

            if not (response.success and response.data):
                raise Exception(f"Token generation failed: {response.error}")

            entry = {
                'token': response.data.access_token,
                'expiry': datetime.now() + timedelta(hours=24),
                'user_id': response.data.user_id
            }

            # Each store gets its own copy of the entry.
            self.primary_cache = entry.copy()
            self.secondary_cache = entry.copy()

            print("💾 Token stored in both primary and secondary caches")
            return entry['token']

    def _is_token_valid(self, cache):
        """Tell whether ``cache`` holds a token expiring more than 1 h from now.

        The result is only guaranteed to be truthy or falsy: an empty store
        is returned as-is.
        """
        if not cache:
            return cache
        if 'token' not in cache or 'expiry' not in cache:
            return False
        return cache['expiry'] > datetime.now() + timedelta(hours=1)

# Usage: a new instance starts with both stores empty, so this first call
# generates a token and writes it to both.
write_through_cache = WriteThroughTokenCache()
token = write_through_cache.get_fresh_token()

Cache with Background Refresh

import threading
import time
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP

class BackgroundRefreshCache:
    """In-memory token cache that renews the token on a background thread.

    A cached token is served while its expiry is more than one hour away.
    Once the remaining lifetime drops below ``refresh_threshold`` a daemon
    thread fetches a replacement. The network call runs outside the lock, so
    callers keep receiving the current token while the refresh is in flight.
    """

    def __init__(self, refresh_threshold_hours=2):
        """Create the Upstox client and an empty cache.

        Args:
            refresh_threshold_hours: Remaining lifetime, in hours, below
                which a background refresh is started. Values of 1 or less
                never trigger one, because a token with under one hour left
                is regenerated synchronously instead.
        """
        self.upstox = UpstoxTOTP()
        self.refresh_threshold = timedelta(hours=refresh_threshold_hours)
        self.token_data = None
        self.lock = threading.Lock()
        self.refresh_thread = None
        self.stop_refresh = threading.Event()

    def get_fresh_token(self):
        """Get token with background refresh.

        Returns:
            The cached token when it is still valid (possibly starting a
            background refresh), otherwise a synchronously generated one.

        Raises:
            Exception: If a synchronous generation fails.
        """
        with self.lock:
            # Check if we have a valid token
            if self._is_token_valid():
                # Check if we need background refresh
                if self._needs_refresh():
                    self._start_background_refresh()

                return self.token_data['token']

            # No valid token, generate immediately
            return self._generate_token()

    def _is_token_valid(self):
        """Truthy when a token is cached and expires more than 1 h from now."""
        return (self.token_data and
                self.token_data['expiry'] > datetime.now() + timedelta(hours=1))

    def _needs_refresh(self):
        """Truthy when the cached token has less than the threshold left."""
        return (self.token_data and
                self.token_data['expiry'] - datetime.now() < self.refresh_threshold)

    def _fetch_token_data(self):
        """Request a token from Upstox and return it as a cache entry.

        Touches no shared state, so it may be called without the lock.

        Raises:
            Exception: If the Upstox client reports a failed generation.
        """
        response = self.upstox.app_token.get_access_token()

        if response.success and response.data:
            now = datetime.now()
            return {
                'token': response.data.access_token,
                'expiry': now + timedelta(hours=24),
                'generated_at': now
            }
        raise Exception(f"Token generation failed: {response.error}")

    def _generate_token(self):
        """Generate new token synchronously.

        Called with ``self.lock`` held; stores the entry and returns the token.
        """
        print("🔄 Generating new token...")
        self.token_data = self._fetch_token_data()
        print("💾 Token generated and cached")
        return self.token_data['token']

    def _start_background_refresh(self):
        """Start background token refresh if not already running."""
        if self.refresh_thread and self.refresh_thread.is_alive():
            return  # Already refreshing

        self.refresh_thread = threading.Thread(target=self._background_refresh)
        self.refresh_thread.daemon = True
        self.refresh_thread.start()
        print("🔄 Background token refresh started")

    def _background_refresh(self):
        """Background thread to refresh token."""
        try:
            # Small delay to avoid immediate refresh. Waiting on the stop
            # event instead of sleeping lets stop_background_refresh() end
            # the thread promptly.
            if self.stop_refresh.wait(1):
                return

            with self.lock:
                if not self._needs_refresh():
                    return  # No longer needs refresh

            print("🔄 Background: Refreshing token...")
            # Network call happens without the lock so get_fresh_token()
            # callers are not blocked for its duration.
            fresh = self._fetch_token_data()

            with self.lock:
                self.token_data = fresh
            print("✅ Background: Token refreshed successfully")

        except Exception as e:
            # A failed refresh is not fatal: the current token stays in use
            # and a later get_fresh_token() call will try again.
            print(f"❌ Background refresh failed: {e}")

    def stop_background_refresh(self):
        """Stop background refresh thread.

        Sets the stop event and waits up to five seconds for the worker. The
        event is never cleared, so refresh threads started afterwards exit
        immediately.
        """
        self.stop_refresh.set()
        if self.refresh_thread:
            self.refresh_thread.join(timeout=5)

# Usage: a background refresh is started once the cached token has less than
# refresh_threshold_hours of lifetime left.
bg_cache = BackgroundRefreshCache(refresh_threshold_hours=2)

# This will generate a token
token1 = bg_cache.get_fresh_token()

# Later calls will use cached token and start background refresh if needed
token2 = bg_cache.get_fresh_token()

# Clean shutdown: waits up to 5 seconds for a running refresh thread
bg_cache.stop_background_refresh()

Cache Monitoring and Metrics

Comprehensive Cache Monitor

import time
import threading
from datetime import datetime, timedelta
from collections import defaultdict
from upstox_totp import UpstoxTOTP

class MonitoredTokenCache:
    """In-memory token cache that records metrics for every request.

    ``metrics`` holds running counters and generation timings;
    ``request_history`` keeps one entry per request, limited to the most
    recent 1000. All updates happen while ``self.lock`` is held.
    """

    def __init__(self):
        """Create the Upstox client, the empty cache and zeroed metrics."""
        self.upstox = UpstoxTOTP()
        self.cache = {}
        self.lock = threading.Lock()

        # Metrics
        self.metrics = {
            'cache_hits': 0,
            'cache_misses': 0,
            'token_generations': 0,
            'total_requests': 0,
            'average_generation_time': 0,
            'last_generation_time': None,
            'generation_times': [],
            'errors': 0
        }

        self.request_history = []

    def get_fresh_token(self):
        """Get token with comprehensive monitoring.

        Returns:
            The cached token when it is still valid, otherwise a newly
            generated one.

        Raises:
            Exception: If generation fails. Each failed request increments
                ``errors`` once and adds one history entry: ``'error'`` for
                an unsuccessful response, ``'exception'`` for anything
                raised while generating or storing the token.
        """
        start_time = time.time()
        request_id = f"req_{int(time.time() * 1000)}"

        with self.lock:
            # Counted under the lock so concurrent callers cannot lose updates.
            self.metrics['total_requests'] += 1

            # Check cache
            if self._is_token_valid():
                self.metrics['cache_hits'] += 1
                self._log_request(request_id, 'cache_hit', time.time() - start_time)
                return self.cache['token']

            # Cache miss - generate new token
            self.metrics['cache_misses'] += 1

            try:
                generation_start = time.time()
                response = self.upstox.app_token.get_access_token()
                generation_time = time.time() - generation_start

                if response.success and response.data:
                    self.cache = {
                        'token': response.data.access_token,
                        'expiry': datetime.now() + timedelta(hours=24),
                        'generated_at': datetime.now(),
                        'generation_time': generation_time
                    }

                    # Update metrics
                    self.metrics['token_generations'] += 1
                    self.metrics['generation_times'].append(generation_time)
                    self.metrics['average_generation_time'] = (
                        sum(self.metrics['generation_times']) /
                        len(self.metrics['generation_times'])
                    )
                    self.metrics['last_generation_time'] = datetime.now()

                    total_time = time.time() - start_time
                    self._log_request(request_id, 'cache_miss_generated', total_time, generation_time)

                    return self.cache['token']

            except Exception:
                self.metrics['errors'] += 1
                self._log_request(request_id, 'exception', time.time() - start_time)
                raise

            # Reached only for an unsuccessful response. Raising here, after
            # the try block, keeps the handler above from counting and
            # logging the same failure a second time.
            self.metrics['errors'] += 1
            self._log_request(request_id, 'error', time.time() - start_time)
            raise Exception(f"Token generation failed: {response.error}")

    def _is_token_valid(self):
        """Truthy when a token is cached and expires more than 1 h from now."""
        return (self.cache and 'expiry' in self.cache and
                self.cache['expiry'] > datetime.now() + timedelta(hours=1))

    def _log_request(self, request_id, result_type, total_time, generation_time=None):
        """Append one request record, keeping only the latest 1000.

        Args:
            request_id: Identifier assigned to the request.
            result_type: Outcome label, e.g. ``'cache_hit'`` or ``'error'``.
            total_time: Seconds spent on the whole request.
            generation_time: Seconds spent generating, when a token was
                generated.
        """
        log_entry = {
            'request_id': request_id,
            'timestamp': datetime.now(),
            'result_type': result_type,
            'total_time': total_time,
            'generation_time': generation_time
        }

        self.request_history.append(log_entry)

        # Keep only last 1000 requests
        if len(self.request_history) > 1000:
            self.request_history.pop(0)

    def get_metrics(self):
        """Get comprehensive cache metrics.

        Returns:
            A dict of counters, the hit ratio as a percentage of all
            requests, generation timing, the number of requests logged in
            the last five minutes and whether the cached token is valid.
        """
        with self.lock:
            total_requests = self.metrics['total_requests']
            cache_hits = self.metrics['cache_hits']

            hit_ratio = (cache_hits / total_requests * 100) if total_requests > 0 else 0

            window_start = datetime.now() - timedelta(minutes=5)
            recent_count = sum(
                1 for req in self.request_history
                if req['timestamp'] > window_start
            )

            return {
                'cache_hits': cache_hits,
                'cache_misses': self.metrics['cache_misses'],
                'hit_ratio': hit_ratio,
                'token_generations': self.metrics['token_generations'],
                'total_requests': total_requests,
                'average_generation_time': self.metrics['average_generation_time'],
                'last_generation_time': self.metrics['last_generation_time'],
                'errors': self.metrics['errors'],
                'recent_requests_5min': recent_count,
                'cache_status': 'valid' if self._is_token_valid() else 'invalid'
            }

    def get_performance_report(self):
        """Get detailed performance report.

        Returns:
            The ``get_metrics()`` dict, extended with min, max, median and
            95th-percentile generation times when at least one generation
            has been recorded.
        """
        metrics = self.get_metrics()
        with self.lock:
            # Sorted snapshot, so concurrent appends cannot change it mid-way.
            ordered = sorted(self.metrics['generation_times'])

        if not ordered:
            return metrics

        sample_count = len(ordered)
        return {
            **metrics,
            'min_generation_time': ordered[0],
            'max_generation_time': ordered[-1],
            'median_generation_time': ordered[sample_count // 2],
            'p95_generation_time': ordered[int(sample_count * 0.95)],
            'generation_time_samples': sample_count
        }

# Usage
monitored_cache = MonitoredTokenCache()

# Generate some requests: only the first call needs to generate a token,
# the remaining calls can be served from the cache.
for i in range(10):
    token = monitored_cache.get_fresh_token()
    time.sleep(0.1)  # Simulate some delay

# Get metrics
metrics = monitored_cache.get_metrics()
print(f"Cache hit ratio: {metrics['hit_ratio']:.1f}%")
print(f"Average generation time: {metrics['average_generation_time']:.2f}s")

# Get detailed report (adds min/max/median/p95 generation times)
report = monitored_cache.get_performance_report()
print(f"Performance report: {report}")

Best Practices

  1. Choose Appropriate TTL: Use 23 hours for tokens (1-hour safety buffer)

  2. Implement Proper Locking: Use threading locks for thread-safe caching

  3. Handle Cache Failures: Always have fallback to token generation

  4. Monitor Cache Performance: Track hit ratios and generation times

  5. Secure Sensitive Data: Encrypt cached tokens when possible

  6. Clean Up Expired Entries: Regularly remove old cache entries

  7. Use Multi-Level Caching: Combine memory and persistent caching

  8. Implement Background Refresh: Refresh tokens before they expire

  9. Add Proper Logging: Log cache operations for debugging

  10. Test Cache Logic: Thoroughly test cache expiry and refresh logic

See Also