Operations Guide¶
Operating MCP Servers in Production¶
Effective operations ensure MCP servers maintain high availability, performance, and reliability in production environments.
Operational Excellence¶
Key Principles¶
- Observability - Monitor, log, trace everything
- Automation - Automate repetitive tasks
- Reliability - Design for failure
- Performance - Optimize continuously
- Security - Defense in depth
Operational Readiness¶
Pre-Production Checklist¶
- Monitoring dashboards configured
- Alerts and notifications set up
- Logging pipeline established
- Backup and recovery tested
- Runbooks documented
- On-call rotation scheduled
- Incident response plan ready
- Capacity planning completed
Monitoring Strategy¶
Four Golden Signals¶
- Latency - Response time
- Traffic - Request rate
- Errors - Failure rate
- Saturation - Resource utilization
Key Metrics¶
# Application metrics
mcp_requests_total
mcp_request_duration_seconds
mcp_errors_total
mcp_active_connections
# System metrics
cpu_utilization_percent
memory_usage_bytes
disk_io_operations
network_throughput_bytes
# Business metrics
tools_executed_total
resources_accessed_total
user_sessions_active
api_quota_remaining
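These application metrics map directly onto the four golden signals. A minimal sketch of registering them with the Python prometheus_client library (the library choice, the instrumented_call helper, and the port are illustrative, not prescribed by this guide):

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Application metrics covering the four golden signals
REQUESTS_TOTAL = Counter('mcp_requests_total', 'Total MCP requests', ['tool'])        # traffic
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request latency')       # latency
ERRORS_TOTAL = Counter('mcp_errors_total', 'Total failed requests', ['tool'])         # errors
ACTIVE_CONNECTIONS = Gauge('mcp_active_connections', 'Currently open connections')    # saturation

def instrumented_call(tool_name, func, *args, **kwargs):
    """Record traffic, latency, and errors around a single tool invocation."""
    REQUESTS_TOTAL.labels(tool=tool_name).inc()
    with REQUEST_DURATION.time():
        try:
            return func(*args, **kwargs)
        except Exception:
            ERRORS_TOTAL.labels(tool=tool_name).inc()
            raise

# Expose /metrics for Prometheus to scrape (port is an example)
start_http_server(9090)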
Health Checks¶
Liveness Probe¶
from flask import Flask  # assuming a Flask app serves the health endpoints
app = Flask(__name__)

@app.route('/health/live')
def liveness():
    """Basic liveness check - is the process running?"""
    return {'status': 'alive'}, 200
Readiness Probe¶
@app.route('/health/ready')
def readiness():
    """Readiness check - can we serve traffic?"""
    checks = {
        'database': check_database_connection(),
        'cache': check_cache_connection(),
        'dependencies': check_external_services()
    }
    if all(checks.values()):
        return {'status': 'ready', 'checks': checks}, 200
    else:
        return {'status': 'not_ready', 'checks': checks}, 503
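The readiness probe calls helper checks that are not defined in this guide. One possible sketch, assuming the SQLAlchemy engine and Redis client from the performance-tuning examples below, plus a hypothetical EXTERNAL_HEALTH_URLS list of dependency health endpoints:

import requests
from sqlalchemy import text

def check_database_connection():
    """True if the database answers a trivial query via the shared engine."""
    try:
        with engine.connect() as conn:   # `engine` comes from the connection-pooling example
            conn.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

def check_cache_connection():
    """True if Redis responds to PING."""
    try:
        return bool(redis_client.ping())
    except Exception:
        return False

def check_external_services():
    """True if every downstream dependency answers its health endpoint."""
    try:
        return all(requests.get(url, timeout=2).ok
                   for url in EXTERNAL_HEALTH_URLS)   # hypothetical list of dependency URLs
    except requests.RequestException:
        return False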
Logging Best Practices¶
Structured Logging¶
import time

import structlog

logger = structlog.get_logger()

def process_request(request_id, tool_name, params):
    logger.info(
        "request_started",
        request_id=request_id,
        tool=tool_name,
        params_size=len(str(params))
    )
    start = time.perf_counter()
    try:
        result = execute_tool(tool_name, params)
        logger.info(
            "request_completed",
            request_id=request_id,
            tool=tool_name,
            duration_ms=(time.perf_counter() - start) * 1000,
            result_size=len(str(result))
        )
        return result
    except Exception as e:
        logger.error(
            "request_failed",
            request_id=request_id,
            tool=tool_name,
            error=str(e),
            error_type=type(e).__name__
        )
        raise
Log Aggregation¶
# Fluentd configuration
<source>
  @type tail
  path /var/log/mcp-server/*.log
  pos_file /var/log/td-agent/mcp-server.pos
  tag mcp.server
  <parse>
    @type json
  </parse>
</source>

<match mcp.**>
  @type elasticsearch
  host elasticsearch.example.com
  port 9200
  logstash_format true
  logstash_prefix mcp
  <buffer>
    @type file
    path /var/log/td-agent/buffer/mcp
    flush_interval 10s
  </buffer>
</match>
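The `@type json` parser above expects one JSON object per log line. A minimal structlog configuration that produces such output (a sketch; the guide does not mandate a particular renderer):

import logging
import structlog

# Render events as JSON lines so the Fluentd json parser can ingest them
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)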
Performance Management¶
Performance Baseline¶
# Establish baseline metrics
PERFORMANCE_BASELINE = {
    'p50_latency_ms': 100,
    'p95_latency_ms': 500,
    'p99_latency_ms': 1000,
    'error_rate_percent': 0.1,
    'requests_per_second': 1000
}

def check_performance():
    current_metrics = get_current_metrics()
    for metric, baseline in PERFORMANCE_BASELINE.items():
        current = current_metrics[metric]
        if metric == 'requests_per_second':
            # Throughput degrades downward: alert when it drops well below baseline
            degraded = current < baseline * 0.5
        else:
            # Latency and error rate degrade upward: alert at 50% above baseline
            degraded = current > baseline * 1.5
        if degraded:
            alert(f"Performance degradation: {metric} = {current}")
Performance Tuning¶
# Connection pooling
from sqlalchemy import create_engine

engine = create_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600
)

# Caching strategy
import json
from functools import lru_cache

import redis

redis_client = redis.Redis(decode_responses=True)

@lru_cache(maxsize=1000)  # In-process memory cache
def get_cached_result(key):
    # Shared cache (Redis) next
    result = redis_client.get(key)
    if result:
        return json.loads(result)
    # Compute if not cached, then store for an hour
    result = compute_expensive_operation(key)
    redis_client.setex(key, 3600, json.dumps(result))
    return result
Capacity Planning¶
Resource Calculation¶
import math

def calculate_capacity():
    # Current metrics
    avg_request_time = 100        # ms
    peak_rps = 1000
    cpu_per_request = 0.01        # cores
    memory_per_connection = 10    # MB

    # Concurrent requests in flight (Little's law: rate x time)
    concurrent_requests = peak_rps * avg_request_time / 1000

    # Required resources, with a 50% buffer
    required_cores = peak_rps * cpu_per_request * 1.5
    required_memory = concurrent_requests * memory_per_connection * 1.5

    # Instance sizing (assuming 4 cores per instance)
    instances_needed = math.ceil(required_cores / 4)

    return {
        'cores_needed': required_cores,
        'memory_needed_mb': required_memory,
        'instances': instances_needed
    }
Backup and Recovery¶
Backup Strategy¶
#!/bin/bash
# backup.sh
set -euo pipefail

BACKUP_DIR="/backups/$(date +%Y%m%d)"
mkdir -p "$BACKUP_DIR"

# Database backup
pg_dump "$DATABASE_URL" > "$BACKUP_DIR/database.sql"

# Configuration backup
tar -czf "$BACKUP_DIR/config.tar.gz" /etc/mcp-server/

# Application state backup
redis-cli --rdb "$BACKUP_DIR/redis.rdb"

# Upload to S3
aws s3 sync "$BACKUP_DIR" "s3://backups/mcp-server/$(date +%Y%m%d)/"

# Cleanup old backups (keep 30 days)
find /backups -type d -mtime +30 -exec rm -rf {} +
Disaster Recovery¶
from datetime import timedelta

class DisasterRecovery:
    def __init__(self):
        self.backup_location = "s3://backups/mcp-server/"
        self.recovery_point_objective = timedelta(hours=1)
        self.recovery_time_objective = timedelta(minutes=30)

    def initiate_recovery(self, target_time):
        # Find the nearest backup to the target time
        backup = self.find_backup(target_time)

        # Restore database
        self.restore_database(backup['database'])

        # Restore configuration
        self.restore_config(backup['config'])

        # Restore cache if needed
        if self.should_restore_cache():
            self.restore_cache(backup['cache'])

        # Verify recovery
        if self.verify_recovery():
            logger.info("Recovery successful")
            return True
        else:
            logger.error("Recovery failed")
            return False
Maintenance Windows¶
Zero-Downtime Maintenance¶
import time

def chunks(items, size):
    """Yield successive batches of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def rolling_update(new_version):
    instances = get_all_instances()
    batch_size = max(1, len(instances) // 4)  # Update 25% at a time

    for batch in chunks(instances, batch_size):
        # Remove from load balancer
        for instance in batch:
            remove_from_lb(instance)

        # Wait for connections to drain
        time.sleep(30)

        # Update instances
        for instance in batch:
            update_instance(instance, new_version)

        # Health check
        for instance in batch:
            wait_for_healthy(instance)

        # Add back to load balancer
        for instance in batch:
            add_to_lb(instance)

        # Monitor for issues
        time.sleep(60)
        if detect_issues():
            rollback()
            return False

    return True
Troubleshooting¶
Common Issues and Solutions¶
High Latency¶
-- Check slow queries in PostgreSQL
SELECT query, mean_time, calls
FROM pg_stat_statements
ORDER BY mean_time DESC
LIMIT 10;

# Count open connections on the server port
netstat -an | grep :8000 | wc -l

# Check per-thread CPU usage for the server process
top -H -p $(pgrep mcp-server)
Memory Leaks¶
import tracemalloc
import gc
# Start tracing
tracemalloc.start()
# Take snapshot
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
print(stat)
# Force garbage collection
gc.collect()
Connection Errors¶
# Check network connectivity
ping -c 4 database.example.com
traceroute database.example.com
# Check DNS resolution
nslookup database.example.com
# Check firewall rules
iptables -L -n -v
Runbooks¶
Sample Runbook: High Error Rate¶
## Alert: High Error Rate
### Symptoms
- Error rate > 1%
- HTTP 5xx responses increasing
### Initial Response (5 min)
1. Check dashboards for error patterns
2. Review recent deployments
3. Check system resources
### Investigation (15 min)
1. Analyze error logs
2. Check database connectivity
3. Verify external dependencies
4. Review recent changes
### Mitigation
1. If deployment issue: Rollback
2. If resource issue: Scale up
3. If dependency issue: Enable circuit breaker
4. If database issue: Failover to replica
### Resolution
1. Fix root cause
2. Deploy fix
3. Monitor for recurrence
4. Update documentation
### Post-Incident
1. Write incident report
2. Update runbook if needed
3. Schedule post-mortem
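The mitigation step in the sample runbook mentions enabling a circuit breaker. A minimal sketch of one (the class name and thresholds are illustrative, not part of this guide):

import time

class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover."""

    def __init__(self, failure_threshold=5, reset_timeout=30):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout   # seconds to wait before retrying
        self.failures = 0
        self.opened_at = None                # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        # Fail fast while the circuit is open and the reset timeout has not passed
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: dependency unavailable")
            self.opened_at = None            # half-open: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        else:
            self.failures = 0                # a success closes the circuit again
            return result

Wrapping a flaky dependency, for example breaker.call(check_external_services), keeps requests failing fast instead of piling up while the dependency recovers.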
Compliance and Auditing¶
Audit Logging¶
import json
from datetime import datetime

from flask import request  # assuming the same Flask app handles requests

def audit_log(event_type, user, action, resource, result):
    audit_entry = {
        'timestamp': datetime.utcnow().isoformat(),
        'event_type': event_type,
        'user': user,
        'action': action,
        'resource': resource,
        'result': result,
        'ip_address': request.remote_addr,
        'user_agent': request.user_agent.string
    }

    # Write to audit log
    audit_logger.info(json.dumps(audit_entry))

    # Store in audit database
    audit_db.insert(audit_entry)
Next Steps¶
- Monitoring Deep Dive
- Logging Strategy
- Performance Optimization
- Scaling Operations
- Incident Response