Mejores Prácticas de Producción
Despliega tu integración con CallCov con confianza usando estos patrones y mejores prácticas listas para producción.
Seguridad
Gestión de Claves de API
✅ Sí hacer:
- Guardar claves de API en variables de entorno o sistemas de gestión de secretos (AWS Secrets Manager, HashiCorp Vault)
- Usar claves separadas para desarrollo, staging y producción
- Rotar claves cada 90 días
- Monitorear el uso de la API para detectar anomalías
❌ No hacer:
- Codificar claves en el código fuente
- Hacer commit de claves al control de versiones
- Compartir claves por email o chat
- Reusar claves entre entornos
import boto3import json
def get_api_key(): """Recuperar clave de API desde AWS Secrets Manager""" client = boto3.client('secretsmanager', region_name='us-east-1')
try: response = client.get_secret_value(SecretId='callcov/api-key/production') secret = json.loads(response['SecretString']) return secret['api_key'] except Exception as e: print(f"Error recuperando clave de API: {e}") raise
API_KEY = get_api_key()Seguridad de Solicitudes
- Siempre usar HTTPS (nunca HTTP)
- Implementar firma de solicitudes para datos sensibles
- Validar certificados SSL
- Establecer timeouts razonables para prevenir conexiones colgadas
Rendimiento
Pooling de Conexiones
Reutiliza conexiones HTTP para reducir latencia:
import requestsfrom requests.adapters import HTTPAdapterfrom requests.packages.urllib3.util.pool import HTTPPoolManager
class CallCovClient: def __init__(self, api_key): self.api_key = api_key self.session = requests.Session()
# Configurar pooling de conexiones adapter = HTTPAdapter( pool_connections=20, pool_maxsize=100, max_retries=3, pool_block=False )
self.session.mount('https://', adapter) self.session.headers.update({'X-API-Key': api_key})
def analyze(self, audio_file, agent_id, contact_id): files = {"audio_file": audio_file} data = {"agent_id": agent_id, "contact_id": contact_id}
response = self.session.post( "https://api.callcov.com/api/v1/calls/analyze", files=files, data=data )
return response.json()
# Crear una instancia de cliente y reutilizarlaclient = CallCovClient("your_api_key")Cacheo
Cachea resultados de análisis para evitar reprocesamiento:
import redisimport hashlibimport json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_call_hash(audio_content): """Generar hash único para archivo de audio""" return hashlib.sha256(audio_content).hexdigest()
def analyze_with_cache(audio_content, agent_id, contact_id): # Verificar caché primero call_hash = get_call_hash(audio_content) cache_key = f"callcov:analysis:{call_hash}"
cached = redis_client.get(cache_key) if cached: print("¡Caché encontrado!") return json.loads(cached)
# No está en caché - llamar a la API result = api_client.analyze(audio_content, agent_id, contact_id)
# Cachear resultado por 7 días redis_client.setex( cache_key, 604800, # 7 días en segundos json.dumps(result) )
return resultProcesamiento Asíncrono
Usa async/await para mejor concurrencia:
import asyncioimport aiohttp
async def analyze_call_async(session, audio_path, agent_id, contact_id): """Envío de llamada asíncrono""" with open(audio_path, 'rb') as f: data = aiohttp.FormData() data.add_field('audio_file', f, filename=audio_path) data.add_field('agent_id', agent_id) data.add_field('contact_id', contact_id)
async with session.post( 'https://api.callcov.com/api/v1/calls/analyze', data=data ) as response: return await response.json()
async def process_batch(calls): """Procesar múltiples llamadas concurrentemente""" async with aiohttp.ClientSession( headers={'X-API-Key': API_KEY} ) as session: tasks = [ analyze_call_async(session, path, agent, contact) for path, agent, contact in calls ]
results = await asyncio.gather(*tasks, return_exceptions=True) return results
# Usocalls_to_process = [ ('call1.wav', 'AGENT_001', 'CONTACT_001'), ('call2.wav', 'AGENT_001', 'CONTACT_002'), ('call3.wav', 'AGENT_002', 'CONTACT_003'),]
results = asyncio.run(process_batch(calls_to_process))Monitoreo y Logging
Logging Estructurado
import loggingimport jsonfrom datetime import datetime
class CallCovLogger: def __init__(self): self.logger = logging.getLogger('callcov') self.logger.setLevel(logging.INFO)
def log_api_call(self, endpoint, method, status_code, duration_ms, **kwargs): """Registrar llamada a API con datos estructurados""" log_data = { 'timestamp': datetime.utcnow().isoformat(), 'service': 'callcov', 'endpoint': endpoint, 'method': method, 'status_code': status_code, 'duration_ms': duration_ms, **kwargs }
if status_code >= 400: self.logger.error(json.dumps(log_data)) else: self.logger.info(json.dumps(log_data))
# Usologger = CallCovLogger()
start = time.time()try: response = api_client.analyze(...) duration = (time.time() - start) * 1000
logger.log_api_call( endpoint='/calls/analyze', method='POST', status_code=response.status_code, duration_ms=duration, analysis_id=response.json().get('analysis_id') )except Exception as e: duration = (time.time() - start) * 1000 logger.log_api_call( endpoint='/calls/analyze', method='POST', status_code=getattr(e.response, 'status_code', 0), duration_ms=duration, error=str(e) )Métricas a Monitorear
Monitorea estas métricas clave en producción:
| Métrica | Objetivo | Umbral de Alerta |
|---|---|---|
| Latencia de solicitud (p95) | < 5s | > 10s |
| Tasa de error | < 1% | > 5% |
| Hits de límite de frecuencia | 0 | > 0 |
| Tasa de entrega de webhook | > 99% | < 95% |
| Tasa de hit de caché | > 80% | < 50% |
Health Checks
Implementa health checks para tu integración:
def health_check(): """Verificar conectividad con la API de CallCov""" try: response = requests.get( "https://api.callcov.com/api/v1/health", headers={"X-API-Key": API_KEY}, timeout=5 )
if response.status_code == 200: return {"status": "healthy", "callcov_api": "reachable"} else: return { "status": "degraded", "callcov_api": "error", "status_code": response.status_code } except Exception as e: return { "status": "unhealthy", "callcov_api": "unreachable", "error": str(e) }Escalabilidad
Procesamiento Basado en Colas
Para cargas de trabajo de alto volumen, usa una cola:
from celery import Celeryimport requests
app = Celery('callcov', broker='redis://localhost:6379/0')
@app.task( bind=True, max_retries=3, default_retry_delay=60)def analyze_call_task(self, audio_path, agent_id, contact_id): """Tarea Celery para análisis de llamadas asíncrono""" try: with open(audio_path, 'rb') as f: files = {"audio_file": f} data = {"agent_id": agent_id, "contact_id": contact_id} headers = {"X-API-Key": API_KEY}
response = requests.post( "https://api.callcov.com/api/v1/calls/analyze", headers=headers, files=files, data=data )
response.raise_for_status() return response.json()
except requests.exceptions.RequestException as exc: # Reintentar con backoff exponencial raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
# Uso: enviar tareas a la colaanalyze_call_task.delay('call.wav', 'AGENT_001', 'CONTACT_001')Limitación de Frecuencia (Lado del Cliente)
Implementa limitación de frecuencia del lado del cliente para evitar alcanzar límites de la API:
import timefrom threading import Lock
class RateLimiter: def __init__(self, max_requests, time_window): self.max_requests = max_requests self.time_window = time_window self.requests = [] self.lock = Lock()
def wait_if_needed(self): """Bloquear si se excedería el límite de frecuencia""" with self.lock: now = time.time()
# Eliminar solicitudes antiguas fuera de la ventana de tiempo self.requests = [ req_time for req_time in self.requests if now - req_time < self.time_window ]
if len(self.requests) >= self.max_requests: # Calcular tiempo de espera oldest_request = self.requests[0] wait_time = self.time_window - (now - oldest_request)
if wait_time > 0: time.sleep(wait_time) self.requests = []
self.requests.append(time.time())
# Uso: 60 solicitudes por minutolimiter = RateLimiter(max_requests=60, time_window=60)
def analyze_with_rate_limit(audio_file, agent_id, contact_id): limiter.wait_if_needed() return api_client.analyze(audio_file, agent_id, contact_id)Webhooks (Recomendado)
Usa webhooks en lugar de consultas para mejor rendimiento:
from flask import Flask, request, jsonifyimport hmacimport hashlib
app = Flask(__name__)WEBHOOK_SECRET = "your_webhook_secret"
@app.route('/webhooks/callcov', methods=['POST'])def handle_callcov_webhook(): """Manejar notificaciones de webhook de CallCov"""
# Verificar firma del webhook signature = request.headers.get('X-CallCov-Signature') body = request.get_data()
expected_signature = hmac.new( WEBHOOK_SECRET.encode(), body, hashlib.sha256 ).hexdigest()
if not hmac.compare_digest(signature, expected_signature): return jsonify({"error": "Firma inválida"}), 401
# Procesar webhook data = request.json event_type = data.get('event')
if event_type == 'analysis.completed': analysis_id = data['analysis_id'] result = data['result']
# Guardar o procesar resultado process_analysis_result(analysis_id, result)
elif event_type == 'analysis.failed': analysis_id = data['analysis_id'] error = data['error']
# Manejar fallo handle_analysis_failure(analysis_id, error)
return jsonify({"status": "received"}), 200
if __name__ == '__main__': app.run(host='0.0.0.0', port=8080)Testing
Tests de Integración
Prueba tu integración con CallCov exhaustivamente:
import pytestimport responses
@responses.activatedef test_analyze_call_success(): """Probar análisis de llamada exitoso""" responses.add( responses.POST, 'https://api.callcov.com/api/v1/calls/analyze', json={'analysis_id': 'anl_test123', 'status': 'processing'}, status=200 )
result = api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
assert result['analysis_id'] == 'anl_test123' assert result['status'] == 'processing'
@responses.activatedef test_analyze_call_rate_limit(): """Probar manejo de límite de frecuencia""" responses.add( responses.POST, 'https://api.callcov.com/api/v1/calls/analyze', json={'error': {'code': 'rate_limit_exceeded'}}, status=429, headers={'Retry-After': '60'} )
with pytest.raises(RateLimitException) as exc_info: api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
assert exc_info.value.retry_after == 60Lista de Verificación de Despliegue
Antes de poner en producción, verifica:
- Claves de API guardadas de forma segura en variables de entorno o gestor de secretos
- Claves separadas para staging y producción
- Manejo de errores y lógica de reintento implementados
- Limitación de frecuencia (lado del cliente) configurada
- Pooling de conexiones habilitado
- Timeouts configurados (30-60s para cargas)
- Logging y monitoreo en su lugar
- Webhooks configurados (recomendado)
- Health checks implementados
- Tests de integración pasando
- Load testing completado
- Umbrales de alerta configurados
- Documentación para equipo de guardia
- Plan de rollback preparado
Próximos Pasos
- Guía de Webhooks - Configura notificaciones en tiempo real
- Manejo de Errores - Maneja errores elegantemente
- Guía de Autenticación - Asegura tus claves de API
¿Necesitas Ayuda?
- Email: support@callcov.com
- Documentación: docs.callcov.com
- Panel de Control: app.callcov.com