Saltar al contenido principal

Mejores Prácticas de Producción

Despliega tu integración con CallCov con confianza usando estos patrones y mejores prácticas listas para producción.

Seguridad

Gestión de Claves de API

✅ Sí hacer:

  • Guardar claves de API en variables de entorno o sistemas de gestión de secretos (AWS Secrets Manager, HashiCorp Vault)
  • Usar claves separadas para desarrollo, staging y producción
  • Rotar claves cada 90 días
  • Monitorear el uso de la API para detectar anomalías

❌ No hacer:

  • Codificar claves en el código fuente
  • Hacer commit de claves al control de versiones
  • Compartir claves por email o chat
  • Reusar claves entre entornos
import boto3
import json
def get_api_key():
"""Recuperar clave de API desde AWS Secrets Manager"""
client = boto3.client('secretsmanager', region_name='us-east-1')
try:
response = client.get_secret_value(SecretId='callcov/api-key/production')
secret = json.loads(response['SecretString'])
return secret['api_key']
except Exception as e:
print(f"Error recuperando clave de API: {e}")
raise
API_KEY = get_api_key()

Seguridad de Solicitudes

  • Siempre usar HTTPS (nunca HTTP)
  • Implementar firma de solicitudes para datos sensibles
  • Validar certificados SSL
  • Establecer timeouts razonables para prevenir conexiones colgadas

Rendimiento

Pooling de Conexiones

Reutiliza conexiones HTTP para reducir latencia:

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.pool import HTTPPoolManager
class CallCovClient:
def __init__(self, api_key):
self.api_key = api_key
self.session = requests.Session()
# Configurar pooling de conexiones
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=100,
max_retries=3,
pool_block=False
)
self.session.mount('https://', adapter)
self.session.headers.update({'X-API-Key': api_key})
def analyze(self, audio_file, agent_id, contact_id):
files = {"audio_file": audio_file}
data = {"agent_id": agent_id, "contact_id": contact_id}
response = self.session.post(
"https://api.callcov.com/api/v1/calls/analyze",
files=files,
data=data
)
return response.json()
# Crear una instancia de cliente y reutilizarla
client = CallCovClient("your_api_key")

Cacheo

Cachea resultados de análisis para evitar reprocesamiento:

import redis
import hashlib
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_call_hash(audio_content):
"""Generar hash único para archivo de audio"""
return hashlib.sha256(audio_content).hexdigest()
def analyze_with_cache(audio_content, agent_id, contact_id):
# Verificar caché primero
call_hash = get_call_hash(audio_content)
cache_key = f"callcov:analysis:{call_hash}"
cached = redis_client.get(cache_key)
if cached:
print("¡Caché encontrado!")
return json.loads(cached)
# No está en caché - llamar a la API
result = api_client.analyze(audio_content, agent_id, contact_id)
# Cachear resultado por 7 días
redis_client.setex(
cache_key,
604800, # 7 días en segundos
json.dumps(result)
)
return result

Procesamiento Asíncrono

Usa async/await para mejor concurrencia:

import asyncio
import aiohttp
async def analyze_call_async(session, audio_path, agent_id, contact_id):
"""Envío de llamada asíncrono"""
with open(audio_path, 'rb') as f:
data = aiohttp.FormData()
data.add_field('audio_file', f, filename=audio_path)
data.add_field('agent_id', agent_id)
data.add_field('contact_id', contact_id)
async with session.post(
'https://api.callcov.com/api/v1/calls/analyze',
data=data
) as response:
return await response.json()
async def process_batch(calls):
"""Procesar múltiples llamadas concurrentemente"""
async with aiohttp.ClientSession(
headers={'X-API-Key': API_KEY}
) as session:
tasks = [
analyze_call_async(session, path, agent, contact)
for path, agent, contact in calls
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# Uso
calls_to_process = [
('call1.wav', 'AGENT_001', 'CONTACT_001'),
('call2.wav', 'AGENT_001', 'CONTACT_002'),
('call3.wav', 'AGENT_002', 'CONTACT_003'),
]
results = asyncio.run(process_batch(calls_to_process))

Monitoreo y Logging

Logging Estructurado

import logging
import json
from datetime import datetime
class CallCovLogger:
def __init__(self):
self.logger = logging.getLogger('callcov')
self.logger.setLevel(logging.INFO)
def log_api_call(self, endpoint, method, status_code, duration_ms, **kwargs):
"""Registrar llamada a API con datos estructurados"""
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'service': 'callcov',
'endpoint': endpoint,
'method': method,
'status_code': status_code,
'duration_ms': duration_ms,
**kwargs
}
if status_code >= 400:
self.logger.error(json.dumps(log_data))
else:
self.logger.info(json.dumps(log_data))
# Uso
logger = CallCovLogger()
start = time.time()
try:
response = api_client.analyze(...)
duration = (time.time() - start) * 1000
logger.log_api_call(
endpoint='/calls/analyze',
method='POST',
status_code=response.status_code,
duration_ms=duration,
analysis_id=response.json().get('analysis_id')
)
except Exception as e:
duration = (time.time() - start) * 1000
logger.log_api_call(
endpoint='/calls/analyze',
method='POST',
status_code=getattr(e.response, 'status_code', 0),
duration_ms=duration,
error=str(e)
)

Métricas a Monitorear

Monitorea estas métricas clave en producción:

MétricaObjetivoUmbral de Alerta
Latencia de solicitud (p95)< 5s> 10s
Tasa de error< 1%> 5%
Hits de límite de frecuencia0> 0
Tasa de entrega de webhook> 99%< 95%
Tasa de hit de caché> 80%< 50%

Health Checks

Implementa health checks para tu integración:

def health_check():
"""Verificar conectividad con la API de CallCov"""
try:
response = requests.get(
"https://api.callcov.com/api/v1/health",
headers={"X-API-Key": API_KEY},
timeout=5
)
if response.status_code == 200:
return {"status": "healthy", "callcov_api": "reachable"}
else:
return {
"status": "degraded",
"callcov_api": "error",
"status_code": response.status_code
}
except Exception as e:
return {
"status": "unhealthy",
"callcov_api": "unreachable",
"error": str(e)
}

Escalabilidad

Procesamiento Basado en Colas

Para cargas de trabajo de alto volumen, usa una cola:

from celery import Celery
import requests
app = Celery('callcov', broker='redis://localhost:6379/0')
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60
)
def analyze_call_task(self, audio_path, agent_id, contact_id):
"""Tarea Celery para análisis de llamadas asíncrono"""
try:
with open(audio_path, 'rb') as f:
files = {"audio_file": f}
data = {"agent_id": agent_id, "contact_id": contact_id}
headers = {"X-API-Key": API_KEY}
response = requests.post(
"https://api.callcov.com/api/v1/calls/analyze",
headers=headers,
files=files,
data=data
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as exc:
# Reintentar con backoff exponencial
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
# Uso: enviar tareas a la cola
analyze_call_task.delay('call.wav', 'AGENT_001', 'CONTACT_001')

Limitación de Frecuencia (Lado del Cliente)

Implementa limitación de frecuencia del lado del cliente para evitar alcanzar límites de la API:

import time
from threading import Lock
class RateLimiter:
def __init__(self, max_requests, time_window):
self.max_requests = max_requests
self.time_window = time_window
self.requests = []
self.lock = Lock()
def wait_if_needed(self):
"""Bloquear si se excedería el límite de frecuencia"""
with self.lock:
now = time.time()
# Eliminar solicitudes antiguas fuera de la ventana de tiempo
self.requests = [
req_time for req_time in self.requests
if now - req_time < self.time_window
]
if len(self.requests) >= self.max_requests:
# Calcular tiempo de espera
oldest_request = self.requests[0]
wait_time = self.time_window - (now - oldest_request)
if wait_time > 0:
time.sleep(wait_time)
self.requests = []
self.requests.append(time.time())
# Uso: 60 solicitudes por minuto
limiter = RateLimiter(max_requests=60, time_window=60)
def analyze_with_rate_limit(audio_file, agent_id, contact_id):
limiter.wait_if_needed()
return api_client.analyze(audio_file, agent_id, contact_id)

Webhooks (Recomendado)

Usa webhooks en lugar de consultas para mejor rendimiento:

from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret"
@app.route('/webhooks/callcov', methods=['POST'])
def handle_callcov_webhook():
"""Manejar notificaciones de webhook de CallCov"""
# Verificar firma del webhook
signature = request.headers.get('X-CallCov-Signature')
body = request.get_data()
expected_signature = hmac.new(
WEBHOOK_SECRET.encode(),
body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
return jsonify({"error": "Firma inválida"}), 401
# Procesar webhook
data = request.json
event_type = data.get('event')
if event_type == 'analysis.completed':
analysis_id = data['analysis_id']
result = data['result']
# Guardar o procesar resultado
process_analysis_result(analysis_id, result)
elif event_type == 'analysis.failed':
analysis_id = data['analysis_id']
error = data['error']
# Manejar fallo
handle_analysis_failure(analysis_id, error)
return jsonify({"status": "received"}), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)

Testing

Tests de Integración

Prueba tu integración con CallCov exhaustivamente:

import pytest
import responses
@responses.activate
def test_analyze_call_success():
"""Probar análisis de llamada exitoso"""
responses.add(
responses.POST,
'https://api.callcov.com/api/v1/calls/analyze',
json={'analysis_id': 'anl_test123', 'status': 'processing'},
status=200
)
result = api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
assert result['analysis_id'] == 'anl_test123'
assert result['status'] == 'processing'
@responses.activate
def test_analyze_call_rate_limit():
"""Probar manejo de límite de frecuencia"""
responses.add(
responses.POST,
'https://api.callcov.com/api/v1/calls/analyze',
json={'error': {'code': 'rate_limit_exceeded'}},
status=429,
headers={'Retry-After': '60'}
)
with pytest.raises(RateLimitException) as exc_info:
api_client.analyze('test.wav', 'AGENT_001', 'CONTACT_001')
assert exc_info.value.retry_after == 60

Lista de Verificación de Despliegue

Antes de poner en producción, verifica:

  • Claves de API guardadas de forma segura en variables de entorno o gestor de secretos
  • Claves separadas para staging y producción
  • Manejo de errores y lógica de reintento implementados
  • Limitación de frecuencia (lado del cliente) configurada
  • Pooling de conexiones habilitado
  • Timeouts configurados (30-60s para cargas)
  • Logging y monitoreo en su lugar
  • Webhooks configurados (recomendado)
  • Health checks implementados
  • Tests de integración pasando
  • Load testing completado
  • Umbrales de alerta configurados
  • Documentación para equipo de guardia
  • Plan de rollback preparado

Próximos Pasos

¿Necesitas Ayuda?