Building MCP Servers for Production

Build production-ready Model Context Protocol servers that connect AI models to real-world tools and data

30 min read · MCP · Production · Servers · Protocol

The Model Context Protocol (MCP) is an open standard that provides a universal way for AI models to connect to external tools, data sources, and services. Instead of building custom integrations for every model-tool combination, MCP gives you a single protocol that any compatible client can use.

Model Context Protocol (MCP): An open protocol, originally developed by Anthropic, that standardizes how AI applications connect to external data sources and tools. Think of it as USB-C for AI integrations -- one standard interface that any compatible client can plug into.

MCP Architecture

┌─────────────────────────────────────────────────────────────┐
│                      MCP Host                               │
│                (Claude, VS Code, IDE)                        │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                   MCP Client                          │   │
│  │            (manages connections)                       │   │
│  └──────────┬──────────────┬──────────────┬─────────────┘   │
│             │              │              │                  │
└─────────────┼──────────────┼──────────────┼─────────────────┘
              │              │              │
    ┌─────────▼───┐  ┌──────▼──────┐  ┌───▼──────────┐
    │ MCP Server  │  │ MCP Server  │  │ MCP Server   │
    │ (Database)  │  │ (GitHub)    │  │ (Slack)      │
    └──────┬──────┘  └──────┬──────┘  └──────┬───────┘
           │                │                │
    ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼───────┐
    │  PostgreSQL │  │  GitHub API │  │  Slack API   │
    └─────────────┘  └─────────────┘  └──────────────┘

MCP servers expose three core primitives:

  • Resources: Read-only data sources (files, database records, API responses)
  • Tools: Actions the model can invoke (create, update, delete, execute)
  • Prompts: Reusable prompt templates with parameters
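
On the wire, each primitive maps to a JSON-RPC 2.0 method. The sketch below builds one request per primitive so the shape is visible before any server code appears. The `make_request` helper is hypothetical (it is not part of the `mcp` package), and the URI, tool name, and prompt name are the ones this tutorial defines in later sections.

```python
import json
from itertools import count

_ids = count(1)  # JSON-RPC request ids only need to be unique per connection


def make_request(method: str, params: dict) -> dict:
    """Build a JSON-RPC 2.0 request envelope (hypothetical helper)."""
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params}


# One request per primitive; the values are illustrative.
read_resource = make_request("resources/read", {"uri": "db://overview"})
call_tool = make_request(
    "tools/call",
    {"name": "query", "arguments": {"sql": "SELECT 1"}},
)
get_prompt = make_request(
    "prompts/get",
    {"name": "analyze_table", "arguments": {"table_name": "users"}},
)

if __name__ == "__main__":
    for request in (read_resource, call_tool, get_prompt):
        print(json.dumps(request))
```

In practice the SDK builds and parses these envelopes for you; seeing them once makes the handler signatures in the following sections easier to read.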

Building a Custom MCP Server

Let's build a production MCP server that connects AI models to a PostgreSQL database. We will use the official mcp Python package.

Project Setup

bash
# Create project
mkdir mcp-database-server && cd mcp-database-server
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
# Quote the extra so shells such as zsh do not treat the brackets as a glob
pip install "mcp[cli]" asyncpg pydantic python-dotenv

Server Foundation

python
# server.py
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any

import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    Prompt,
    PromptArgument,
    PromptMessage,
    GetPromptResult,
)
from pydantic import BaseModel

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-database")


class DatabaseConfig(BaseModel):
    """Database connection configuration."""
    host: str = "localhost"
    port: int = 5432
    database: str = "myapp"
    user: str = "postgres"
    password: str = ""
    min_connections: int = 2
    max_connections: int = 10


class DatabaseServer:
    """MCP server for PostgreSQL database access."""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.pool: asyncpg.Pool | None = None
        self.server = Server("database-server")
        self._register_handlers()

    async def connect(self):
        """Establish connection pool."""
        self.pool = await asyncpg.create_pool(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            user=self.config.user,
            password=self.config.password,
            min_size=self.config.min_connections,
            max_size=self.config.max_connections,
        )
        logger.info(f"Connected to {self.config.database}")

    async def disconnect(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()
            logger.info("Disconnected from database")

    def _register_handlers(self):
        """Register all MCP handlers."""
        self._register_resources()
        self._register_tools()
        self._register_prompts()

Implementing Resources

Resources expose read-only data. The model can reference them for context without executing any actions.

python
    def _register_resources(self):
        """Register resource handlers."""

        @self.server.list_resources()
        async def list_resources() -> list[Resource]:
            """List available database resources."""
            resources = []

            # List all tables as resources
            async with self.pool.acquire() as conn:
                tables = await conn.fetch(
                    """
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                    ORDER BY table_name
                    """
                )

            for table in tables:
                name = table["table_name"]
                resources.append(
                    Resource(
                        uri=f"db://tables/{name}/schema",
                        name=f"Table: {name} (schema)",
                        description=f"Schema definition for the {name} table",
                        mimeType="application/json",
                    )
                )

            # Add a summary resource
            resources.append(
                Resource(
                    uri="db://overview",
                    name="Database Overview",
                    description="Overview of all tables with column counts",
                    mimeType="application/json",
                )
            )

            return resources

        @self.server.read_resource()
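        async def read_resource(uri) -> str:
            """Read a specific resource."""
            # The SDK may pass a pydantic URL object rather than a plain string,
            # so normalize before comparing or splitting.
            uri = str(uri)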
            if uri == "db://overview":
                return await self._get_database_overview()

            if uri.startswith("db://tables/") and uri.endswith("/schema"):
                table_name = uri.split("/")[3]
                return await self._get_table_schema(table_name)

            raise ValueError(f"Unknown resource: {uri}")

    async def _get_database_overview(self) -> str:
        """Get overview of all tables."""
        async with self.pool.acquire() as conn:
            tables = await conn.fetch(
                """
                SELECT
                    t.table_name,
                    (SELECT count(*) FROM information_schema.columns c
                     WHERE c.table_schema = t.table_schema
                       AND c.table_name = t.table_name) as column_count
                FROM information_schema.tables t
                WHERE t.table_schema = 'public'
                ORDER BY t.table_name
                """
            )

        overview = {
            "database": self.config.database,
            "tables": [
                {"name": t["table_name"], "columns": t["column_count"]}
                for t in tables
            ],
        }
        return json.dumps(overview, indent=2)

    async def _get_table_schema(self, table_name: str) -> str:
        """Get schema for a specific table."""
        async with self.pool.acquire() as conn:
            columns = await conn.fetch(
                """
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = $1 AND table_schema = 'public'
                ORDER BY ordinal_position
                """,
                table_name,
            )

        schema = {
            "table": table_name,
            "columns": [
                {
                    "name": c["column_name"],
                    "type": c["data_type"],
                    "nullable": c["is_nullable"] == "YES",
                    "default": c["column_default"],
                }
                for c in columns
            ],
        }
        return json.dumps(schema, indent=2)
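
The `read_resource` handler above routes by splitting the URI on `/`, which also accepts malformed paths with extra segments. A stricter alternative is to match the whole URI against a pattern. This is a minimal sketch; `parse_resource_uri` and the allowed table-name characters are assumptions, not part of the server above.

```python
import re
from typing import Optional, Tuple

# Assumption: table names are limited to letters, digits, and underscores.
_SCHEMA_URI = re.compile(r"db://tables/([A-Za-z_][A-Za-z0-9_]*)/schema")


def parse_resource_uri(uri: object) -> Tuple[str, Optional[str]]:
    """Return (kind, table_name) for a known URI, or raise ValueError."""
    text = str(uri)  # also accepts URL objects that stringify to the URI
    if text == "db://overview":
        return ("overview", None)
    match = _SCHEMA_URI.fullmatch(text)
    if match:
        return ("schema", match.group(1))
    raise ValueError(f"Unknown resource: {text}")


if __name__ == "__main__":
    print(parse_resource_uri("db://tables/users/schema"))
    print(parse_resource_uri("db://overview"))
```

Because `fullmatch` must consume the entire string, a URI such as `db://tables/users/extra/schema` is rejected instead of being read as a request for the `users` table.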

Implementing Tools

Tools let the model execute actions. This is where you define what the AI can do -- and critically, what guardrails apply.

python
    def _register_tools(self):
        """Register tool handlers."""

        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            return [
                Tool(
                    name="query",
                    description=(
                        "Execute a read-only SQL query against the database. "
                        "Only SELECT statements are allowed."
                    ),
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "sql": {
                                "type": "string",
                                "description": "The SELECT SQL query to execute",
                            },
                            "params": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Query parameters for $1, $2, etc.",
                                "default": [],
                            },
                        },
                        "required": ["sql"],
                    },
                ),
                Tool(
                    name="insert_record",
                    description="Insert a new record into a specified table.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table": {
                                "type": "string",
                                "description": "Target table name",
                            },
                            "data": {
                                "type": "object",
                                "description": "Column-value pairs to insert",
                            },
                        },
                        "required": ["table", "data"],
                    },
                ),
                Tool(
                    name="describe_table",
                    description="Get detailed information about a table including indexes and constraints.",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table": {
                                "type": "string",
                                "description": "Table name to describe",
                            },
                        },
                        "required": ["table"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
            try:
                if name == "query":
                    result = await self._execute_query(
                        arguments["sql"],
                        arguments.get("params", []),
                    )
                elif name == "insert_record":
                    result = await self._insert_record(
                        arguments["table"],
                        arguments["data"],
                    )
                elif name == "describe_table":
                    result = await self._describe_table(arguments["table"])
                else:
                    result = f"Unknown tool: {name}"

                return [TextContent(type="text", text=result)]

            except Exception as e:
                logger.error(f"Tool error ({name}): {e}")
                return [TextContent(type="text", text=f"Error: {str(e)}")]
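
In `call_tool` above, a missing argument surfaces as a bare `KeyError`, so the model sees an unhelpful message such as `Error: 'sql'`. One guardrail is to check required arguments before dispatching. The sketch below assumes a hypothetical `REQUIRED_ARGS` table that mirrors the `required` lists in the tool schemas. It is not an SDK feature.

```python
from typing import Any, Dict, List, Optional

# Hypothetical registry mirroring the "required" lists declared in list_tools().
REQUIRED_ARGS: Dict[str, List[str]] = {
    "query": ["sql"],
    "insert_record": ["table", "data"],
    "describe_table": ["table"],
}


def check_arguments(name: str, arguments: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Validate a tool call and return the (possibly empty) argument dict."""
    if name not in REQUIRED_ARGS:
        raise ValueError(f"Unknown tool: {name}")
    arguments = arguments or {}
    missing = [key for key in REQUIRED_ARGS[name] if key not in arguments]
    if missing:
        raise ValueError(
            f"Tool '{name}' is missing required argument(s): {', '.join(missing)}"
        )
    return arguments


if __name__ == "__main__":
    print(check_arguments("query", {"sql": "SELECT 1"}))
```

Calling `check_arguments(name, arguments)` as the first line of the `try` block would turn both unknown tools and missing fields into descriptive errors that the model can act on.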

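SQL Injection Prevention: Always validate SQL input and pass values as bound parameters rather than interpolating them into the query text. The _execute_query method below rejects anything that does not start with SELECT and blocks a list of dangerous keywords. In production, consider using a query allowlist or an AST-based SQL parser for deeper validation, and connect with a database role that has read-only privileges.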

python
    async def _execute_query(self, sql: str, params: list) -> str:
        """Execute a read-only SQL query with safety checks."""
        import re  # local import keeps this snippet self-contained; move it to the top of server.py

        # Safety: only allow SELECT statements
        normalized = sql.strip().upper()
        if not normalized.startswith("SELECT"):
            raise ValueError("Only SELECT queries are allowed for safety.")

        # Block dangerous keywords even in subqueries. Match whole words so that
        # identifiers such as "updated_at" are not rejected by a substring hit.
        dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE", "EXEC"]
        for keyword in dangerous:
            if re.search(rf"\b{keyword}\b", normalized):
                raise ValueError(f"Query contains disallowed keyword: {keyword}")

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)

        if not rows:
            return "Query returned 0 rows."

        # Format results as a readable table
        columns = list(rows[0].keys())
        result_data = [dict(row) for row in rows[:100]]  # Limit to 100 rows

        return json.dumps(
            {"columns": columns, "rows": result_data, "count": len(rows)},
            indent=2,
            default=str,  # Handle datetime, UUID, etc.
        )

    async def _insert_record(self, table: str, data: dict) -> str:
        """Insert a record with validation."""
        # Validate table name (prevent injection)
        if not table.isidentifier():
            raise ValueError(f"Invalid table name: {table}")

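        columns = list(data.keys())
        if not columns:
            raise ValueError("No column values provided")
        # Column names are interpolated into the SQL text, so validate them too
        for column in columns:
            if not column.isidentifier():
                raise ValueError(f"Invalid column name: {column}")

        placeholders = [f"${i+1}" for i in range(len(columns))]
        values = list(data.values())
        column_list = ", ".join(f'"{column}"' for column in columns)

        sql = f'INSERT INTO "{table}" ({column_list}) VALUES ({", ".join(placeholders)}) RETURNING *'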

        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(sql, *values)

        return json.dumps({"inserted": dict(row)}, indent=2, default=str)

    async def _describe_table(self, table: str) -> str:
        """Get detailed table description including indexes."""
        if not table.isidentifier():
            raise ValueError(f"Invalid table name: {table}")

        async with self.pool.acquire() as conn:
            # Get columns
            columns = await conn.fetch(
                "SELECT * FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position",
                table,
            )
            # Get indexes
            indexes = await conn.fetch(
                "SELECT indexname, indexdef FROM pg_indexes WHERE tablename = $1",
                table,
            )

        return json.dumps(
            {
                "table": table,
                "columns": [
                    {"name": c["column_name"], "type": c["data_type"], "nullable": c["is_nullable"]}
                    for c in columns
                ],
                "indexes": [
                    {"name": i["indexname"], "definition": i["indexdef"]}
                    for i in indexes
                ],
            },
            indent=2,
        )
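
A keyword check on the raw query text is deliberately blunt. The sketch below shows a slightly stricter standalone validator: it matches keywords as whole words, rejects multiple statements, and also blocks `SELECT ... INTO`. The function name `validate_select` and the extended keyword list are assumptions. It scans string literals as well, so a harmless query containing the literal `'drop'` is rejected; that trade-off favors safety over convenience.

```python
import re

# Keywords rejected anywhere in the statement, matched as whole words so that
# column names such as "updated_at" do not trip the check.
FORBIDDEN_KEYWORDS = (
    "DROP", "DELETE", "UPDATE", "INSERT", "ALTER",
    "TRUNCATE", "EXEC", "CREATE", "GRANT", "INTO",
)
_FORBIDDEN_RE = re.compile(
    r"\b(" + "|".join(FORBIDDEN_KEYWORDS) + r")\b", re.IGNORECASE
)


def validate_select(sql: str) -> str:
    """Return the cleaned statement, or raise ValueError if it looks unsafe."""
    statement = sql.strip().rstrip(";").strip()  # tolerate trailing semicolons
    if not re.match(r"SELECT\b", statement, re.IGNORECASE):
        raise ValueError("Only SELECT queries are allowed for safety.")
    if ";" in statement:
        raise ValueError("Multiple statements are not allowed.")
    match = _FORBIDDEN_RE.search(statement)
    if match:
        raise ValueError(
            f"Query contains disallowed keyword: {match.group(1).upper()}"
        )
    return statement


if __name__ == "__main__":
    print(validate_select("SELECT id, updated_at FROM users;"))
```

Text-level checks like this are a first line of defense only. Running queries under a read-only database role remains the more reliable control, because it does not depend on parsing SQL correctly.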

Implementing Prompts

Prompts are reusable templates that a client can fetch with arguments, typically when a user selects one. They standardize common queries.

python
    def _register_prompts(self):
        """Register prompt templates."""

        @self.server.list_prompts()
        async def list_prompts() -> list[Prompt]:
            return [
                Prompt(
                    name="analyze_table",
                    description="Generate an analysis prompt for a database table",
                    arguments=[
                        PromptArgument(
                            name="table_name",
                            description="Name of the table to analyze",
                            required=True,
                        ),
                        PromptArgument(
                            name="focus",
                            description="Analysis focus (e.g., 'data quality', 'performance')",
                            required=False,
                        ),
                    ],
                ),
                Prompt(
                    name="generate_report_query",
                    description="Generate a SQL query for a business report",
                    arguments=[
                        PromptArgument(
                            name="report_type",
                            description="Type of report (e.g., 'daily_summary', 'user_growth')",
                            required=True,
                        ),
                    ],
                ),
            ]

        @self.server.get_prompt()
        async def get_prompt(name: str, arguments: dict[str, str] | None) -> GetPromptResult:
            arguments = arguments or {}  # clients may omit arguments entirely
            if name == "analyze_table":
                table_name = arguments.get("table_name", "unknown")
                focus = arguments.get("focus", "general overview")
                schema = await self._get_table_schema(table_name)

                return GetPromptResult(
                    description=f"Analysis prompt for table: {table_name}",
                    messages=[
                        PromptMessage(
                            role="user",
                            content=TextContent(
                                type="text",
                                text=(
                                    f"Analyze the '{table_name}' table with focus on: {focus}\n\n"
                                    f"Table Schema:\n{schema}\n\n"
                                    f"Use the 'query' tool to examine the data. Provide insights on "
                                    f"data distribution, anomalies, and recommendations."
                                ),
                            ),
                        )
                    ],
                )

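            if name == "generate_report_query":
                # Minimal implementation so the prompt advertised in list_prompts() resolves
                report_type = (arguments or {}).get("report_type", "daily_summary")
                return GetPromptResult(
                    description=f"Report query prompt: {report_type}",
                    messages=[
                        PromptMessage(
                            role="user",
                            content=TextContent(
                                type="text",
                                text=(
                                    f"Write a single SELECT query for a '{report_type}' report. "
                                    f"Read the db://overview resource first to find the relevant tables."
                                ),
                            ),
                        )
                    ],
                )

            raise ValueError(f"Unknown prompt: {name}")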

Running the Server

python
# Entry point at the bottom of server.py
async def main():
    """Run the MCP server."""
    import os
    from dotenv import load_dotenv

    load_dotenv()

    config = DatabaseConfig(
        host=os.getenv("DB_HOST", "localhost"),
        port=int(os.getenv("DB_PORT", "5432")),
        database=os.getenv("DB_NAME", "myapp"),
        user=os.getenv("DB_USER", "postgres"),
        password=os.getenv("DB_PASSWORD", ""),
    )

    db_server = DatabaseServer(config)
    await db_server.connect()

    try:
        async with stdio_server() as (read_stream, write_stream):
            await db_server.server.run(
                read_stream,
                write_stream,
                db_server.server.create_initialization_options(),
            )
    finally:
        await db_server.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
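
With the stdio transport, each JSON-RPC message travels as a single line of JSON on stdin or stdout, which is why the server must never print anything else to stdout (logging goes to stderr). The sketch below illustrates that framing with in-memory streams. `write_message` and `read_messages` are hypothetical helpers, not SDK functions.

```python
import io
import json


def write_message(stream, message: dict) -> None:
    """Write one message as a single newline-terminated JSON line."""
    # json.dumps escapes newlines inside strings, so the frame stays on one line
    stream.write(json.dumps(message, separators=(",", ":")) + "\n")
    stream.flush()


def read_messages(stream):
    """Yield one decoded message per non-empty line."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


if __name__ == "__main__":
    buffer = io.StringIO()
    write_message(buffer, {"jsonrpc": "2.0", "id": 1, "method": "ping"})
    write_message(buffer, {"jsonrpc": "2.0", "method": "notifications/initialized"})
    buffer.seek(0)
    for message in read_messages(buffer):
        print(message)
```

A stray `print()` in a handler would inject a non-JSON line into this stream and break the client's parser, so keep diagnostics on the `logging` module, whose default handler writes to stderr.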

Testing MCP Servers

Use the MCP Inspector for interactive testing, and write automated tests with pytest:

bash
# Interactive testing with the MCP Inspector
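mcp dev server.py
# If `mcp dev` cannot load this low-level Server object, launch the Inspector directly:
# npx @modelcontextprotocol/inspector python server.py

python
# test_server.py
# Requires: pip install pytest pytest-asyncio
import json

import pytest
import pytest_asyncio

from server import DatabaseConfig, DatabaseServer


@pytest_asyncio.fixture
async def db_server():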
    """Create a test database server."""
    config = DatabaseConfig(
        host="localhost",
        database="test_db",
        user="test_user",
        password="test_pass",
    )
    server = DatabaseServer(config)
    await server.connect()
    yield server
    await server.disconnect()


@pytest.mark.asyncio
async def test_query_rejects_non_select(db_server):
    """Verify that non-SELECT queries are rejected."""
    with pytest.raises(ValueError, match="Only SELECT"):
        await db_server._execute_query("DROP TABLE users", [])


@pytest.mark.asyncio
async def test_query_blocks_dangerous_keywords(db_server):
    """Verify dangerous keywords are blocked even in subqueries."""
    with pytest.raises(ValueError, match="disallowed keyword"):
        await db_server._execute_query(
            "SELECT * FROM users; DELETE FROM users", []
        )


@pytest.mark.asyncio
async def test_insert_validates_table_name(db_server):
    """Verify table name injection is prevented."""
    with pytest.raises(ValueError, match="Invalid table name"):
        await db_server._insert_record(
            "users; DROP TABLE users--",
            {"name": "attacker"},
        )


@pytest.mark.asyncio
async def test_describe_table_returns_schema(db_server):
    """Verify table description returns valid JSON."""
    result = await db_server._describe_table("users")
    data = json.loads(result)
    assert "columns" in data
    assert "indexes" in data
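
The tests above need a running PostgreSQL instance. For fast unit tests, the connection pool can be replaced with a fake that returns canned rows. The sketch below shows the idea with stand-in classes. `FakePool`, `FakeConnection`, and `list_table_names` are hypothetical and only mimic the `async with pool.acquire() as conn` pattern used by the handlers.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stands in for a database connection and returns canned rows."""

    def __init__(self, rows):
        self.rows = rows
        self.queries = []  # records (sql, params) for later assertions

    async def fetch(self, sql, *params):
        self.queries.append((sql, params))
        return self.rows


class FakePool:
    """Supports `async with pool.acquire() as conn` like the real pool."""

    def __init__(self, rows):
        self.conn = FakeConnection(rows)

    @asynccontextmanager
    async def acquire(self):
        yield self.conn


async def list_table_names(pool):
    """Hypothetical function under test, shaped like the handlers above."""
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT table_name FROM information_schema.tables")
    return [row["table_name"] for row in rows]


if __name__ == "__main__":
    pool = FakePool([{"table_name": "users"}, {"table_name": "orders"}])
    print(asyncio.run(list_table_names(pool)))
```

The same approach should work for the real class: assigning `server.pool = FakePool(rows)` on a `DatabaseServer` instance lets methods such as `_get_table_schema` run without calling `connect()`, since they only index rows by column name.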

Authentication and Security

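Production MCP servers need layered security. The helpers below add HMAC request verification and per-key rate limiting; they only take effect once your transport layer (for example, HTTP middleware in front of a remote server) calls them on every request: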

python
import hmac
import hashlib
import time
from functools import wraps

class AuthenticatedServer(DatabaseServer):
    """MCP server with authentication middleware."""

    def __init__(self, config: DatabaseConfig, api_keys: dict[str, str]):
        super().__init__(config)
        self.api_keys = api_keys  # {key_id: secret}
        self.rate_limits: dict[str, list[float]] = {}  # {key_id: [timestamps]}

    def verify_request(self, key_id: str, signature: str, body: str) -> bool:
        """Verify HMAC-signed request."""
        if key_id not in self.api_keys:
            return False

        secret = self.api_keys[key_id]
        expected = hmac.new(
            secret.encode(), body.encode(), hashlib.sha256
        ).hexdigest()

        return hmac.compare_digest(signature, expected)

    def check_rate_limit(self, key_id: str, max_per_minute: int = 60) -> bool:
        """Simple sliding-window rate limiter."""
        now = time.time()
        window = now - 60

        if key_id not in self.rate_limits:
            self.rate_limits[key_id] = []

        # Remove old entries
        self.rate_limits[key_id] = [
            t for t in self.rate_limits[key_id] if t > window
        ]

        if len(self.rate_limits[key_id]) >= max_per_minute:
            return False

        self.rate_limits[key_id].append(now)
        return True
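
`verify_request` above authenticates the body only, so a captured request could be replayed later. A common mitigation is to sign a timestamp together with the body and reject stale requests. This is a minimal sketch: the `timestamp.body` message format, the five-minute window, and the function names are assumptions rather than anything MCP prescribes.

```python
import hashlib
import hmac
import time


def sign(secret: str, body: str, timestamp: int) -> str:
    """Client side: sign the timestamp and body together."""
    message = f"{timestamp}.{body}".encode()
    return hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()


def verify(secret: str, body: str, timestamp: int, signature: str,
           max_age_seconds: int = 300, now: float = None) -> bool:
    """Server side: reject stale timestamps, then compare in constant time."""
    now = time.time() if now is None else now
    if abs(now - timestamp) > max_age_seconds:
        return False
    return hmac.compare_digest(signature, sign(secret, body, timestamp))


if __name__ == "__main__":
    ts = int(time.time())
    sig = sign("s3cret", '{"tool": "query"}', ts)
    print(verify("s3cret", '{"tool": "query"}', ts, sig))
```

The `now` parameter exists only to make the check testable with a fixed clock; production callers would leave it unset.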

Principle of Least Privilege: Your MCP server should only expose the minimum necessary capabilities. A database MCP server for analytics should not allow INSERT or DELETE operations. Segment servers by access level.
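
One way to apply this is to decide which tools exist per access level before anything is advertised to the client. The sketch below uses the three tool names from this tutorial. The level names `analytics` and `ingest`, and both helper functions, are hypothetical.

```python
ALL_TOOLS = ("query", "describe_table", "insert_record")

# Hypothetical access levels; unknown levels get no tools at all.
TOOLS_BY_LEVEL = {
    "analytics": frozenset({"query", "describe_table"}),
    "ingest": frozenset({"query", "describe_table", "insert_record"}),
}


def allowed_tools(level: str) -> list:
    """Return the tool names to advertise for this access level."""
    allowed = TOOLS_BY_LEVEL.get(level, frozenset())
    return [name for name in ALL_TOOLS if name in allowed]


def ensure_allowed(level: str, tool_name: str) -> None:
    """Raise PermissionError if the tool is not available at this level."""
    if tool_name not in TOOLS_BY_LEVEL.get(level, frozenset()):
        raise PermissionError(
            f"Tool '{tool_name}' is not available at level '{level}'"
        )


if __name__ == "__main__":
    print(allowed_tools("analytics"))
    ensure_allowed("ingest", "insert_record")
```

Filtering the list returned by `list_tools` and calling `ensure_allowed` at the top of `call_tool` covers both sides: the model never sees tools it cannot use, and a forged call is still refused.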

Deployment Strategies

Docker Deployment

dockerfile
FROM python:3.12-slim

WORKDIR /app
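# requirements.txt is assumed to list the packages installed during project setup:
# mcp[cli], asyncpg, pydantic, python-dotenv
COPY requirements.txt .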
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# MCP servers communicate via stdio
CMD ["python", "server.py"]

Configuration for Claude Desktop

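The host launches the command directly rather than through a shell, so shell-style variable references inside args are not expanded. Put the secret in the env block and let Docker forward it with a bare -e DB_PASSWORD:

json
{
  "mcpServers": {
    "database": {
      "command": "docker",
      "args": [
        "run", "-i", "--rm",
        "-e", "DB_HOST=host.docker.internal",
        "-e", "DB_NAME=myapp",
        "-e", "DB_USER=readonly_user",
        "-e", "DB_PASSWORD",
        "my-mcp-database-server"
      ],
      "env": {
        "DB_PASSWORD": "<your-database-password>"
      }
    }
  }
}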

Remote Deployment with SSE Transport

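For shared, remote MCP servers, use the SSE (Server-Sent Events) transport. Newer revisions of the MCP specification favor a Streamable HTTP transport for new deployments, so check which transports your clients support before committing to one: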

python
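# remote_server.py
import os
from contextlib import asynccontextmanager
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Route, Mount
from server import DatabaseConfig, DatabaseServer

# Build the server from the same environment variables that main() reads in server.py
db_server = DatabaseServer(DatabaseConfig(
    host=os.getenv("DB_HOST", "localhost"),
    port=int(os.getenv("DB_PORT", "5432")),
    database=os.getenv("DB_NAME", "myapp"),
    user=os.getenv("DB_USER", "postgres"),
    password=os.getenv("DB_PASSWORD", ""),
))
transport = SseServerTransport("/messages/")

@asynccontextmanager
async def lifespan(app):
    await db_server.connect()  # open the pool on startup
    try:
        yield
    finally:
        await db_server.disconnect()  # close it on shutdown

async def handle_sse(request):
    async with transport.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await db_server.server.run(
            streams[0], streams[1],
            db_server.server.create_initialization_options(),
        )
    return Response()  # Starlette expects the endpoint to return a response

app = Starlette(routes=[
    Route("/sse", endpoint=handle_sse),
    Mount("/messages/", app=transport.handle_post_message),
], lifespan=lifespan)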

# Run with: uvicorn remote_server:app --host 0.0.0.0 --port 8000
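
For debugging a remote server it helps to know what the `/sse` stream looks like. With this transport the server first sends an `endpoint` event telling the client where to POST its messages, and later responses arrive as `message` events. The sketch below parses such a stream using the standard Server-Sent Events rules. `parse_sse` is a hypothetical helper and the session id in the sample is illustrative.

```python
import json


def parse_sse(text: str) -> list:
    """Parse SSE text into (event_type, data) tuples."""
    events = []
    event_type, data_lines = "message", []
    for line in text.splitlines():
        if line == "":  # a blank line dispatches the pending event
            if data_lines:
                events.append((event_type, "\n".join(data_lines)))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
    return events


if __name__ == "__main__":
    sample = (
        "event: endpoint\n"
        "data: /messages/?session_id=abc\n"
        "\n"
        "event: message\n"
        'data: {"jsonrpc":"2.0","id":1,"result":{}}\n'
        "\n"
    )
    for kind, data in parse_sse(sample):
        print(kind, data)
```

Running `curl -N` against the `/sse` route and comparing the output with this format is a quick way to confirm that a proxy or load balancer is not buffering the stream.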

Key Takeaways

  1. MCP standardizes how AI models connect to tools and data -- build once, use everywhere
  2. Three primitives: Resources (read data), Tools (execute actions), Prompts (reusable templates)
  3. Security is paramount: Validate all inputs, restrict operations, use authentication, and apply rate limiting
  4. Test thoroughly with both the MCP Inspector for interactive testing and pytest for automated validation
  5. Deploy flexibly: Use stdio for local desktop integration or SSE transport for shared remote servers
