Building MCP Servers for Production
The Model Context Protocol (MCP) is an open standard that provides a universal way for AI models to connect to external tools, data sources, and services. Instead of building custom integrations for every model-tool combination, MCP gives you a single protocol that any compatible client can use.
Model Context Protocol (MCP): An open protocol, originally developed by Anthropic, that standardizes how AI applications connect to external data sources and tools. Think of it as a USB-C for AI integrations -- one standard interface that works everywhere.
MCP Architecture
┌─────────────────────────────────────────────────────────────┐
│ MCP Host │
│ (Claude, VS Code, IDE) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ MCP Client │ │
│ │ (manages connections) │ │
│ └──────────┬──────────────┬──────────────┬─────────────┘ │
│ │ │ │ │
└─────────────┼──────────────┼──────────────┼─────────────────┘
│ │ │
┌─────────▼───┐ ┌──────▼──────┐ ┌───▼──────────┐
│ MCP Server │ │ MCP Server │ │ MCP Server │
│ (Database) │ │ (GitHub) │ │ (Slack) │
└──────┬──────┘ └──────┬──────┘ └──────┬───────┘
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼───────┐
│ PostgreSQL │ │ GitHub API │ │ Slack API │
└─────────────┘ └─────────────┘ └──────────────┘
MCP servers expose three core primitives:
- Resources: Read-only data sources (files, database records, API responses)
- Tools: Actions the model can invoke (create, update, delete, execute)
- Prompts: Reusable prompt templates with parameters
Building a Custom MCP Server
Let's build a production MCP server that connects AI models to a PostgreSQL database. We will use the official MCP Python SDK (the `mcp` package).
Project Setup
# Create project
mkdir mcp-database-server && cd mcp-database-server
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
# Install dependencies.
# Quote "mcp[cli]" -- unquoted square brackets are glob patterns in zsh
# and the command fails with "no matches found".
pip install "mcp[cli]" asyncpg pydantic python-dotenv
Server Foundation
# server.py
import asyncio
import json
import logging
import re
from contextlib import asynccontextmanager
from typing import Any

import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    Prompt,
    PromptArgument,
    PromptMessage,
    GetPromptResult,
)
from pydantic import BaseModel
# Configure logging. basicConfig writes to stderr by default, which matters
# here: an MCP stdio server must keep stdout clean for protocol traffic.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-database")
class DatabaseConfig(BaseModel):
    """Database connection configuration.

    Defaults target a local PostgreSQL instance; ``main()`` overrides them
    from environment variables.
    """

    host: str = "localhost"
    port: int = 5432
    database: str = "myapp"
    user: str = "postgres"
    password: str = ""  # empty default; supply DB_PASSWORD in production
    min_connections: int = 2   # pool lower bound kept warm
    max_connections: int = 10  # pool upper bound
class DatabaseServer:
    """MCP server for PostgreSQL database access."""

    def __init__(self, config: DatabaseConfig):
        """Create the server and register all MCP handlers.

        The connection pool is NOT opened here; callers must await
        ``connect()`` before serving requests.
        """
        self.config = config
        self.pool: asyncpg.Pool | None = None  # created lazily in connect()
        self.server = Server("database-server")
        self._register_handlers()
async def connect(self):
    """Establish the asyncpg connection pool.

    Idempotent: a second call while already connected is a no-op. The
    original version unconditionally created a new pool, silently leaking
    the previous one's connections if called twice.
    """
    if self.pool is not None:
        return
    self.pool = await asyncpg.create_pool(
        host=self.config.host,
        port=self.config.port,
        database=self.config.database,
        user=self.config.user,
        password=self.config.password,
        min_size=self.config.min_connections,
        max_size=self.config.max_connections,
    )
    logger.info(f"Connected to {self.config.database}")
async def disconnect(self):
    """Close the connection pool, if one was ever opened."""
    if not self.pool:
        return  # never connected -- nothing to release
    await self.pool.close()
    logger.info("Disconnected from database")
def _register_handlers(self):
    """Wire every MCP handler group onto the underlying Server."""
    for register in (
        self._register_resources,
        self._register_tools,
        self._register_prompts,
    ):
        register()
Implementing Resources
Resources expose read-only data. The model can reference them for context without executing any actions.
def _register_resources(self):
    """Register resource handlers.

    Resources are the read-only surface: one schema resource per public
    table, plus a whole-database overview. Handlers are registered as
    closures on ``self.server`` via the MCP decorator API.
    """

    @self.server.list_resources()
    async def list_resources() -> list[Resource]:
        """List available database resources."""
        resources = []
        # List all tables as resources
        async with self.pool.acquire() as conn:
            tables = await conn.fetch(
                """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
                ORDER BY table_name
                """
            )
        for table in tables:
            name = table["table_name"]
            resources.append(
                Resource(
                    uri=f"db://tables/{name}/schema",
                    name=f"Table: {name} (schema)",
                    description=f"Schema definition for the {name} table",
                    mimeType="application/json",
                )
            )
        # Add a summary resource
        resources.append(
            Resource(
                uri="db://overview",
                name="Database Overview",
                description="Overview of all tables with row counts",
                mimeType="application/json",
            )
        )
        return resources

    @self.server.read_resource()
    async def read_resource(uri: str) -> str:
        """Read a specific resource by its db:// URI."""
        if uri == "db://overview":
            return await self._get_database_overview()
        # "db://tables/<name>/schema".split("/") yields
        # ["db:", "", "tables", "<name>", "schema"], so index 3 is the table.
        if uri.startswith("db://tables/") and uri.endswith("/schema"):
            table_name = uri.split("/")[3]
            return await self._get_table_schema(table_name)
        raise ValueError(f"Unknown resource: {uri}")
async def _get_database_overview(self) -> str:
    """Return a JSON summary of all public tables with their column counts."""
    query = """
        SELECT
            t.table_name,
            (SELECT count(*) FROM information_schema.columns c
             WHERE c.table_name = t.table_name) as column_count
        FROM information_schema.tables t
        WHERE t.table_schema = 'public'
        ORDER BY t.table_name
        """
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(query)
    table_entries = [
        {"name": row["table_name"], "columns": row["column_count"]}
        for row in rows
    ]
    return json.dumps(
        {"database": self.config.database, "tables": table_entries},
        indent=2,
    )
async def _get_table_schema(self, table_name: str) -> str:
    """Return the column definitions of *table_name* as pretty-printed JSON."""
    query = """
        SELECT column_name, data_type, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_name = $1 AND table_schema = 'public'
        ORDER BY ordinal_position
        """
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(query, table_name)
    column_specs = []
    for row in rows:
        column_specs.append(
            {
                "name": row["column_name"],
                "type": row["data_type"],
                "nullable": row["is_nullable"] == "YES",
                "default": row["column_default"],
            }
        )
    return json.dumps({"table": table_name, "columns": column_specs}, indent=2)
Implementing Tools
Tools let the model execute actions. This is where you define what the AI can do -- and critically, what guardrails apply.
def _register_tools(self):
    """Register tool handlers.

    Tools are the action surface of the server: a read-only ``query`` tool,
    an ``insert_record`` tool, and a ``describe_table`` introspection tool.
    Each tool advertises a JSON Schema for its arguments.
    """

    @self.server.list_tools()
    async def list_tools() -> list[Tool]:
        """Advertise the available tools and their input schemas."""
        return [
            Tool(
                name="query",
                description=(
                    "Execute a read-only SQL query against the database. "
                    "Only SELECT statements are allowed."
                ),
                inputSchema={
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "The SELECT SQL query to execute",
                        },
                        "params": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Query parameters for $1, $2, etc.",
                            "default": [],
                        },
                    },
                    "required": ["sql"],
                },
            ),
            Tool(
                name="insert_record",
                description="Insert a new record into a specified table.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "table": {
                            "type": "string",
                            "description": "Target table name",
                        },
                        "data": {
                            "type": "object",
                            "description": "Column-value pairs to insert",
                        },
                    },
                    "required": ["table", "data"],
                },
            ),
            Tool(
                name="describe_table",
                description="Get detailed information about a table including indexes and constraints.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "table": {
                            "type": "string",
                            "description": "Table name to describe",
                        },
                    },
                    "required": ["table"],
                },
            ),
        ]

    @self.server.call_tool()
    async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
        """Dispatch a tool invocation to the matching implementation.

        Failures are returned as text content instead of raised, so the
        client receives a readable error message rather than a protocol
        fault.
        """
        try:
            if name == "query":
                result = await self._execute_query(
                    arguments["sql"],
                    arguments.get("params", []),
                )
            elif name == "insert_record":
                result = await self._insert_record(
                    arguments["table"],
                    arguments["data"],
                )
            elif name == "describe_table":
                result = await self._describe_table(arguments["table"])
            else:
                result = f"Unknown tool: {name}"
            return [TextContent(type="text", text=result)]
        except Exception as e:
            # Broad catch is deliberate at this boundary: every tool error
            # is logged and reported back to the model as text.
            logger.error(f"Tool error ({name}): {e}")
            return [TextContent(type="text", text=f"Error: {str(e)}")]
SQL Injection Prevention: Always validate and sanitize SQL input. The `_execute_query` method below enforces a SELECT-only policy and rejects queries containing write or DDL keywords.
_execute_query async def _execute_query(self, sql: str, params: list) -> str:
"""Execute a read-only SQL query with safety checks."""
# Safety: only allow SELECT statements
normalized = sql.strip().upper()
if not normalized.startswith("SELECT"):
raise ValueError("Only SELECT queries are allowed for safety.")
# Block dangerous keywords even in subqueries
dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE", "EXEC"]
for keyword in dangerous:
if keyword in normalized:
raise ValueError(f"Query contains disallowed keyword: {keyword}")
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
if not rows:
return "Query returned 0 rows."
# Format results as a readable table
columns = list(rows[0].keys())
result_data = [dict(row) for row in rows[:100]] # Limit to 100 rows
return json.dumps(
{"columns": columns, "rows": result_data, "count": len(rows)},
indent=2,
default=str, # Handle datetime, UUID, etc.
)
async def _insert_record(self, table: str, data: dict) -> str:
"""Insert a record with validation."""
# Validate table name (prevent injection)
if not table.isidentifier():
raise ValueError(f"Invalid table name: {table}")
columns = list(data.keys())
placeholders = [f"${i+1}" for i in range(len(columns))]
values = list(data.values())
sql = f'INSERT INTO "{table}" ({", ".join(columns)}) VALUES ({", ".join(placeholders)}) RETURNING *'
async with self.pool.acquire() as conn:
row = await conn.fetchrow(sql, *values)
return json.dumps({"inserted": dict(row)}, indent=2, default=str)
async def _describe_table(self, table: str) -> str:
"""Get detailed table description including indexes."""
if not table.isidentifier():
raise ValueError(f"Invalid table name: {table}")
async with self.pool.acquire() as conn:
# Get columns
columns = await conn.fetch(
"SELECT * FROM information_schema.columns WHERE table_name = $1 ORDER BY ordinal_position",
table,
)
# Get indexes
indexes = await conn.fetch(
"SELECT indexname, indexdef FROM pg_indexes WHERE tablename = $1",
table,
)
return json.dumps(
{
"table": table,
"columns": [
{"name": c["column_name"], "type": c["data_type"], "nullable": c["is_nullable"]}
for c in columns
],
"indexes": [
{"name": i["indexname"], "definition": i["indexdef"]}
for i in indexes
],
},
indent=2,
)
Implementing Prompts
Prompts are reusable templates the model can invoke with parameters. They standardize common queries.
def _register_prompts(self):
    """Register prompt templates.

    Two fixes over the original: (1) ``get_prompt`` no longer crashes when
    ``arguments`` is None (its signature allows None, but ``.get`` was
    called on it unconditionally); (2) the advertised
    ``generate_report_query`` prompt is now actually handled -- previously
    it was listed but ``get_prompt`` raised "Unknown prompt" for it.
    """

    @self.server.list_prompts()
    async def list_prompts() -> list[Prompt]:
        """Advertise the available prompt templates."""
        return [
            Prompt(
                name="analyze_table",
                description="Generate an analysis prompt for a database table",
                arguments=[
                    PromptArgument(
                        name="table_name",
                        description="Name of the table to analyze",
                        required=True,
                    ),
                    PromptArgument(
                        name="focus",
                        description="Analysis focus (e.g., 'data quality', 'performance')",
                        required=False,
                    ),
                ],
            ),
            Prompt(
                name="generate_report_query",
                description="Generate a SQL query for a business report",
                arguments=[
                    PromptArgument(
                        name="report_type",
                        description="Type of report (e.g., 'daily_summary', 'user_growth')",
                        required=True,
                    ),
                ],
            ),
        ]

    @self.server.get_prompt()
    async def get_prompt(name: str, arguments: dict[str, str] | None) -> GetPromptResult:
        """Materialize a prompt template with the supplied arguments."""
        args = arguments or {}  # arguments may legitimately be None
        if name == "analyze_table":
            table_name = args.get("table_name", "unknown")
            focus = args.get("focus", "general overview")
            schema = await self._get_table_schema(table_name)
            return GetPromptResult(
                description=f"Analysis prompt for table: {table_name}",
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(
                            type="text",
                            text=(
                                f"Analyze the '{table_name}' table with focus on: {focus}\n\n"
                                f"Table Schema:\n{schema}\n\n"
                                f"Use the 'query' tool to examine the data. Provide insights on "
                                f"data distribution, anomalies, and recommendations."
                            ),
                        ),
                    )
                ],
            )
        if name == "generate_report_query":
            report_type = args.get("report_type", "daily_summary")
            overview = await self._get_database_overview()
            return GetPromptResult(
                description=f"SQL generation prompt for report: {report_type}",
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(
                            type="text",
                            text=(
                                f"Write a SQL query for a '{report_type}' business report.\n\n"
                                f"Database Overview:\n{overview}\n\n"
                                f"Use only the tables listed above, and explain what the "
                                f"query returns."
                            ),
                        ),
                    )
                ],
            )
        raise ValueError(f"Unknown prompt: {name}")
Running the Server
# Entry point at the bottom of server.py
async def main():
    """Load configuration from the environment and serve MCP over stdio."""
    import os
    from dotenv import load_dotenv

    load_dotenv()
    env = os.getenv
    config = DatabaseConfig(
        host=env("DB_HOST", "localhost"),
        port=int(env("DB_PORT", "5432")),
        database=env("DB_NAME", "myapp"),
        user=env("DB_USER", "postgres"),
        password=env("DB_PASSWORD", ""),
    )
    db_server = DatabaseServer(config)
    await db_server.connect()
    try:
        async with stdio_server() as (read_stream, write_stream):
            await db_server.server.run(
                read_stream,
                write_stream,
                db_server.server.create_initialization_options(),
            )
    finally:
        # Always release the pool, even if the stdio session dies.
        await db_server.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Testing MCP Servers
Use the MCP Inspector for interactive testing, and write automated tests with pytest:
# Interactive testing with the MCP Inspector
mcp dev server.py
# test_server.py
import pytest
import asyncio
import json
@pytest.fixture
async def db_server():
    """Create a test database server.

    Requires a local PostgreSQL reachable with these credentials, and
    assumes DatabaseConfig/DatabaseServer are importable in this module.
    NOTE(review): with pytest-asyncio in strict mode an async generator
    fixture must be declared with ``@pytest_asyncio.fixture``; plain
    ``@pytest.fixture`` only works in auto mode -- confirm the project's
    asyncio_mode setting.
    """
    config = DatabaseConfig(
        host="localhost",
        database="test_db",
        user="test_user",
        password="test_pass",
    )
    server = DatabaseServer(config)
    await server.connect()
    yield server                 # hand the connected server to the test
    await server.disconnect()    # teardown: always close the pool
@pytest.mark.asyncio
async def test_query_rejects_non_select(db_server):
    """Verify that non-SELECT queries are rejected."""
    # The guard must raise before any SQL reaches the database.
    with pytest.raises(ValueError, match="Only SELECT"):
        await db_server._execute_query("DROP TABLE users", [])
@pytest.mark.asyncio
async def test_query_blocks_dangerous_keywords(db_server):
    """Verify dangerous keywords are blocked even in subqueries."""
    # Statement chaining after a legitimate SELECT must still be caught.
    with pytest.raises(ValueError, match="disallowed keyword"):
        await db_server._execute_query(
            "SELECT * FROM users; DELETE FROM users", []
        )
@pytest.mark.asyncio
async def test_insert_validates_table_name(db_server):
    """Verify table name injection is prevented."""
    # A classic injection payload in the table name must be rejected.
    with pytest.raises(ValueError, match="Invalid table name"):
        await db_server._insert_record(
            "users; DROP TABLE users--",
            {"name": "attacker"},
        )
@pytest.mark.asyncio
async def test_describe_table_returns_schema(db_server):
    """Verify table description returns valid JSON.

    Assumes the test database contains a ``users`` table.
    """
    result = await db_server._describe_table("users")
    data = json.loads(result)  # raises if the payload is not valid JSON
    assert "columns" in data
    assert "indexes" in data
Authentication and Security
Production MCP servers need robust security layers:
import hmac
import hashlib
import time
from functools import wraps
class AuthenticatedServer(DatabaseServer):
    """MCP server with authentication middleware.

    Adds HMAC request verification and a per-key sliding-window rate
    limiter on top of DatabaseServer.
    """

    def __init__(self, config: DatabaseConfig, api_keys: dict[str, str]):
        """Wrap the database server with API-key auth and rate limiting."""
        super().__init__(config)
        self.api_keys = api_keys                       # {key_id: secret}
        self.rate_limits: dict[str, list[float]] = {}  # {key_id: [timestamps]}

    def verify_request(self, key_id: str, signature: str, body: str) -> bool:
        """Verify an HMAC-SHA256 signed request body."""
        secret = self.api_keys.get(key_id)
        if secret is None:
            return False  # unknown key id
        digest = hmac.new(
            secret.encode(), body.encode(), hashlib.sha256
        ).hexdigest()
        # compare_digest keeps the comparison constant-time.
        return hmac.compare_digest(signature, digest)

    def check_rate_limit(self, key_id: str, max_per_minute: int = 60) -> bool:
        """Simple sliding-window rate limiter over the last 60 seconds."""
        now = time.time()
        cutoff = now - 60
        # Keep only calls that are still inside the window.
        recent = [t for t in self.rate_limits.get(key_id, []) if t > cutoff]
        if len(recent) >= max_per_minute:
            self.rate_limits[key_id] = recent
            return False
        recent.append(now)
        self.rate_limits[key_id] = recent
        return True
Principle of Least Privilege: Your MCP server should only expose the minimum necessary capabilities. A database MCP server for analytics should not allow INSERT or DELETE operations. Segment servers by access level.
Deployment Strategies
Docker Deployment
FROM python:3.12-slim
WORKDIR /app
# Copy the dependency list first so this layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# MCP servers communicate via stdio
CMD ["python", "server.py"]
Configuration for Claude Desktop
{
"mcpServers": {
"database": {
"command": "docker",
"args": [
"run", "-i", "--rm",
"-e", "DB_HOST=host.docker.internal",
"-e", "DB_NAME=myapp",
"-e", "DB_USER=readonly_user",
"-e", "DB_PASSWORD=${DB_PASSWORD}",
"my-mcp-database-server"
]
}
}
}
Remote Deployment with SSE Transport
For shared, remote MCP servers, use the SSE (Server-Sent Events) transport:
# remote_server.py
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount

# Client->server messages are POSTed to /messages/; the server->client
# stream goes out over the /sse endpoint below.
transport = SseServerTransport("/messages/")

async def handle_sse(request):
    """Bridge one SSE connection into the MCP server's run loop.

    NOTE(review): `db_server` is not defined in this snippet -- it must be
    constructed (and connected) at module import time, as server.py's
    main() does. `request._send` is a private Starlette attribute used by
    the MCP SSE examples; confirm it matches the installed SDK version.
    """
    async with transport.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await db_server.server.run(
            streams[0], streams[1],
            db_server.server.create_initialization_options(),
        )

app = Starlette(routes=[
    Route("/sse", endpoint=handle_sse),
    Mount("/messages/", app=transport.handle_post_message),
])
# Run with: uvicorn remote_server:app --host 0.0.0.0 --port 8000
Key Takeaways
- MCP standardizes how AI models connect to tools and data -- build once, use everywhere
- Three primitives: Resources (read data), Tools (execute actions), Prompts (reusable templates)
- Security is paramount: Validate all inputs, restrict operations, use authentication, and apply rate limiting
- Test thoroughly with both the MCP Inspector for interactive testing and pytest for automated validation
- Deploy flexibly: Use stdio for local desktop integration or SSE transport for shared remote servers