Token Caching Examples
This guide shows various strategies for caching Upstox access tokens to improve performance and reduce API calls.
Why Cache Tokens?
Upstox access tokens:
Expire after 24 hours - Need periodic refresh
Take 2-5 seconds to generate - Network overhead
Have rate limits - Avoid unnecessary calls
Are reusable - Same token works for multiple API calls
Caching tokens reduces latency and improves application performance.
Simple File-Based Caching
Basic File Cache
import json
import os
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class SimpleTokenCache:
def __init__(self, cache_file="upstox_token.json"):
self.cache_file = cache_file
self.upstox = UpstoxTOTP()
def get_cached_token(self):
"""Get token from cache if still valid."""
if not os.path.exists(self.cache_file):
return None
try:
with open(self.cache_file, 'r') as f:
data = json.load(f)
# Check expiry (with 1 hour buffer)
expiry = datetime.fromisoformat(data['expiry'])
if expiry > datetime.now() + timedelta(hours=1):
print("✅ Using cached token")
return data['token']
else:
print("⚠️ Cached token expired")
os.remove(self.cache_file)
return None
except (json.JSONDecodeError, KeyError, ValueError):
print("⚠️ Invalid cache file")
if os.path.exists(self.cache_file):
os.remove(self.cache_file)
return None
def cache_token(self, token):
"""Cache token with 24-hour expiry."""
expiry = datetime.now() + timedelta(hours=24)
data = {
'token': token,
'expiry': expiry.isoformat(),
'created': datetime.now().isoformat()
}
with open(self.cache_file, 'w') as f:
json.dump(data, f, indent=2)
print("💾 Token cached successfully")
def get_fresh_token(self):
"""Get token from cache or generate new one."""
# Try cache first
token = self.get_cached_token()
if token:
return token
# Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token = response.data.access_token
self.cache_token(token)
return token
else:
raise Exception(f"Token generation failed: {response.error}")
# Usage
cache = SimpleTokenCache()
token = cache.get_fresh_token()
Secure File Cache
import json
import os
import stat
from datetime import datetime, timedelta
from cryptography.fernet import Fernet
from upstox_totp import UpstoxTOTP
class SecureTokenCache:
def __init__(self, cache_file="upstox_token.enc", key_file="token.key"):
self.cache_file = cache_file
self.key_file = key_file
self.upstox = UpstoxTOTP()
self.cipher = self._get_cipher()
def _get_cipher(self):
"""Get or create encryption cipher."""
if os.path.exists(self.key_file):
with open(self.key_file, 'rb') as f:
key = f.read()
else:
key = Fernet.generate_key()
with open(self.key_file, 'wb') as f:
f.write(key)
# Set secure permissions
os.chmod(self.key_file, stat.S_IRUSR | stat.S_IWUSR) # 600
return Fernet(key)
def get_cached_token(self):
"""Get encrypted token from cache."""
if not os.path.exists(self.cache_file):
return None
try:
with open(self.cache_file, 'rb') as f:
encrypted_data = f.read()
# Decrypt data
decrypted_data = self.cipher.decrypt(encrypted_data)
data = json.loads(decrypted_data.decode())
# Check expiry
expiry = datetime.fromisoformat(data['expiry'])
if expiry > datetime.now() + timedelta(hours=1):
return data['token']
else:
os.remove(self.cache_file)
return None
except Exception:
if os.path.exists(self.cache_file):
os.remove(self.cache_file)
return None
def cache_token(self, token):
"""Cache token with encryption."""
expiry = datetime.now() + timedelta(hours=24)
data = {
'token': token,
'expiry': expiry.isoformat(),
'created': datetime.now().isoformat()
}
# Encrypt data
json_data = json.dumps(data).encode()
encrypted_data = self.cipher.encrypt(json_data)
with open(self.cache_file, 'wb') as f:
f.write(encrypted_data)
# Set secure permissions
os.chmod(self.cache_file, stat.S_IRUSR | stat.S_IWUSR) # 600
def get_fresh_token(self):
"""Get token from secure cache or generate new one."""
token = self.get_cached_token()
if token:
return token
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token = response.data.access_token
self.cache_token(token)
return token
else:
raise Exception(f"Token generation failed: {response.error}")
# Usage
secure_cache = SecureTokenCache()
token = secure_cache.get_fresh_token()
Memory-Based Caching
In-Memory Cache
import threading
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class MemoryTokenCache:
def __init__(self):
self.upstox = UpstoxTOTP()
self._token = None
self._expiry = None
self._lock = threading.Lock()
def get_fresh_token(self):
"""Get token from memory cache or generate new one."""
with self._lock:
# Check if token is still valid
if (self._token and self._expiry and
self._expiry > datetime.now() + timedelta(hours=1)):
print("✅ Using cached token from memory")
return self._token
# Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
self._token = response.data.access_token
self._expiry = datetime.now() + timedelta(hours=24)
print("💾 Token cached in memory")
return self._token
else:
raise Exception(f"Token generation failed: {response.error}")
def invalidate_cache(self):
"""Manually invalidate cached token."""
with self._lock:
self._token = None
self._expiry = None
print("🗑️ Memory cache invalidated")
# Usage
memory_cache = MemoryTokenCache()
token = memory_cache.get_fresh_token()
LRU Cache with TTL
import time
import threading
from functools import wraps
from upstox_totp import UpstoxTOTP
class TTLCache:
def __init__(self, ttl_seconds=3600): # 1 hour default
self.ttl_seconds = ttl_seconds
self.cache = {}
self.lock = threading.Lock()
def get(self, key):
with self.lock:
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl_seconds:
return value
else:
del self.cache[key]
return None
def set(self, key, value):
with self.lock:
self.cache[key] = (value, time.time())
def delete(self, key):
with self.lock:
if key in self.cache:
del self.cache[key]
def cached_token(ttl_seconds=23*3600): # 23 hours
"""Decorator for token caching."""
cache = TTLCache(ttl_seconds)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Create cache key from function args
cache_key = f"{func.__name__}_{hash(str(args) + str(sorted(kwargs.items())))}"
# Try cache first
cached_result = cache.get(cache_key)
if cached_result:
print("✅ Using cached token")
return cached_result
# Generate new token
print("🔄 Generating new token...")
result = func(*args, **kwargs)
# Cache result
cache.set(cache_key, result)
print("💾 Token cached")
return result
return wrapper
return decorator
class CachedUpstoxClient:
def __init__(self):
self.upstox = UpstoxTOTP()
@cached_token(ttl_seconds=23*3600) # 23 hours
def get_access_token(self):
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
return response.data.access_token
else:
raise Exception(f"Token generation failed: {response.error}")
# Usage
client = CachedUpstoxClient()
token = client.get_access_token() # First call - generates token
token = client.get_access_token() # Second call - uses cache
Redis Caching
Basic Redis Cache
import redis
import json
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class RedisTokenCache:
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.upstox = UpstoxTOTP()
self.cache_key = 'upstox:access_token'
def get_fresh_token(self):
"""Get token from Redis cache or generate new one."""
# Try cache first
cached_token = self.redis_client.get(self.cache_key)
if cached_token:
print("✅ Using cached token from Redis")
return cached_token
# Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token = response.data.access_token
# Cache with expiration (23 hours)
self.redis_client.setex(
self.cache_key,
23 * 3600, # 23 hours in seconds
token
)
print("💾 Token cached in Redis")
return token
else:
raise Exception(f"Token generation failed: {response.error}")
def invalidate_cache(self):
"""Manually invalidate cached token."""
self.redis_client.delete(self.cache_key)
print("🗑️ Redis cache invalidated")
# Usage
redis_cache = RedisTokenCache()
token = redis_cache.get_fresh_token()
Redis with Metadata
import redis
import json
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class AdvancedRedisTokenCache:
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.upstox = UpstoxTOTP()
self.token_key = 'upstox:token'
self.metadata_key = 'upstox:token:metadata'
self.stats_key = 'upstox:token:stats'
def get_fresh_token(self):
"""Get token with metadata tracking."""
# Check cache
cached_token = self.redis_client.get(self.token_key)
if cached_token:
# Update stats
self._increment_cache_hits()
print("✅ Using cached token from Redis")
return cached_token
# Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token = response.data.access_token
# Store token
self.redis_client.setex(self.token_key, 23 * 3600, token)
# Store metadata
metadata = {
'user_id': response.data.user_id,
'user_name': response.data.user_name,
'email': response.data.email,
'generated_at': datetime.now().isoformat(),
'expires_at': (datetime.now() + timedelta(hours=24)).isoformat()
}
self.redis_client.setex(
self.metadata_key,
23 * 3600,
json.dumps(metadata)
)
# Update stats
self._increment_cache_misses()
self._increment_total_generations()
print("💾 Token and metadata cached in Redis")
return token
else:
raise Exception(f"Token generation failed: {response.error}")
def get_token_metadata(self):
"""Get token metadata."""
metadata_json = self.redis_client.get(self.metadata_key)
if metadata_json:
return json.loads(metadata_json)
return None
def get_cache_stats(self):
"""Get cache statistics."""
stats = self.redis_client.hgetall(self.stats_key)
return {
'cache_hits': int(stats.get('hits', 0)),
'cache_misses': int(stats.get('misses', 0)),
'total_generations': int(stats.get('generations', 0)),
'hit_ratio': self._calculate_hit_ratio(stats)
}
def _increment_cache_hits(self):
self.redis_client.hincrby(self.stats_key, 'hits', 1)
def _increment_cache_misses(self):
self.redis_client.hincrby(self.stats_key, 'misses', 1)
def _increment_total_generations(self):
self.redis_client.hincrby(self.stats_key, 'generations', 1)
def _calculate_hit_ratio(self, stats):
hits = int(stats.get('hits', 0))
misses = int(stats.get('misses', 0))
total = hits + misses
return (hits / total * 100) if total > 0 else 0
# Usage
advanced_cache = AdvancedRedisTokenCache()
token = advanced_cache.get_fresh_token()
# Get metadata and stats
metadata = advanced_cache.get_token_metadata()
stats = advanced_cache.get_cache_stats()
print(f"Cache hit ratio: {stats['hit_ratio']:.1f}%")
Database Caching
SQLite Cache
import sqlite3
import json
from datetime import datetime, timedelta
from contextlib import contextmanager
from upstox_totp import UpstoxTOTP
class SQLiteTokenCache:
def __init__(self, db_path="upstox_cache.db"):
self.db_path = db_path
self.upstox = UpstoxTOTP()
self.init_database()
def init_database(self):
"""Initialize SQLite database."""
with self.get_connection() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS token_cache (
id INTEGER PRIMARY KEY,
token TEXT NOT NULL,
user_id TEXT,
user_name TEXT,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
is_active BOOLEAN DEFAULT TRUE
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS cache_stats (
id INTEGER PRIMARY KEY,
cache_hits INTEGER DEFAULT 0,
cache_misses INTEGER DEFAULT 0,
total_generations INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Initialize stats if not exists
conn.execute('''
INSERT OR IGNORE INTO cache_stats (id, cache_hits, cache_misses, total_generations)
VALUES (1, 0, 0, 0)
''')
conn.commit()
@contextmanager
def get_connection(self):
"""Get database connection with context management."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def get_fresh_token(self):
"""Get token from SQLite cache or generate new one."""
with self.get_connection() as conn:
# Check for valid token
cursor = conn.execute('''
SELECT token FROM token_cache
WHERE is_active = TRUE
AND expires_at > datetime('now', '+1 hour')
ORDER BY created_at DESC
LIMIT 1
''')
row = cursor.fetchone()
if row:
self._update_cache_hits(conn)
print("✅ Using cached token from SQLite")
return row['token']
# Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token = response.data.access_token
expires_at = datetime.now() + timedelta(hours=24)
with self.get_connection() as conn:
# Deactivate old tokens
conn.execute('''
UPDATE token_cache
SET is_active = FALSE
WHERE is_active = TRUE
''')
# Insert new token
conn.execute('''
INSERT INTO token_cache (token, user_id, user_name, email, expires_at)
VALUES (?, ?, ?, ?, ?)
''', (
token,
response.data.user_id,
response.data.user_name,
response.data.email,
expires_at
))
self._update_cache_misses(conn)
self._update_total_generations(conn)
conn.commit()
print("💾 Token cached in SQLite")
return token
else:
raise Exception(f"Token generation failed: {response.error}")
def get_cache_stats(self):
"""Get cache statistics."""
with self.get_connection() as conn:
cursor = conn.execute('''
SELECT cache_hits, cache_misses, total_generations
FROM cache_stats WHERE id = 1
''')
row = cursor.fetchone()
if row:
hits = row['cache_hits']
misses = row['cache_misses']
total = hits + misses
hit_ratio = (hits / total * 100) if total > 0 else 0
return {
'cache_hits': hits,
'cache_misses': misses,
'total_generations': row['total_generations'],
'hit_ratio': hit_ratio
}
return {}
def cleanup_expired_tokens(self):
"""Remove expired tokens."""
with self.get_connection() as conn:
cursor = conn.execute('''
DELETE FROM token_cache
WHERE expires_at < datetime('now')
''')
deleted_count = cursor.rowcount
conn.commit()
return deleted_count
def _update_cache_hits(self, conn):
conn.execute('''
UPDATE cache_stats
SET cache_hits = cache_hits + 1,
last_updated = CURRENT_TIMESTAMP
WHERE id = 1
''')
def _update_cache_misses(self, conn):
conn.execute('''
UPDATE cache_stats
SET cache_misses = cache_misses + 1,
last_updated = CURRENT_TIMESTAMP
WHERE id = 1
''')
def _update_total_generations(self, conn):
conn.execute('''
UPDATE cache_stats
SET total_generations = total_generations + 1,
last_updated = CURRENT_TIMESTAMP
WHERE id = 1
''')
# Usage
sqlite_cache = SQLiteTokenCache()
token = sqlite_cache.get_fresh_token()
# Get stats
stats = sqlite_cache.get_cache_stats()
print(f"Cache performance: {stats}")
# Cleanup
deleted = sqlite_cache.cleanup_expired_tokens()
print(f"Cleaned up {deleted} expired tokens")
Multi-Level Caching
Memory + Redis Cache
import redis
import threading
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class MultiLevelTokenCache:
def __init__(self, redis_host='localhost', redis_port=6379):
self.upstox = UpstoxTOTP()
# Level 1: Memory cache
self._memory_token = None
self._memory_expiry = None
self._memory_lock = threading.Lock()
# Level 2: Redis cache
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True
)
self.redis_key = 'upstox:token:multilevel'
def get_fresh_token(self):
"""Get token from multi-level cache or generate new one."""
# Level 1: Check memory cache
with self._memory_lock:
if (self._memory_token and self._memory_expiry and
self._memory_expiry > datetime.now() + timedelta(hours=1)):
print("✅ Using token from memory cache (L1)")
return self._memory_token
# Level 2: Check Redis cache
redis_token = self.redis_client.get(self.redis_key)
if redis_token:
# Store in memory cache
with self._memory_lock:
self._memory_token = redis_token
self._memory_expiry = datetime.now() + timedelta(hours=24)
print("✅ Using token from Redis cache (L2), cached in memory")
return redis_token
# Level 3: Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token = response.data.access_token
# Store in Redis (L2)
self.redis_client.setex(self.redis_key, 23 * 3600, token)
# Store in memory (L1)
with self._memory_lock:
self._memory_token = token
self._memory_expiry = datetime.now() + timedelta(hours=24)
print("💾 Token cached in memory (L1) and Redis (L2)")
return token
else:
raise Exception(f"Token generation failed: {response.error}")
def invalidate_cache(self, level='all'):
"""Invalidate cache at specified level."""
if level in ('all', 'memory'):
with self._memory_lock:
self._memory_token = None
self._memory_expiry = None
print("🗑️ Memory cache (L1) invalidated")
if level in ('all', 'redis'):
self.redis_client.delete(self.redis_key)
print("🗑️ Redis cache (L2) invalidated")
# Usage
multi_cache = MultiLevelTokenCache()
token1 = multi_cache.get_fresh_token() # Generates new token
token2 = multi_cache.get_fresh_token() # Uses memory cache
# Invalidate memory cache only
multi_cache.invalidate_cache('memory')
token3 = multi_cache.get_fresh_token() # Uses Redis cache
Advanced Caching Strategies
Write-Through Cache
import threading
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class WriteThroughTokenCache:
def __init__(self):
self.upstox = UpstoxTOTP()
self.primary_cache = {} # Memory
self.secondary_cache = {} # Persistent storage simulation
self.lock = threading.Lock()
def get_fresh_token(self):
"""Get token with write-through caching."""
with self.lock:
# Check primary cache
if self._is_token_valid(self.primary_cache):
print("✅ Using token from primary cache")
return self.primary_cache['token']
# Check secondary cache
if self._is_token_valid(self.secondary_cache):
# Promote to primary cache
self.primary_cache = self.secondary_cache.copy()
print("✅ Using token from secondary cache, promoted to primary")
return self.secondary_cache['token']
# Generate new token
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
token_data = {
'token': response.data.access_token,
'expiry': datetime.now() + timedelta(hours=24),
'user_id': response.data.user_id
}
# Write to both caches simultaneously
self.primary_cache = token_data.copy()
self.secondary_cache = token_data.copy()
print("💾 Token stored in both primary and secondary caches")
return token_data['token']
else:
raise Exception(f"Token generation failed: {response.error}")
def _is_token_valid(self, cache):
return (cache and 'token' in cache and 'expiry' in cache and
cache['expiry'] > datetime.now() + timedelta(hours=1))
# Usage
write_through_cache = WriteThroughTokenCache()
token = write_through_cache.get_fresh_token()
Cache with Background Refresh
import threading
import time
from datetime import datetime, timedelta
from upstox_totp import UpstoxTOTP
class BackgroundRefreshCache:
def __init__(self, refresh_threshold_hours=2):
self.upstox = UpstoxTOTP()
self.refresh_threshold = timedelta(hours=refresh_threshold_hours)
self.token_data = None
self.lock = threading.Lock()
self.refresh_thread = None
self.stop_refresh = threading.Event()
def get_fresh_token(self):
"""Get token with background refresh."""
with self.lock:
# Check if we have a valid token
if self._is_token_valid():
# Check if we need background refresh
if self._needs_refresh():
self._start_background_refresh()
return self.token_data['token']
# No valid token, generate immediately
return self._generate_token()
def _is_token_valid(self):
return (self.token_data and
self.token_data['expiry'] > datetime.now() + timedelta(hours=1))
def _needs_refresh(self):
return (self.token_data and
self.token_data['expiry'] - datetime.now() < self.refresh_threshold)
def _generate_token(self):
"""Generate new token synchronously."""
print("🔄 Generating new token...")
response = self.upstox.app_token.get_access_token()
if response.success and response.data:
self.token_data = {
'token': response.data.access_token,
'expiry': datetime.now() + timedelta(hours=24),
'generated_at': datetime.now()
}
print("💾 Token generated and cached")
return self.token_data['token']
else:
raise Exception(f"Token generation failed: {response.error}")
def _start_background_refresh(self):
"""Start background token refresh if not already running."""
if self.refresh_thread and self.refresh_thread.is_alive():
return # Already refreshing
self.refresh_thread = threading.Thread(target=self._background_refresh)
self.refresh_thread.daemon = True
self.refresh_thread.start()
print("🔄 Background token refresh started")
def _background_refresh(self):
"""Background thread to refresh token."""
try:
time.sleep(1) # Small delay to avoid immediate refresh
with self.lock:
if not self._needs_refresh():
return # No longer needs refresh
print("🔄 Background: Refreshing token...")
self._generate_token()
print("✅ Background: Token refreshed successfully")
except Exception as e:
print(f"❌ Background refresh failed: {e}")
def stop_background_refresh(self):
"""Stop background refresh thread."""
self.stop_refresh.set()
if self.refresh_thread:
self.refresh_thread.join(timeout=5)
# Usage
bg_cache = BackgroundRefreshCache(refresh_threshold_hours=2)
# This will generate a token
token1 = bg_cache.get_fresh_token()
# Later calls will use cached token and start background refresh if needed
token2 = bg_cache.get_fresh_token()
# Clean shutdown
bg_cache.stop_background_refresh()
Cache Monitoring and Metrics
Comprehensive Cache Monitor
import time
import threading
from datetime import datetime, timedelta
from collections import defaultdict
from upstox_totp import UpstoxTOTP
class MonitoredTokenCache:
def __init__(self):
self.upstox = UpstoxTOTP()
self.cache = {}
self.lock = threading.Lock()
# Metrics
self.metrics = {
'cache_hits': 0,
'cache_misses': 0,
'token_generations': 0,
'total_requests': 0,
'average_generation_time': 0,
'last_generation_time': None,
'generation_times': [],
'errors': 0
}
self.request_history = []
def get_fresh_token(self):
"""Get token with comprehensive monitoring."""
start_time = time.time()
request_id = f"req_{int(time.time() * 1000)}"
self.metrics['total_requests'] += 1
with self.lock:
# Check cache
if self._is_token_valid():
self.metrics['cache_hits'] += 1
self._log_request(request_id, 'cache_hit', time.time() - start_time)
return self.cache['token']
# Cache miss - generate new token
self.metrics['cache_misses'] += 1
try:
generation_start = time.time()
response = self.upstox.app_token.get_access_token()
generation_time = time.time() - generation_start
if response.success and response.data:
self.cache = {
'token': response.data.access_token,
'expiry': datetime.now() + timedelta(hours=24),
'generated_at': datetime.now(),
'generation_time': generation_time
}
# Update metrics
self.metrics['token_generations'] += 1
self.metrics['generation_times'].append(generation_time)
self.metrics['average_generation_time'] = (
sum(self.metrics['generation_times']) /
len(self.metrics['generation_times'])
)
self.metrics['last_generation_time'] = datetime.now()
total_time = time.time() - start_time
self._log_request(request_id, 'cache_miss_generated', total_time, generation_time)
return self.cache['token']
else:
self.metrics['errors'] += 1
self._log_request(request_id, 'error', time.time() - start_time)
raise Exception(f"Token generation failed: {response.error}")
except Exception as e:
self.metrics['errors'] += 1
self._log_request(request_id, 'exception', time.time() - start_time)
raise
def _is_token_valid(self):
return (self.cache and 'expiry' in self.cache and
self.cache['expiry'] > datetime.now() + timedelta(hours=1))
def _log_request(self, request_id, result_type, total_time, generation_time=None):
log_entry = {
'request_id': request_id,
'timestamp': datetime.now(),
'result_type': result_type,
'total_time': total_time,
'generation_time': generation_time
}
self.request_history.append(log_entry)
# Keep only last 1000 requests
if len(self.request_history) > 1000:
self.request_history.pop(0)
def get_metrics(self):
"""Get comprehensive cache metrics."""
with self.lock:
total_requests = self.metrics['total_requests']
cache_hits = self.metrics['cache_hits']
hit_ratio = (cache_hits / total_requests * 100) if total_requests > 0 else 0
recent_requests = [
req for req in self.request_history
if req['timestamp'] > datetime.now() - timedelta(minutes=5)
]
return {
'cache_hits': cache_hits,
'cache_misses': self.metrics['cache_misses'],
'hit_ratio': hit_ratio,
'token_generations': self.metrics['token_generations'],
'total_requests': total_requests,
'average_generation_time': self.metrics['average_generation_time'],
'last_generation_time': self.metrics['last_generation_time'],
'errors': self.metrics['errors'],
'recent_requests_5min': len(recent_requests),
'cache_status': 'valid' if self._is_token_valid() else 'invalid'
}
def get_performance_report(self):
"""Get detailed performance report."""
metrics = self.get_metrics()
generation_times = self.metrics['generation_times']
if generation_times:
report = {
**metrics,
'min_generation_time': min(generation_times),
'max_generation_time': max(generation_times),
'median_generation_time': sorted(generation_times)[len(generation_times)//2],
'p95_generation_time': sorted(generation_times)[int(len(generation_times)*0.95)],
'generation_time_samples': len(generation_times)
}
else:
report = metrics
return report
# Usage
monitored_cache = MonitoredTokenCache()
# Generate some requests
for i in range(10):
token = monitored_cache.get_fresh_token()
time.sleep(0.1) # Simulate some delay
# Get metrics
metrics = monitored_cache.get_metrics()
print(f"Cache hit ratio: {metrics['hit_ratio']:.1f}%")
print(f"Average generation time: {metrics['average_generation_time']:.2f}s")
# Get detailed report
report = monitored_cache.get_performance_report()
print(f"Performance report: {report}")
Best Practices
Choose Appropriate TTL: Use 23 hours for tokens (1-hour safety buffer)
Implement Proper Locking: Use threading locks for thread-safe caching
Handle Cache Failures: Always have fallback to token generation
Monitor Cache Performance: Track hit ratios and generation times
Secure Sensitive Data: Encrypt cached tokens when possible
Clean Up Expired Entries: Regularly remove old cache entries
Use Multi-Level Caching: Combine memory and persistent caching
Implement Background Refresh: Refresh tokens before they expire
Add Proper Logging: Log cache operations for debugging
Test Cache Logic: Thoroughly test cache expiry and refresh logic
See Also
Basic Usage Examples - Basic usage examples
Integration Examples - Framework integration examples
Security Best Practices - Security considerations for token storage
Advanced Usage Guide - Advanced usage patterns