Production Best Practices

Deploy your CallCov integration with confidence using these production-ready patterns and best practices.

Security​

API Key Management​

✅ Do:

  • Store API keys in environment variables or secret management systems (AWS Secrets Manager, HashiCorp Vault)
  • Use separate keys for development, staging, and production
  • Rotate keys every 90 days
  • Monitor API usage for anomalies

❌ Don't:

  • Hardcode keys in source code
  • Commit keys to version control
  • Share keys via email or chat
  • Reuse keys across environments
```python
import boto3
import json

def get_api_key():
    """Retrieve the API key from AWS Secrets Manager"""
    client = boto3.client('secretsmanager', region_name='us-east-1')
    try:
        response = client.get_secret_value(SecretId='callcov/api-key/production')
        secret = json.loads(response['SecretString'])
        return secret['api_key']
    except Exception as e:
        print(f"Error retrieving API key: {e}")
        raise

API_KEY = get_api_key()
```

Request Security​

  • Always use HTTPS (never HTTP)
  • Implement request signing for sensitive data
  • Validate SSL certificates
  • Set reasonable timeouts to prevent hanging connections
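
The points above can be sketched in one helper. Note that `SIGNING_SECRET` and the `X-Signature` header are hypothetical names for illustration, not a documented part of the CallCov API:

```python
import hashlib
import hmac

import requests

SIGNING_SECRET = b"your_signing_secret"  # hypothetical shared secret for request signing

def sign_body(body: bytes) -> str:
    """HMAC-SHA256 signature over the raw request body (illustrative scheme)."""
    return hmac.new(SIGNING_SECRET, body, hashlib.sha256).hexdigest()

def secure_post(url: str, body: bytes, api_key: str) -> requests.Response:
    """POST with HTTPS enforcement, certificate validation, and timeouts."""
    if not url.startswith("https://"):
        raise ValueError("Refusing to send API traffic over plain HTTP")
    return requests.post(
        url,
        data=body,
        headers={"X-API-Key": api_key, "X-Signature": sign_body(body)},
        verify=True,      # validate the SSL certificate (the default, stated explicitly)
        timeout=(5, 30),  # 5s to connect, 30s to read - no hanging connections
    )
```

The `(connect, read)` timeout tuple fails fast on unreachable hosts while still allowing slow uploads to complete.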

Performance​

Connection Pooling​

Reuse HTTP connections to reduce latency:

```python
import requests
from requests.adapters import HTTPAdapter

class CallCovClient:
    def __init__(self, api_key):
        self.api_key = api_key
        self.session = requests.Session()
        # Configure connection pooling
        adapter = HTTPAdapter(
            pool_connections=20,
            pool_maxsize=100,
            max_retries=3,
            pool_block=False
        )
        self.session.mount('https://', adapter)
        self.session.headers.update({'X-API-Key': api_key})

    def analyze(self, audio_file, agent_id, contact_id):
        files = {"audio_file": audio_file}
        data = {"agent_id": agent_id, "contact_id": contact_id}
        response = self.session.post(
            "https://api.callcov.com/api/v1/calls/analyze",
            files=files,
            data=data
        )
        return response.json()

# Create one client instance and reuse it
client = CallCovClient("your_api_key")
```

Caching​

Cache analysis results to avoid reprocessing:

```python
import redis
import hashlib
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_call_hash(audio_content):
    """Generate a unique hash for the audio file"""
    return hashlib.sha256(audio_content).hexdigest()

def analyze_with_cache(audio_content, agent_id, contact_id):
    # Check the cache first
    call_hash = get_call_hash(audio_content)
    cache_key = f"callcov:analysis:{call_hash}"
    cached = redis_client.get(cache_key)
    if cached:
        print("Cache hit!")
        return json.loads(cached)

    # Not in cache - call the API
    result = api_client.analyze(audio_content, agent_id, contact_id)

    # Cache the result for 7 days
    redis_client.setex(
        cache_key,
        604800,  # 7 days in seconds
        json.dumps(result)
    )
    return result
```

Async Processing​

Use async/await for better concurrency:

```python
import asyncio
import aiohttp

async def analyze_call_async(session, audio_path, agent_id, contact_id):
    """Submit a single call asynchronously"""
    with open(audio_path, 'rb') as f:
        data = aiohttp.FormData()
        data.add_field('audio_file', f, filename=audio_path)
        data.add_field('agent_id', agent_id)
        data.add_field('contact_id', contact_id)
        async with session.post(
            'https://api.callcov.com/api/v1/calls/analyze',
            data=data
        ) as response:
            return await response.json()

async def process_batch(calls):
    """Process multiple calls concurrently"""
    async with aiohttp.ClientSession(
        headers={'X-API-Key': API_KEY}
    ) as session:
        tasks = [
            analyze_call_async(session, path, agent, contact)
            for path, agent, contact in calls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Usage
calls_to_process = [
    ('call1.wav', 'AGENT_001', 'CONTACT_001'),
    ('call2.wav', 'AGENT_001', 'CONTACT_002'),
    ('call3.wav', 'AGENT_002', 'CONTACT_003'),
]
results = asyncio.run(process_batch(calls_to_process))
```

Monitoring & Logging​

Structured Logging​

```python
import json
import logging
import time
from datetime import datetime

class CallCovLogger:
    def __init__(self):
        self.logger = logging.getLogger('callcov')
        self.logger.setLevel(logging.INFO)

    def log_api_call(self, endpoint, method, status_code, duration_ms, **kwargs):
        """Log an API call with structured data"""
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'service': 'callcov',
            'endpoint': endpoint,
            'method': method,
            'status_code': status_code,
            'duration_ms': duration_ms,
            **kwargs
        }
        if status_code >= 400:
            self.logger.error(json.dumps(log_data))
        else:
            self.logger.info(json.dumps(log_data))

# Usage
logger = CallCovLogger()

start = time.time()
try:
    response = api_client.analyze(...)
    duration = (time.time() - start) * 1000
    logger.log_api_call(
        endpoint='/calls/analyze',
        method='POST',
        status_code=response.status_code,
        duration_ms=duration,
        analysis_id=response.json().get('analysis_id')
    )
except Exception as e:
    duration = (time.time() - start) * 1000
    # Not every exception carries a response, so fall back to 0
    logger.log_api_call(
        endpoint='/calls/analyze',
        method='POST',
        status_code=getattr(getattr(e, 'response', None), 'status_code', 0),
        duration_ms=duration,
        error=str(e)
    )
```

Metrics to Track​

Monitor these key metrics in production:

| Metric | Target | Alert Threshold |
|---|---|---|
| Request latency (p95) | < 5s | > 10s |
| Error rate | < 1% | > 5% |
| Rate limit hits | 0 | > 0 |
| Webhook delivery rate | > 99% | < 95% |
| Cache hit rate | > 80% | < 50% |
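
As a sketch of how these thresholds could be checked against collected samples (the function names are illustrative; the cutoffs mirror the p95-latency and error-rate rows of the table above):

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list of samples."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

def check_thresholds(latencies_ms, status_codes):
    """Return (p95, error_rate, alerts) against the alert thresholds above."""
    p95 = percentile(latencies_ms, 95)
    error_rate = sum(1 for s in status_codes if s >= 400) / len(status_codes)
    alerts = []
    if p95 > 10_000:       # p95 latency > 10s
        alerts.append(f"p95 latency {p95:.0f}ms exceeds 10s")
    if error_rate > 0.05:  # error rate > 5%
        alerts.append(f"error rate {error_rate:.1%} exceeds 5%")
    return p95, error_rate, alerts
```

In practice you would feed this from the structured logs above, or let your metrics backend (Prometheus, Datadog, etc.) compute the percentiles for you.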

Health Checks​

Implement health checks for your integration:

```python
import requests

def health_check():
    """Check CallCov API connectivity"""
    try:
        response = requests.get(
            "https://api.callcov.com/api/v1/health",
            headers={"X-API-Key": API_KEY},
            timeout=5
        )
        if response.status_code == 200:
            return {"status": "healthy", "callcov_api": "reachable"}
        return {
            "status": "degraded",
            "callcov_api": "error",
            "status_code": response.status_code
        }
    except requests.exceptions.RequestException as e:
        return {
            "status": "unhealthy",
            "callcov_api": "unreachable",
            "error": str(e)
        }
```

Scalability​

Queue-Based Processing​

For high-volume workloads, use a queue:

```python
from celery import Celery
import requests

app = Celery('callcov', broker='redis://localhost:6379/0')

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60
)
def analyze_call_task(self, audio_path, agent_id, contact_id):
    """Celery task for async call analysis"""
    try:
        with open(audio_path, 'rb') as f:
            files = {"audio_file": f}
            data = {"agent_id": agent_id, "contact_id": contact_id}
            headers = {"X-API-Key": API_KEY}
            response = requests.post(
                "https://api.callcov.com/api/v1/calls/analyze",
                headers=headers,
                files=files,
                data=data
            )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

# Usage: submit tasks to the queue
analyze_call_task.delay('call.wav', 'AGENT_001', 'CONTACT_001')
```

Rate Limiting (Client-Side)​

Implement client-side rate limiting to avoid hitting API limits:

```python
import time
from threading import Lock

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
        self.lock = Lock()

    def wait_if_needed(self):
        """Block if the rate limit would be exceeded"""
        with self.lock:
            now = time.time()
            # Drop requests that fall outside the time window
            self.requests = [
                req_time for req_time in self.requests
                if now - req_time < self.time_window
            ]
            if len(self.requests) >= self.max_requests:
                # Sleep until the oldest request leaves the window
                oldest_request = self.requests[0]
                wait_time = self.time_window - (now - oldest_request)
                if wait_time > 0:
                    time.sleep(wait_time)
                # Re-prune instead of clearing, so the limit still holds
                now = time.time()
                self.requests = [
                    req_time for req_time in self.requests
                    if now - req_time < self.time_window
                ]
            self.requests.append(time.time())

# Usage: 60 requests per minute
limiter = RateLimiter(max_requests=60, time_window=60)

def analyze_with_rate_limit(audio_file, agent_id, contact_id):
    limiter.wait_if_needed()
    return api_client.analyze(audio_file, agent_id, contact_id)
```

Webhooks

Use webhooks instead of polling for better performance:

```python
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret"

@app.route('/webhooks/callcov', methods=['POST'])
def handle_callcov_webhook():
    """Handle CallCov webhook notifications"""
    # Verify the webhook signature (default to '' so a missing header fails cleanly)
    signature = request.headers.get('X-CallCov-Signature', '')
    body = request.get_data()
    expected_signature = hmac.new(
        WEBHOOK_SECRET.encode(),
        body,
        hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(signature, expected_signature):
        return jsonify({"error": "Invalid signature"}), 401

    # Process the event
    data = request.json
    event_type = data.get('event')
    if event_type == 'analysis.completed':
        # Store or process the result
        process_analysis_result(data['analysis_id'], data['result'])
    elif event_type == 'analysis.failed':
        # Handle the failure
        handle_analysis_failure(data['analysis_id'], data['error'])

    return jsonify({"status": "received"}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
```

Testing​

Integration Tests​

Test your CallCov integration thoroughly:

```python
import pytest
import responses

@responses.activate
def test_analyze_call_success():
    """Test successful call analysis"""
    responses.add(
        responses.POST,
        'https://api.callcov.com/api/v1/calls/analyze',
        json={'analysis_id': 'anl_test123', 'status': 'processing'},
        status=200
    )
    result = api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
    assert result['analysis_id'] == 'anl_test123'
    assert result['status'] == 'processing'

@responses.activate
def test_analyze_call_rate_limit():
    """Test rate limit handling"""
    responses.add(
        responses.POST,
        'https://api.callcov.com/api/v1/calls/analyze',
        json={'error': {'code': 'rate_limit_exceeded'}},
        status=429,
        headers={'Retry-After': '60'}
    )
    with pytest.raises(RateLimitException) as exc_info:
        api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
    assert exc_info.value.retry_after == 60
```

Deployment Checklist​

Before going live, verify:

  • API keys securely stored in environment variables or secret manager
  • Separate keys for staging and production
  • Error handling and retry logic implemented
  • Rate limiting (client-side) configured
  • Connection pooling enabled
  • Timeouts configured (30-60s for uploads)
  • Logging and monitoring in place
  • Webhooks configured (recommended)
  • Health checks implemented
  • Integration tests passing
  • Load testing completed
  • Alert thresholds configured
  • Documentation for on-call team
  • Rollback plan prepared

Next Steps​

Need Help?​