Multi-Agent Systems
Multi-agent systems enable multiple AI agents to work together, each with specialized roles and capabilities, to solve complex problems that would be difficult for a single agent.
Multi-Agent Systems: Architectures where multiple AI agents collaborate, communicate, and coordinate to accomplish tasks that require diverse skills, perspectives, or parallel processing.
Why Multi-Agent Systems?
Single agents have limitations:
- Specialization vs. Generalization: Hard to be expert at everything
- Complex workflows: Multi-step tasks need coordination
- Parallel processing: Some tasks benefit from concurrent work
- Verification: Multiple perspectives reduce errors
Multi-agent systems provide:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Planner │────▶│ Executor │────▶│ Reviewer │
│ Agent │ │ Agent │ │ Agent │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└────────────────────┼────────────────────┘
Coordination
Core Concepts
1. Agent Roles
Define specialized agents for different tasks:
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum
class AgentRole(Enum):
RESEARCHER = "researcher"
WRITER = "writer"
CRITIC = "critic"
PLANNER = "planner"
EXECUTOR = "executor"
@dataclass
class AgentConfig:
"""Configuration for an agent."""
role: AgentRole
name: str
description: str
system_prompt: str
tools: List[str]
temperature: float = 0.7
class Agent:
"""Base agent class."""
def __init__(self, config: AgentConfig, model: str = "gpt-4"):
self.config = config
self.model = model
self.memory: List[Dict[str, str]] = []
def process(self, message: str, context: Optional[Dict] = None) -> str:
"""Process a message and return response."""
# Build messages with system prompt
messages = [
{"role": "system", "content": self.config.system_prompt}
]
# Add conversation history
messages.extend(self.memory)
# Add current message with context
if context:
message = f"{message}\n\nContext: {context}"
messages.append({"role": "user", "content": message})
# Get response
import openai
response = openai.ChatCompletion.create(
model=self.model,
messages=messages,
temperature=self.config.temperature
)
reply = response.choices[0].message.content
# Update memory
self.memory.append({"role": "user", "content": message})
self.memory.append({"role": "assistant", "content": reply})
return reply
def reset_memory(self):
"""Clear agent's conversation memory."""
self.memory.clear()
# Create specialized agents
researcher_config = AgentConfig(
role=AgentRole.RESEARCHER,
name="Research Agent",
description="Gathers and synthesizes information",
system_prompt="""You are a research agent. Your role is to:
1. Gather relevant information on topics
2. Synthesize findings from multiple sources
3. Present factual, well-organized research
4. Cite sources when applicable
Be thorough, accurate, and objective.""",
tools=["web_search", "database_query"],
temperature=0.3
)
writer_config = AgentConfig(
role=AgentRole.WRITER,
name="Writer Agent",
description="Creates content based on research",
system_prompt="""You are a writer agent. Your role is to:
1. Transform research into engaging content
2. Maintain clarity and readability
3. Adapt tone to the audience
4. Structure information effectively
Be creative but accurate.""",
tools=["text_generation"],
temperature=0.7
)
critic_config = AgentConfig(
role=AgentRole.CRITIC,
name="Critic Agent",
description="Reviews and improves content",
system_prompt="""You are a critic agent. Your role is to:
1. Identify errors, inconsistencies, or gaps
2. Suggest specific improvements
3. Ensure quality and accuracy
4. Provide constructive feedback
Be thorough but constructive.""",
tools=["analysis"],
temperature=0.5
)
2. Agent Communication
Implement message passing between agents:
from typing import Protocol
from queue import Queue
from datetime import datetime
@dataclass
class Message:
"""Message passed between agents."""
sender: str
receiver: str
content: str
timestamp: datetime
metadata: Optional[Dict] = None
class MessageBus:
"""Central message bus for agent communication."""
def __init__(self):
self.queues: Dict[str, Queue] = {}
self.history: List[Message] = []
def register_agent(self, agent_name: str):
"""Register an agent to receive messages."""
self.queues[agent_name] = Queue()
def send(self, message: Message):
"""Send a message to an agent."""
if message.receiver not in self.queues:
raise ValueError(f"Unknown receiver: {message.receiver}")
self.queues[message.receiver].put(message)
self.history.append(message)
def receive(self, agent_name: str, timeout: Optional[float] = None) -> Optional[Message]:
"""Receive a message for an agent."""
if agent_name not in self.queues:
raise ValueError(f"Unknown agent: {agent_name}")
try:
return self.queues[agent_name].get(timeout=timeout)
except:
return None
def get_history(self, agent_name: Optional[str] = None) -> List[Message]:
"""Get message history, optionally filtered by agent."""
if agent_name:
return [
msg for msg in self.history
if msg.sender == agent_name or msg.receiver == agent_name
]
return self.history
class CommunicatingAgent(Agent):
"""Agent with communication capabilities."""
def __init__(self, config: AgentConfig, message_bus: MessageBus, model: str = "gpt-4"):
super().__init__(config, model)
self.message_bus = message_bus
self.message_bus.register_agent(config.name)
def send_message(self, receiver: str, content: str, metadata: Optional[Dict] = None):
"""Send a message to another agent."""
message = Message(
sender=self.config.name,
receiver=receiver,
content=content,
timestamp=datetime.now(),
metadata=metadata
)
self.message_bus.send(message)
def receive_message(self, timeout: Optional[float] = None) -> Optional[Message]:
"""Receive a message from another agent."""
return self.message_bus.receive(self.config.name, timeout)
Building a Multi-Agent Workflow
Sequential Workflow
Agents work in sequence, each building on the previous:
class SequentialWorkflow:
"""Execute agents in sequence."""
def __init__(self, agents: List[Agent]):
self.agents = agents
def run(self, initial_input: str) -> Dict[str, Any]:
"""Run the workflow."""
results = {}
current_input = initial_input
for i, agent in enumerate(self.agents):
print(f"\n{'='*60}")
print(f"Step {i+1}: {agent.config.name}")
print(f"{'='*60}")
# Process with current input
output = agent.process(current_input)
results[agent.config.name] = output
print(f"\nOutput:\n{output[:200]}...")
# Output becomes input for next agent
current_input = output
return results
# Create workflow
researcher = Agent(researcher_config)
writer = Agent(writer_config)
critic = Agent(critic_config)
workflow = SequentialWorkflow([researcher, writer, critic])
# Run workflow
results = workflow.run(
"Research and write about the benefits of multi-agent AI systems"
)
print(f"\n\nFinal Output:\n{results['Critic Agent']}")
Collaborative Workflow
Agents collaborate with discussion and refinement:
class CollaborativeWorkflow:
"""Agents collaborate through discussion."""
def __init__(self, agents: List[CommunicatingAgent], message_bus: MessageBus):
self.agents = agents
self.message_bus = message_bus
def run(self, task: str, max_rounds: int = 3) -> str:
"""Run collaborative workflow."""
print(f"Starting collaborative task: {task}\n")
# Initial assignment to first agent
first_agent = self.agents[0]
current_result = first_agent.process(task)
for round_num in range(max_rounds):
print(f"\n{'='*60}")
print(f"Round {round_num + 1}")
print(f"{'='*60}\n")
# Each agent reviews and improves
for i, agent in enumerate(self.agents):
if i == 0 and round_num == 0:
continue # Skip first agent in first round
# Get feedback/improvement
prompt = f"""Review the following work and provide improvements:
{current_result}
Provide specific suggestions or an improved version."""
improvement = agent.process(prompt)
print(f"\n{agent.config.name}:")
print(f"{improvement[:200]}...")
# Update result
current_result = improvement
return current_result
# Use collaborative workflow
message_bus = MessageBus()
collab_agents = [
CommunicatingAgent(researcher_config, message_bus),
CommunicatingAgent(writer_config, message_bus),
CommunicatingAgent(critic_config, message_bus)
]
collab_workflow = CollaborativeWorkflow(collab_agents, message_bus)
final_result = collab_workflow.run(
"Write a technical blog post about RAG systems",
max_rounds=2
)
Design Pattern: Use sequential workflows for linear tasks and collaborative workflows when multiple perspectives improve quality.
AutoGen-Style Implementation
Implement a simplified version of Microsoft's AutoGen framework:
from typing import Callable, Optional
class ConversableAgent:
"""AutoGen-style conversable agent."""
def __init__(
self,
name: str,
system_message: str,
model: str = "gpt-4",
human_input_mode: str = "NEVER"
):
self.name = name
self.system_message = system_message
self.model = model
self.human_input_mode = human_input_mode
self.conversation_history: List[Dict] = []
def generate_reply(self, messages: List[Dict]) -> str:
"""Generate a reply to messages."""
# Build full conversation
full_messages = [
{"role": "system", "content": self.system_message}
] + messages
import openai
response = openai.ChatCompletion.create(
model=self.model,
messages=full_messages,
temperature=0.7
)
return response.choices[0].message.content
def send(self, message: str, recipient: 'ConversableAgent') -> str:
"""Send a message to another agent."""
# Add to conversation history
self.conversation_history.append({
"role": "assistant",
"content": message,
"name": self.name
})
# Recipient processes and replies
return recipient.receive(message, self)
def receive(self, message: str, sender: 'ConversableAgent') -> str:
"""Receive and respond to a message."""
# Add to history
self.conversation_history.append({
"role": "user",
"content": message,
"name": sender.name
})
# Generate reply
reply = self.generate_reply(self.conversation_history)
return reply
class GroupChat:
"""Manage multi-agent group conversation."""
def __init__(
self,
agents: List[ConversableAgent],
messages: List[Dict],
max_round: int = 10
):
self.agents = agents
self.messages = messages
self.max_round = max_round
def select_speaker(self, last_speaker: Optional[ConversableAgent] = None) -> ConversableAgent:
"""Select next speaker (simplified - could use LLM for smart selection)."""
if not last_speaker:
return self.agents[0]
# Simple round-robin
idx = self.agents.index(last_speaker)
return self.agents[(idx + 1) % len(self.agents)]
def run(self):
"""Run the group chat."""
current_speaker = None
for round_num in range(self.max_round):
# Select next speaker
current_speaker = self.select_speaker(current_speaker)
print(f"\n{'='*60}")
print(f"Round {round_num + 1} - Speaker: {current_speaker.name}")
print(f"{'='*60}\n")
# Generate message
message = current_speaker.generate_reply(self.messages)
print(f"{message}\n")
# Add to messages
self.messages.append({
"role": "assistant",
"content": message,
"name": current_speaker.name
})
# Check if task is complete
if "TERMINATE" in message or "task complete" in message.lower():
print("Task completed!")
break
# Create AutoGen-style agents
planner = ConversableAgent(
name="Planner",
system_message="""You are a planner. Create detailed plans to solve tasks.
Break down complex problems into clear steps. When done, say TERMINATE."""
)
coder = ConversableAgent(
name="Coder",
system_message="""You are a coder. Implement solutions based on plans.
Write clean, well-documented code. When done, say TERMINATE."""
)
reviewer = ConversableAgent(
name="Reviewer",
system_message="""You are a code reviewer. Review code for correctness,
efficiency, and best practices. When satisfied, say TERMINATE."""
)
# Run group chat
group_chat = GroupChat(
agents=[planner, coder, reviewer],
messages=[{
"role": "user",
"content": "Create a function to calculate fibonacci numbers efficiently"
}],
max_round=6
)
group_chat.run()
CrewAI-Style Implementation
Implement a CrewAI-inspired task delegation system:
@dataclass
class Task:
"""A task to be completed by an agent."""
description: str
agent: Optional[Agent] = None
expected_output: Optional[str] = None
context: Optional[List['Task']] = None
class Crew:
"""Manages a crew of agents working on tasks."""
def __init__(
self,
agents: List[Agent],
tasks: List[Task],
process: str = "sequential"
):
self.agents = agents
self.tasks = tasks
self.process = process
self.results: Dict[Task, str] = {}
def kickoff(self) -> Dict[Task, str]:
"""Start the crew's work."""
if self.process == "sequential":
return self._run_sequential()
elif self.process == "hierarchical":
return self._run_hierarchical()
else:
raise ValueError(f"Unknown process: {self.process}")
def _run_sequential(self) -> Dict[Task, str]:
"""Run tasks sequentially."""
for task in self.tasks:
print(f"\n{'='*60}")
print(f"Task: {task.description}")
print(f"{'='*60}\n")
# Get context from previous tasks
context = {}
if task.context:
context = {
t.description: self.results[t]
for t in task.context
if t in self.results
}
# Execute task
agent = task.agent or self.agents[0]
result = agent.process(task.description, context)
print(f"Result:\n{result[:200]}...\n")
self.results[task] = result
return self.results
def _run_hierarchical(self) -> Dict[Task, str]:
"""Run tasks with a manager delegating work."""
# Create manager agent
manager = Agent(
AgentConfig(
role=AgentRole.PLANNER,
name="Manager",
description="Coordinates other agents",
system_prompt="""You are a manager coordinating a team.
Delegate tasks to appropriate agents and synthesize results.""",
tools=["delegate"],
temperature=0.7
)
)
# Manager coordinates execution
for task in self.tasks:
# Manager decides which agent should handle task
decision_prompt = f"""
Task: {task.description}
Available agents:
{chr(10).join(f"- {a.config.name}: {a.config.description}" for a in self.agents)}
Which agent should handle this task and what specific instructions should they receive?
"""
decision = manager.process(decision_prompt)
# Execute with chosen agent (simplified - parse decision)
agent = task.agent or self.agents[0]
result = agent.process(task.description)
self.results[task] = result
return self.results
# Create crew
task1 = Task(
description="Research the latest developments in multi-agent AI systems",
)
task2 = Task(
description="Write a summary of the research findings",
context=[task1]
)
task3 = Task(
description="Create recommendations based on the summary",
context=[task2]
)
crew = Crew(
agents=[
Agent(researcher_config),
Agent(writer_config),
Agent(critic_config)
],
tasks=[task1, task2, task3],
process="sequential"
)
results = crew.kickoff()
Best Practice: Assign tasks to agents based on their specializations. A research agent for gathering information, a writer for content creation, and a critic for review.
Advanced: Dynamic Agent Creation
Create agents dynamically based on task requirements:
class AgentFactory:
"""Factory for creating specialized agents."""
@staticmethod
def create_specialized_agent(
task_description: str,
model: str = "gpt-4"
) -> Agent:
"""Create an agent specialized for a specific task."""
# Use LLM to determine optimal agent configuration
analysis_prompt = f"""
Analyze this task and determine the optimal agent configuration:
Task: {task_description}
Provide:
1. Agent role/specialization
2. Key capabilities needed
3. Appropriate system prompt
4. Recommended temperature (0.0-1.0)
"""
import openai
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": analysis_prompt}],
temperature=0.3
)
analysis = response.choices[0].message.content
# Parse and create agent (simplified)
config = AgentConfig(
role=AgentRole.EXECUTOR,
name=f"Specialist-{hash(task_description) % 1000}",
description=f"Specialized for: {task_description[:50]}",
system_prompt=analysis,
tools=[],
temperature=0.7
)
return Agent(config, model)
# Use factory
factory = AgentFactory()
custom_agent = factory.create_specialized_agent(
"Analyze financial data and create investment recommendations"
)
Coordination Overhead: More agents means more coordination complexity. Start simple and add agents only when specialization provides clear benefits.
Key Takeaways
- Specialization improves performance - dedicated agents for specific roles
- Communication is critical - clear message passing between agents
- Workflows structure collaboration - sequential vs. collaborative patterns
- Frameworks reduce complexity - AutoGen, CrewAI provide proven patterns
- Dynamic creation enables flexibility - create agents as needed for tasks
Quiz
Test your understanding of multi-agent systems: