Production Best Practices
Deploy your CallCov integration with confidence using these production-ready patterns and best practices.
Securityβ
API Key Managementβ
β Do:
- Store API keys in environment variables or secret management systems (AWS Secrets Manager, HashiCorp Vault)
- Use separate keys for development, staging, and production
- Rotate keys every 90 days
- Monitor API usage for anomalies
β Don't:
- Hardcode keys in source code
- Commit keys to version control
- Share keys via email or chat
- Reuse keys across environments
import boto3import json
def get_api_key(): """Retrieve API key from AWS Secrets Manager""" client = boto3.client('secretsmanager', region_name='us-east-1')
try: response = client.get_secret_value(SecretId='callcov/api-key/production') secret = json.loads(response['SecretString']) return secret['api_key'] except Exception as e: print(f"Error retrieving API key: {e}") raise
API_KEY = get_api_key()Request Securityβ
- Always use HTTPS (never HTTP)
- Implement request signing for sensitive data
- Validate SSL certificates
- Set reasonable timeouts to prevent hanging connections
Performanceβ
Connection Poolingβ
Reuse HTTP connections to reduce latency:
import requestsfrom requests.adapters import HTTPAdapterfrom requests.packages.urllib3.util.pool import HTTPPoolManager
class CallCovClient: def __init__(self, api_key): self.api_key = api_key self.session = requests.Session()
# Configure connection pooling adapter = HTTPAdapter( pool_connections=20, pool_maxsize=100, max_retries=3, pool_block=False )
self.session.mount('https://', adapter) self.session.headers.update({'X-API-Key': api_key})
def analyze(self, audio_file, agent_id, contact_id): files = {"audio_file": audio_file} data = {"agent_id": agent_id, "contact_id": contact_id}
response = self.session.post( "https://api.callcov.com/api/v1/calls/analyze", files=files, data=data )
return response.json()
# Create one client instance and reuse itclient = CallCovClient("your_api_key")Cachingβ
Cache analysis results to avoid reprocessing:
import redisimport hashlibimport json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_call_hash(audio_content): """Generate unique hash for audio file""" return hashlib.sha256(audio_content).hexdigest()
def analyze_with_cache(audio_content, agent_id, contact_id): # Check cache first call_hash = get_call_hash(audio_content) cache_key = f"callcov:analysis:{call_hash}"
cached = redis_client.get(cache_key) if cached: print("Cache hit!") return json.loads(cached)
# Not in cache - call API result = api_client.analyze(audio_content, agent_id, contact_id)
# Cache result for 7 days redis_client.setex( cache_key, 604800, # 7 days in seconds json.dumps(result) )
return resultAsync Processingβ
Use async/await for better concurrency:
import asyncioimport aiohttp
async def analyze_call_async(session, audio_path, agent_id, contact_id): """Async call submission""" with open(audio_path, 'rb') as f: data = aiohttp.FormData() data.add_field('audio_file', f, filename=audio_path) data.add_field('agent_id', agent_id) data.add_field('contact_id', contact_id)
async with session.post( 'https://api.callcov.com/api/v1/calls/analyze', data=data ) as response: return await response.json()
async def process_batch(calls): """Process multiple calls concurrently""" async with aiohttp.ClientSession( headers={'X-API-Key': API_KEY} ) as session: tasks = [ analyze_call_async(session, path, agent, contact) for path, agent, contact in calls ]
results = await asyncio.gather(*tasks, return_exceptions=True) return results
# Usagecalls_to_process = [ ('call1.wav', 'AGENT_001', 'CONTACT_001'), ('call2.wav', 'AGENT_001', 'CONTACT_002'), ('call3.wav', 'AGENT_002', 'CONTACT_003'),]
results = asyncio.run(process_batch(calls_to_process))Monitoring & Loggingβ
Structured Loggingβ
import loggingimport jsonfrom datetime import datetime
class CallCovLogger: def __init__(self): self.logger = logging.getLogger('callcov') self.logger.setLevel(logging.INFO)
def log_api_call(self, endpoint, method, status_code, duration_ms, **kwargs): """Log API call with structured data""" log_data = { 'timestamp': datetime.utcnow().isoformat(), 'service': 'callcov', 'endpoint': endpoint, 'method': method, 'status_code': status_code, 'duration_ms': duration_ms, **kwargs }
if status_code >= 400: self.logger.error(json.dumps(log_data)) else: self.logger.info(json.dumps(log_data))
# Usagelogger = CallCovLogger()
start = time.time()try: response = api_client.analyze(...) duration = (time.time() - start) * 1000
logger.log_api_call( endpoint='/calls/analyze', method='POST', status_code=response.status_code, duration_ms=duration, analysis_id=response.json().get('analysis_id') )except Exception as e: duration = (time.time() - start) * 1000 logger.log_api_call( endpoint='/calls/analyze', method='POST', status_code=getattr(e.response, 'status_code', 0), duration_ms=duration, error=str(e) )Metrics to Trackβ
Monitor these key metrics in production:
| Metric | Target | Alert Threshold |
|---|---|---|
| Request latency (p95) | < 5s | > 10s |
| Error rate | < 1% | > 5% |
| Rate limit hits | 0 | > 0 |
| Webhook delivery rate | > 99% | < 95% |
| Cache hit rate | > 80% | < 50% |
Health Checksβ
Implement health checks for your integration:
def health_check(): """Check CallCov API connectivity""" try: response = requests.get( "https://api.callcov.com/api/v1/health", headers={"X-API-Key": API_KEY}, timeout=5 )
if response.status_code == 200: return {"status": "healthy", "callcov_api": "reachable"} else: return { "status": "degraded", "callcov_api": "error", "status_code": response.status_code } except Exception as e: return { "status": "unhealthy", "callcov_api": "unreachable", "error": str(e) }Scalabilityβ
Queue-Based Processingβ
For high-volume workloads, use a queue:
from celery import Celeryimport requests
app = Celery('callcov', broker='redis://localhost:6379/0')
@app.task( bind=True, max_retries=3, default_retry_delay=60)def analyze_call_task(self, audio_path, agent_id, contact_id): """Celery task for async call analysis""" try: with open(audio_path, 'rb') as f: files = {"audio_file": f} data = {"agent_id": agent_id, "contact_id": contact_id} headers = {"X-API-Key": API_KEY}
response = requests.post( "https://api.callcov.com/api/v1/calls/analyze", headers=headers, files=files, data=data )
response.raise_for_status() return response.json()
except requests.exceptions.RequestException as exc: # Retry with exponential backoff raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
# Usage: submit tasks to queueanalyze_call_task.delay('call.wav', 'AGENT_001', 'CONTACT_001')Rate Limiting (Client-Side)β
Implement client-side rate limiting to avoid hitting API limits:
import timefrom threading import Lock
class RateLimiter: def __init__(self, max_requests, time_window): self.max_requests = max_requests self.time_window = time_window self.requests = [] self.lock = Lock()
def wait_if_needed(self): """Block if rate limit would be exceeded""" with self.lock: now = time.time()
# Remove old requests outside time window self.requests = [ req_time for req_time in self.requests if now - req_time < self.time_window ]
if len(self.requests) >= self.max_requests: # Calculate wait time oldest_request = self.requests[0] wait_time = self.time_window - (now - oldest_request)
if wait_time > 0: time.sleep(wait_time) self.requests = []
self.requests.append(time.time())
# Usage: 60 requests per minutelimiter = RateLimiter(max_requests=60, time_window=60)
def analyze_with_rate_limit(audio_file, agent_id, contact_id): limiter.wait_if_needed() return api_client.analyze(audio_file, agent_id, contact_id)Webhooks (Recommended)β
Use webhooks instead of polling for better performance:
from flask import Flask, request, jsonifyimport hmacimport hashlib
app = Flask(__name__)WEBHOOK_SECRET = "your_webhook_secret"
@app.route('/webhooks/callcov', methods=['POST'])def handle_callcov_webhook(): """Handle CallCov webhook notifications"""
# Verify webhook signature signature = request.headers.get('X-CallCov-Signature') body = request.get_data()
expected_signature = hmac.new( WEBHOOK_SECRET.encode(), body, hashlib.sha256 ).hexdigest()
if not hmac.compare_digest(signature, expected_signature): return jsonify({"error": "Invalid signature"}), 401
# Process webhook data = request.json event_type = data.get('event')
if event_type == 'analysis.completed': analysis_id = data['analysis_id'] result = data['result']
# Store or process result process_analysis_result(analysis_id, result)
elif event_type == 'analysis.failed': analysis_id = data['analysis_id'] error = data['error']
# Handle failure handle_analysis_failure(analysis_id, error)
return jsonify({"status": "received"}), 200
if __name__ == '__main__': app.run(host='0.0.0.0', port=8080)Testingβ
Integration Testsβ
Test your CallCov integration thoroughly:
import pytestimport responses
@responses.activatedef test_analyze_call_success(): """Test successful call analysis""" responses.add( responses.POST, 'https://api.callcov.com/api/v1/calls/analyze', json={'analysis_id': 'anl_test123', 'status': 'processing'}, status=200 )
result = api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
assert result['analysis_id'] == 'anl_test123' assert result['status'] == 'processing'
@responses.activatedef test_analyze_call_rate_limit(): """Test rate limit handling""" responses.add( responses.POST, 'https://api.callcov.com/api/v1/calls/analyze', json={'error': {'code': 'rate_limit_exceeded'}}, status=429, headers={'Retry-After': '60'} )
with pytest.raises(RateLimitException) as exc_info: api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
assert exc_info.value.retry_after == 60Deployment Checklistβ
Before going live, verify:
- API keys securely stored in environment variables or secret manager
- Separate keys for staging and production
- Error handling and retry logic implemented
- Rate limiting (client-side) configured
- Connection pooling enabled
- Timeouts configured (30-60s for uploads)
- Logging and monitoring in place
- Webhooks configured (recommended)
- Health checks implemented
- Integration tests passing
- Load testing completed
- Alert thresholds configured
- Documentation for on-call team
- Rollback plan prepared
Next Stepsβ
- Webhooks Guide - Set up real-time notifications
- Error Handling - Handle errors gracefully
- Authentication Guide - Secure your API keys
Need Help?β
- Email: support@callcov.com
- Documentation: docs.callcov.com
- Dashboard: app.callcov.com