Python Best Practices¶
Python-Specific Best Practices for MCP Servers¶
Following Python best practices ensures your MCP server is maintainable, performant, and follows community standards.
Code Style and Formatting¶
Use Black and Ruff¶
# Install formatting tools
uv add --dev black ruff
# Format code
black src tests
# Lint and fix issues
ruff check --fix src tests
Configuration¶
# pyproject.toml
[tool.black]
line-length = 88
target-version = ['py311']
include = '\.pyi?$'
extend-exclude = '''
/(
\.eggs
| \.git
| \.venv
| build
| dist
)/
'''
[tool.ruff]
target-version = "py311"
line-length = 88
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # line too long (handled by black)
"B008", # do not perform function calls in argument defaults
]
[tool.ruff.per-file-ignores]
"__init__.py" = ["F401"]
"tests/**/*.py" = ["S101"] # assert allowed in tests
Type Hints and MyPy¶
Comprehensive Type Hints¶
from typing import Any, Dict, List, Optional, Union
from mcp.types import Tool, TextContent
import asyncio
class DatabaseTool:
def __init__(self, connection_string: str) -> None:
self.connection_string = connection_string
self._connection: Optional[Any] = None
async def execute_query(
self,
query: str,
params: Optional[Dict[str, Any]] = None
) -> List[TextContent]:
"""Execute database query with type safety."""
results = await self._execute(query, params or {})
return [TextContent(type="text", text=str(result)) for result in results]
async def _execute(
self,
query: str,
params: Dict[str, Any]
) -> List[Dict[str, Any]]:
# Implementation details
return []
MyPy Configuration¶
# pyproject.toml
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
show_error_codes = true
[[tool.mypy.overrides]]
module = "tests.*"
ignore_errors = true
Error Handling¶
Custom Exception Hierarchy¶
# src/my_mcp_server/exceptions.py
class MCPServerError(Exception):
"""Base exception for MCP server errors."""
pass
class ValidationError(MCPServerError):
"""Raised when input validation fails."""
def __init__(self, message: str, field: Optional[str] = None):
super().__init__(message)
self.field = field
class ExternalServiceError(MCPServerError):
"""Raised when external service calls fail."""
def __init__(self, message: str, service: str, status_code: Optional[int] = None):
super().__init__(message)
self.service = service
self.status_code = status_code
class ConfigurationError(MCPServerError):
"""Raised when configuration is invalid."""
pass
Error Handling Patterns¶
import logging
from typing import NoReturn
logger = logging.getLogger(__name__)
async def safe_tool_execution(
tool_name: str,
arguments: Dict[str, Any]
) -> List[TextContent]:
"""Execute tool with comprehensive error handling."""
try:
# Validate inputs first
validated_args = validate_tool_arguments(tool_name, arguments)
# Execute with timeout
result = await asyncio.wait_for(
execute_tool_impl(tool_name, validated_args),
timeout=30.0
)
return result
except ValidationError as e:
logger.warning(f"Validation failed for {tool_name}: {e}")
raise # Re-raise for proper MCP error response
except ExternalServiceError as e:
logger.error(f"External service error in {tool_name}: {e}")
# Convert to user-friendly message
raise MCPServerError(f"Service temporarily unavailable: {e.service}")
except asyncio.TimeoutError:
logger.error(f"Timeout executing {tool_name}")
raise MCPServerError("Operation timed out")
except Exception as e:
logger.exception(f"Unexpected error in {tool_name}")
raise MCPServerError("Internal server error")
def validate_tool_arguments(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and sanitize tool arguments."""
if not isinstance(arguments, dict):
raise ValidationError("Arguments must be a dictionary")
# Tool-specific validation
validators = {
"database_query": validate_database_query,
"file_operation": validate_file_operation,
}
validator = validators.get(tool_name)
if validator:
return validator(arguments)
return arguments
Async Programming¶
Proper Async Patterns¶
import asyncio
import aiohttp
import aiofiles
from contextlib import asynccontextmanager
from typing import AsyncGenerator
class AsyncResourceManager:
"""Manage async resources properly."""
def __init__(self):
self._session: Optional[aiohttp.ClientSession] = None
self._db_pool: Optional[Any] = None
async def __aenter__(self):
await self.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.cleanup()
async def initialize(self):
"""Initialize async resources."""
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
# Initialize database pool
async def cleanup(self):
"""Clean up async resources."""
if self._session:
await self._session.close()
# Clean up database pool
@asynccontextmanager
async def get_file_content(file_path: str) -> AsyncGenerator[str, None]:
"""Async context manager for file operations."""
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
yield content
# Concurrent operations
async def process_multiple_requests(requests: List[Dict[str, Any]]) -> List[Any]:
"""Process multiple requests concurrently."""
semaphore = asyncio.Semaphore(10) # Limit concurrent operations
async def process_single(request: Dict[str, Any]) -> Any:
async with semaphore:
return await process_request(request)
# Use gather for concurrent execution
results = await asyncio.gather(
*[process_single(req) for req in requests],
return_exceptions=True # Handle exceptions gracefully
)
return results
Configuration Management¶
Pydantic Settings¶
# src/my_mcp_server/config.py
from pydantic import BaseSettings, Field, validator
from typing import Optional, List
import os
class Settings(BaseSettings):
"""Application settings with validation."""
# Server settings
server_name: str = Field(default="my-mcp-server", description="Server name")
debug: bool = Field(default=False, description="Debug mode")
log_level: str = Field(default="INFO", description="Log level")
# Database settings
database_url: Optional[str] = Field(None, description="Database connection URL")
database_pool_size: int = Field(default=10, ge=1, le=50)
database_timeout: int = Field(default=30, ge=1, le=300)
# API settings
api_key: Optional[str] = Field(None, description="External API key")
api_base_url: str = Field(default="https://api.example.com")
api_timeout: int = Field(default=30, ge=1, le=300)
# Feature flags
enable_caching: bool = Field(default=True)
enable_metrics: bool = Field(default=False)
allowed_operations: List[str] = Field(default_factory=lambda: ["read", "write"])
@validator('log_level')
def validate_log_level(cls, v):
valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if v.upper() not in valid_levels:
raise ValueError(f'Invalid log level. Must be one of: {valid_levels}')
return v.upper()
@validator('database_url')
def validate_database_url(cls, v):
if v and not v.startswith(('postgresql://', 'sqlite:///', 'mysql://')):
raise ValueError('Invalid database URL scheme')
return v
class Config:
env_prefix = 'MCP_'
env_file = '.env'
case_sensitive = False
# Global settings instance
settings = Settings()
Logging¶
Structured Logging¶
# src/my_mcp_server/logging_config.py
import logging
import sys
from typing import Any, Dict
import json
from datetime import datetime
class StructuredFormatter(logging.Formatter):
"""JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
log_data: Dict[str, Any] = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
}
# Add exception info if present
if record.exc_info:
log_data['exception'] = self.formatException(record.exc_info)
# Add extra fields
if hasattr(record, 'extra_fields'):
log_data.update(record.extra_fields)
return json.dumps(log_data, separators=(',', ':'))
def setup_logging(level: str = 'INFO') -> None:
"""Configure application logging."""
root_logger = logging.getLogger()
root_logger.setLevel(level)
# Remove existing handlers
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Console handler with structured format
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(StructuredFormatter())
root_logger.addHandler(handler)
# Suppress noisy third-party loggers
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('asyncio').setLevel(logging.WARNING)
# Usage
logger = logging.getLogger(__name__)
async def example_with_logging():
"""Example function with structured logging."""
logger.info(
"Processing request",
extra={'extra_fields': {
'tool_name': 'database_query',
'user_id': 'user123',
'request_id': 'req-456'
}}
)
Testing Best Practices¶
Fixtures and Test Organization¶
# tests/conftest.py
import pytest
import asyncio
from unittest.mock import AsyncMock
from my_mcp_server.config import Settings
from my_mcp_server.server import MCPServer
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def test_settings():
"""Test configuration settings."""
return Settings(
server_name="test-server",
debug=True,
database_url="sqlite:///:memory:",
api_key="test-key"
)
@pytest.fixture
async def mock_external_service():
"""Mock external service dependencies."""
mock = AsyncMock()
mock.get_data.return_value = {"test": "data"}
return mock
@pytest.fixture
async def test_server(test_settings, mock_external_service):
"""Create test server instance."""
server = MCPServer(test_settings.server_name)
# Inject mocked dependencies
server._external_service = mock_external_service
return server
Test Categories¶
# tests/test_tools.py
import pytest
from my_mcp_server.tools import DatabaseTool
class TestDatabaseTool:
"""Test database tool functionality."""
@pytest.mark.asyncio
async def test_valid_query(self, test_server):
"""Test successful query execution."""
result = await test_server.execute_query("SELECT 1")
assert len(result) > 0
@pytest.mark.asyncio
async def test_invalid_query_raises_error(self, test_server):
"""Test error handling for invalid queries."""
with pytest.raises(ValidationError):
await test_server.execute_query("INVALID SQL")
@pytest.mark.parametrize("query,expected_error", [
("", "Query cannot be empty"),
("DROP TABLE users", "DROP statements not allowed"),
("SELECT * FROM users; DELETE FROM users", "Multiple statements not allowed"),
])
@pytest.mark.asyncio
async def test_query_validation(self, test_server, query, expected_error):
"""Test various query validation scenarios."""
with pytest.raises(ValidationError, match=expected_error):
await test_server.execute_query(query)
Performance Optimization¶
Caching¶
from functools import lru_cache
from typing import Dict, Any
import asyncio
import time
# Simple in-memory cache
class TTLCache:
"""Time-to-live cache implementation."""
def __init__(self, ttl: int = 300):
self.ttl = ttl
self._cache: Dict[str, tuple] = {}
def get(self, key: str) -> Any:
if key in self._cache:
value, timestamp = self._cache[key]
if time.time() - timestamp < self.ttl:
return value
else:
del self._cache[key]
return None
def set(self, key: str, value: Any) -> None:
self._cache[key] = (value, time.time())
def clear(self) -> None:
self._cache.clear()
# Usage with async functions
cache = TTLCache(ttl=300) # 5 minutes
async def cached_api_call(endpoint: str) -> Dict[str, Any]:
"""API call with caching."""
cached_result = cache.get(endpoint)
if cached_result is not None:
return cached_result
result = await make_api_call(endpoint)
cache.set(endpoint, result)
return result
Connection Pooling¶
import aiohttp
from contextlib import asynccontextmanager
class ConnectionManager:
"""Manage HTTP connection pools."""
def __init__(self):
self._session: Optional[aiohttp.ClientSession] = None
async def get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
connector = aiohttp.TCPConnector(
limit=100, # Total connection pool size
limit_per_host=30, # Per-host connection limit
keepalive_timeout=30,
enable_cleanup_closed=True
)
timeout = aiohttp.ClientTimeout(
total=30,
connect=5,
sock_read=10
)
self._session = aiohttp.ClientSession(
connector=connector,
timeout=timeout
)
return self._session
async def close(self):
if self._session and not self._session.closed:
await self._session.close()
# Global connection manager
connection_manager = ConnectionManager()
Security Best Practices¶
Input Sanitization¶
import re
from pathlib import Path
from typing import Any
def sanitize_file_path(file_path: str, allowed_dirs: List[str]) -> Path:
"""Sanitize file path to prevent directory traversal."""
# Remove any path traversal attempts
clean_path = re.sub(r'\.\./+', '', file_path)
# Convert to absolute path
abs_path = Path(clean_path).resolve()
# Check if path is within allowed directories
for allowed_dir in allowed_dirs:
allowed_path = Path(allowed_dir).resolve()
try:
abs_path.relative_to(allowed_path)
return abs_path
except ValueError:
continue
raise ValidationError(f"File path not allowed: {file_path}")
def sanitize_sql_identifier(identifier: str) -> str:
"""Sanitize SQL identifier to prevent injection."""
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
raise ValidationError(f"Invalid SQL identifier: {identifier}")
return identifier
Environment Variable Handling¶
# Never log sensitive data
def safe_log_config(config: Dict[str, Any]) -> Dict[str, Any]:
"""Log configuration with sensitive data masked."""
sensitive_keys = {'password', 'token', 'key', 'secret'}
safe_config = {}
for key, value in config.items():
if any(sensitive in key.lower() for sensitive in sensitive_keys):
safe_config[key] = "***MASKED***"
else:
safe_config[key] = value
return safe_config
Following these Python-specific best practices will help you create robust, maintainable, and secure MCP servers that integrate well with the Python ecosystem.