Multi-Agent Systems

Build sophisticated multi-agent systems where AI agents collaborate, communicate, and solve complex problems together

Multi-agent systems enable multiple AI agents to work together, each with specialized roles and capabilities, to solve complex problems that would be difficult for a single agent.

Multi-Agent Systems: Architectures where multiple AI agents collaborate, communicate, and coordinate to accomplish tasks that require diverse skills, perspectives, or parallel processing.

Why Multi-Agent Systems?

Single agents have limitations:

  • Specialization vs. Generalization: Hard to be expert at everything
  • Complex workflows: Multi-step tasks need coordination
  • Parallel processing: Some tasks benefit from concurrent work
  • Verification: Multiple perspectives reduce errors

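Multi-agent systems address these limits by dividing work among coordinated, specialized agents, for example a planner, an executor, and a reviewer: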

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Planner   │────▶│  Executor   │────▶│  Reviewer   │
│   Agent     │     │   Agent     │     │   Agent     │
└─────────────┘     └─────────────┘     └─────────────┘
      │                    │                    │
      └────────────────────┼────────────────────┘
                    Coordination
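
To make the diagram concrete, here is a minimal sketch of the planner, executor, and reviewer hand-off. The three functions are hypothetical stand-ins for LLM-backed agents (no model is called), and the "coordination" is simply a function that passes each stage's output to the next.

```python
from typing import List


def planner(task: str) -> List[str]:
    """Split a task into ordered steps (stubbed: one step per sentence)."""
    return [s.strip() for s in task.split(".") if s.strip()]


def executor(steps: List[str]) -> List[str]:
    """Carry out each step (stubbed: mark it as done)."""
    return [f"done: {step}" for step in steps]


def reviewer(results: List[str]) -> bool:
    """Approve only if every step reports completion."""
    return all(r.startswith("done: ") for r in results)


def run_pipeline(task: str) -> dict:
    """Coordinate the three stages and collect their outputs."""
    steps = planner(task)
    results = executor(steps)
    return {"steps": steps, "results": results, "approved": reviewer(results)}


outcome = run_pipeline("Collect sources. Draft summary.")
print(outcome)
```

In a real system each stub would wrap a model call, but the shape of the hand-off stays the same.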

Core Concepts

1. Agent Roles

Define specialized agents for different tasks:

python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from enum import Enum


class AgentRole(Enum):
    RESEARCHER = "researcher"
    WRITER = "writer"
    CRITIC = "critic"
    PLANNER = "planner"
    EXECUTOR = "executor"


@dataclass
class AgentConfig:
    """Configuration for an agent."""
    role: AgentRole
    name: str
    description: str
    system_prompt: str
    tools: List[str]
    temperature: float = 0.7


class Agent:
    """Base agent class."""

    def __init__(self, config: AgentConfig, model: str = "gpt-4"):
        self.config = config
        self.model = model
        self.memory: List[Dict[str, str]] = []

    def process(self, message: str, context: Optional[Dict] = None) -> str:
        """Process a message and return response."""
        # Build messages with system prompt
        messages = [
            {"role": "system", "content": self.config.system_prompt}
        ]

        # Add conversation history
        messages.extend(self.memory)

        # Add current message with context
        if context:
            message = f"{message}\n\nContext: {context}"

        messages.append({"role": "user", "content": message})

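        # Get response (openai>=1.0 client; reads OPENAI_API_KEY from the environment).
        # The older openai.ChatCompletion.create call was removed in openai 1.0.
        from openai import OpenAI
        client = OpenAI()
        response = client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.config.temperature
        )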

        reply = response.choices[0].message.content

        # Update memory
        self.memory.append({"role": "user", "content": message})
        self.memory.append({"role": "assistant", "content": reply})

        return reply

    def reset_memory(self):
        """Clear agent's conversation memory."""
        self.memory.clear()


# Create specialized agents
researcher_config = AgentConfig(
    role=AgentRole.RESEARCHER,
    name="Research Agent",
    description="Gathers and synthesizes information",
    system_prompt="""You are a research agent. Your role is to:
1. Gather relevant information on topics
2. Synthesize findings from multiple sources
3. Present factual, well-organized research
4. Cite sources when applicable

Be thorough, accurate, and objective.""",
    tools=["web_search", "database_query"],
    temperature=0.3
)

writer_config = AgentConfig(
    role=AgentRole.WRITER,
    name="Writer Agent",
    description="Creates content based on research",
    system_prompt="""You are a writer agent. Your role is to:
1. Transform research into engaging content
2. Maintain clarity and readability
3. Adapt tone to the audience
4. Structure information effectively

Be creative but accurate.""",
    tools=["text_generation"],
    temperature=0.7
)

critic_config = AgentConfig(
    role=AgentRole.CRITIC,
    name="Critic Agent",
    description="Reviews and improves content",
    system_prompt="""You are a critic agent. Your role is to:
1. Identify errors, inconsistencies, or gaps
2. Suggest specific improvements
3. Ensure quality and accuracy
4. Provide constructive feedback

Be thorough but constructive.""",
    tools=["analysis"],
    temperature=0.5
)
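
Because `process` appends two entries to `memory` on every call, the prompt grows with each turn. The sketch below isolates the message-assembly step as a pure function and adds a cap on how much history is sent. The `build_messages` name and the `max_history` parameter are assumptions for illustration and are not part of the `Agent` class above.

```python
from typing import Dict, List, Optional


def build_messages(
    system_prompt: str,
    memory: List[Dict[str, str]],
    message: str,
    context: Optional[Dict] = None,
    max_history: int = 6,  # hypothetical cap on how many past entries are sent
) -> List[Dict[str, str]]:
    """Assemble the chat payload: system prompt, recent history, new message."""
    if context:
        message = f"{message}\n\nContext: {context}"
    # Guard against max_history == 0, since memory[-0:] would return everything
    recent = memory[-max_history:] if max_history > 0 else []
    return (
        [{"role": "system", "content": system_prompt}]
        + recent
        + [{"role": "user", "content": message}]
    )


history = [{"role": "user", "content": f"turn {i}"} for i in range(10)]
payload = build_messages(
    "You are a research agent.", history, "Summarize.", max_history=4
)
print(len(payload))  # system prompt + 4 history entries + new message
```

Dropping old turns is the simplest policy. Summarizing them instead is another option, at the cost of an extra model call.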

2. Agent Communication

Implement message passing between agents:

python
from queue import Empty, Queue
from datetime import datetime


@dataclass
class Message:
    """Message passed between agents."""
    sender: str
    receiver: str
    content: str
    timestamp: datetime
    metadata: Optional[Dict] = None


class MessageBus:
    """Central message bus for agent communication."""

    def __init__(self):
        self.queues: Dict[str, Queue] = {}
        self.history: List[Message] = []

    def register_agent(self, agent_name: str):
        """Register an agent to receive messages."""
        self.queues[agent_name] = Queue()

    def send(self, message: Message):
        """Send a message to an agent."""
        if message.receiver not in self.queues:
            raise ValueError(f"Unknown receiver: {message.receiver}")

        self.queues[message.receiver].put(message)
        self.history.append(message)

    def receive(self, agent_name: str, timeout: Optional[float] = None) -> Optional[Message]:
        """Receive a message for an agent.

        Blocks for up to `timeout` seconds; with timeout=None it blocks
        until a message arrives. Returns None if the wait times out.
        """
        if agent_name not in self.queues:
            raise ValueError(f"Unknown agent: {agent_name}")

        try:
            return self.queues[agent_name].get(timeout=timeout)
        except Empty:  # raised by Queue.get when the timeout expires
            return None

    def get_history(self, agent_name: Optional[str] = None) -> List[Message]:
        """Get message history, optionally filtered by agent."""
        if agent_name:
            return [
                msg for msg in self.history
                if msg.sender == agent_name or msg.receiver == agent_name
            ]
        return self.history


class CommunicatingAgent(Agent):
    """Agent with communication capabilities."""

    def __init__(self, config: AgentConfig, message_bus: MessageBus, model: str = "gpt-4"):
        super().__init__(config, model)
        self.message_bus = message_bus
        self.message_bus.register_agent(config.name)

    def send_message(self, receiver: str, content: str, metadata: Optional[Dict] = None):
        """Send a message to another agent."""
        message = Message(
            sender=self.config.name,
            receiver=receiver,
            content=content,
            timestamp=datetime.now(),
            metadata=metadata
        )
        self.message_bus.send(message)

    def receive_message(self, timeout: Optional[float] = None) -> Optional[Message]:
        """Receive a message from another agent."""
        return self.message_bus.receive(self.config.name, timeout)
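
The round trip below shows the message-passing idea in isolation: one agent sends, the other receives and replies. It is a self-contained sketch, so `Note` and `MiniBus` are hypothetical, trimmed-down stand-ins for `Message` and `MessageBus`, and `receive` polls without blocking rather than waiting on a timeout.

```python
from dataclasses import dataclass
from queue import Empty, Queue
from typing import Dict, Optional


@dataclass
class Note:
    """Hypothetical stand-in for Message (sender, receiver, content only)."""
    sender: str
    receiver: str
    content: str


class MiniBus:
    """One FIFO inbox per registered agent name."""

    def __init__(self) -> None:
        self.inboxes: Dict[str, Queue] = {}

    def register(self, name: str) -> None:
        self.inboxes[name] = Queue()

    def send(self, note: Note) -> None:
        self.inboxes[note.receiver].put(note)

    def receive(self, name: str) -> Optional[Note]:
        try:
            return self.inboxes[name].get_nowait()  # non-blocking poll
        except Empty:
            return None


bus = MiniBus()
for name in ("Research Agent", "Writer Agent"):
    bus.register(name)

# The researcher hands findings to the writer; the writer acknowledges.
bus.send(Note("Research Agent", "Writer Agent", "Three key findings attached."))
incoming = bus.receive("Writer Agent")
bus.send(Note(incoming.receiver, incoming.sender, f"Received: {incoming.content}"))
reply = bus.receive("Research Agent")
print(reply.content)
```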

Building a Multi-Agent Workflow

Sequential Workflow

Agents work in sequence, each building on the previous:

python
class SequentialWorkflow:
    """Execute agents in sequence."""

    def __init__(self, agents: List[Agent]):
        self.agents = agents

    def run(self, initial_input: str) -> Dict[str, Any]:
        """Run the workflow."""
        results = {}
        current_input = initial_input

        for i, agent in enumerate(self.agents):
            print(f"\n{'='*60}")
            print(f"Step {i+1}: {agent.config.name}")
            print(f"{'='*60}")

            # Process with current input
            output = agent.process(current_input)
            results[agent.config.name] = output

            print(f"\nOutput:\n{output[:200]}...")

            # Output becomes input for next agent
            current_input = output

        return results


# Create workflow
researcher = Agent(researcher_config)
writer = Agent(writer_config)
critic = Agent(critic_config)

workflow = SequentialWorkflow([researcher, writer, critic])

# Run workflow
results = workflow.run(
    "Research and write about the benefits of multi-agent AI systems"
)

print(f"\n\nFinal Output:\n{results['Critic Agent']}")
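
To try the sequential pattern without API calls, the chaining logic can be exercised with stub stages. In this sketch each stage is a hypothetical `(name, function)` pair standing in for an agent's `process` method; only the hand-off of output to input is being demonstrated.

```python
from typing import Callable, Dict, List, Tuple

# (name, transform) pairs stand in for LLM-backed agents; all are stubs.
Stage = Tuple[str, Callable[[str], str]]


def run_sequential(stages: List[Stage], initial_input: str) -> Dict[str, str]:
    """Feed each stage the previous stage's output and record every result."""
    results: Dict[str, str] = {}
    current = initial_input
    for name, transform in stages:
        current = transform(current)
        results[name] = current
    return results


stages: List[Stage] = [
    ("Research Agent", lambda text: f"notes({text})"),
    ("Writer Agent", lambda text: f"draft({text})"),
    ("Critic Agent", lambda text: f"review({text})"),
]

stub_results = run_sequential(stages, "topic")
print(stub_results["Critic Agent"])  # review(draft(notes(topic)))
```

The nested output makes the data flow visible: the critic only ever sees the writer's draft, not the original request.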

Collaborative Workflow

Agents collaborate with discussion and refinement:

python
class CollaborativeWorkflow:
    """Agents collaborate through discussion."""

    def __init__(self, agents: List[CommunicatingAgent], message_bus: MessageBus):
        self.agents = agents
        self.message_bus = message_bus

    def run(self, task: str, max_rounds: int = 3) -> str:
        """Run collaborative workflow."""
        print(f"Starting collaborative task: {task}\n")

        # Initial assignment to first agent
        first_agent = self.agents[0]
        current_result = first_agent.process(task)

        for round_num in range(max_rounds):
            print(f"\n{'='*60}")
            print(f"Round {round_num + 1}")
            print(f"{'='*60}\n")

            # Each agent reviews and improves
            for i, agent in enumerate(self.agents):
                if i == 0 and round_num == 0:
                    continue  # Skip first agent in first round

                # Get feedback/improvement
                prompt = f"""Review the following work and provide improvements:

{current_result}

Provide specific suggestions or an improved version."""

                improvement = agent.process(prompt)

                print(f"\n{agent.config.name}:")
                print(f"{improvement[:200]}...")

                # Update result
                current_result = improvement

        return current_result


# Use collaborative workflow
message_bus = MessageBus()
collab_agents = [
    CommunicatingAgent(researcher_config, message_bus),
    CommunicatingAgent(writer_config, message_bus),
    CommunicatingAgent(critic_config, message_bus)
]

collab_workflow = CollaborativeWorkflow(collab_agents, message_bus)
final_result = collab_workflow.run(
    "Write a technical blog post about RAG systems",
    max_rounds=2
)

Design Pattern: Use sequential workflows for linear tasks and collaborative workflows when multiple perspectives improve quality.

AutoGen-Style Implementation

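The following is a simplified sketch in the style of Microsoft's AutoGen framework. It borrows the ConversableAgent and GroupChat concepts but is not the library's actual API: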

python
from typing import Callable, Optional


class ConversableAgent:
    """AutoGen-style conversable agent."""

    def __init__(
        self,
        name: str,
        system_message: str,
        model: str = "gpt-4",
        human_input_mode: str = "NEVER"
    ):
        self.name = name
        self.system_message = system_message
        self.model = model
        self.human_input_mode = human_input_mode
        self.conversation_history: List[Dict] = []

    def generate_reply(self, messages: List[Dict]) -> str:
        """Generate a reply to messages."""
        # Build full conversation
        full_messages = [
            {"role": "system", "content": self.system_message}
        ] + messages

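        # openai>=1.0 client; reads OPENAI_API_KEY from the environment.
        # The older openai.ChatCompletion.create call was removed in openai 1.0.
        from openai import OpenAI
        client = OpenAI()
        response = client.chat.completions.create(
            model=self.model,
            messages=full_messages,
            temperature=0.7
        )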

        return response.choices[0].message.content

    def send(self, message: str, recipient: 'ConversableAgent') -> str:
        """Send a message to another agent."""
        # Add to conversation history
        self.conversation_history.append({
            "role": "assistant",
            "content": message,
            "name": self.name
        })

        # Recipient processes and replies
        return recipient.receive(message, self)

    def receive(self, message: str, sender: 'ConversableAgent') -> str:
        """Receive and respond to a message."""
        # Add to history
        self.conversation_history.append({
            "role": "user",
            "content": message,
            "name": sender.name
        })

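        # Generate reply
        reply = self.generate_reply(self.conversation_history)

        # Record our own reply so later turns see the full exchange
        self.conversation_history.append({
            "role": "assistant",
            "content": reply,
            "name": self.name
        })

        return reply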


class GroupChat:
    """Manage multi-agent group conversation."""

    def __init__(
        self,
        agents: List[ConversableAgent],
        messages: List[Dict],
        max_round: int = 10
    ):
        self.agents = agents
        self.messages = messages
        self.max_round = max_round

    def select_speaker(self, last_speaker: Optional[ConversableAgent] = None) -> ConversableAgent:
        """Select next speaker (simplified - could use LLM for smart selection)."""
        if not last_speaker:
            return self.agents[0]

        # Simple round-robin
        idx = self.agents.index(last_speaker)
        return self.agents[(idx + 1) % len(self.agents)]

    def run(self):
        """Run the group chat."""
        current_speaker = None

        for round_num in range(self.max_round):
            # Select next speaker
            current_speaker = self.select_speaker(current_speaker)

            print(f"\n{'='*60}")
            print(f"Round {round_num + 1} - Speaker: {current_speaker.name}")
            print(f"{'='*60}\n")

            # Generate message
            message = current_speaker.generate_reply(self.messages)

            print(f"{message}\n")

            # Add to messages
            self.messages.append({
                "role": "assistant",
                "content": message,
                "name": current_speaker.name
            })

            # Check if task is complete
            if "TERMINATE" in message or "task complete" in message.lower():
                print("Task completed!")
                break


# Create AutoGen-style agents
planner = ConversableAgent(
    name="Planner",
    system_message="""You are a planner. Create detailed plans to solve tasks.
Break down complex problems into clear steps."""
)

coder = ConversableAgent(
    name="Coder",
    system_message="""You are a coder. Implement solutions based on plans.
Write clean, well-documented code."""
)

reviewer = ConversableAgent(
    name="Reviewer",
    system_message="""You are a code reviewer. Review code for correctness,
efficiency, and best practices. When satisfied, say TERMINATE."""
)

# Run group chat
group_chat = GroupChat(
    agents=[planner, coder, reviewer],
    messages=[{
        "role": "user",
        "content": "Create a function to calculate fibonacci numbers efficiently"
    }],
    max_round=6
)

group_chat.run()
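
A substring check for TERMINATE ends the chat as soon as any speaker emits the marker, so it matters which agent is allowed to emit it. The sketch below replays the round-robin loop with scripted stand-in speakers (no model calls) where only the reviewer can end the conversation. The `run_round_robin` function and the scripted replies are assumptions for illustration.

```python
from typing import Callable, Dict, List


def run_round_robin(
    speakers: Dict[str, Callable[[List[str]], str]],
    max_round: int = 10,
) -> List[str]:
    """Cycle through speakers in insertion order until one emits TERMINATE."""
    names = list(speakers)
    transcript: List[str] = []
    for round_num in range(max_round):
        name = names[round_num % len(names)]
        message = speakers[name](transcript)
        transcript.append(f"{name}: {message}")
        if "TERMINATE" in message:
            break
    return transcript


# Scripted speakers; the reviewer approves only once it has seen five messages.
speakers = {
    "Planner": lambda log: "Plan: memoize the recursion.",
    "Coder": lambda log: "def fib(n): ...",
    "Reviewer": lambda log: (
        "Looks good. TERMINATE" if len(log) >= 5 else "Please add tests."
    ),
}

transcript = run_round_robin(speakers, max_round=6)
for line in transcript:
    print(line)
```

Here the chat runs two full cycles: the reviewer asks for changes on its first turn and terminates on its second.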

CrewAI-Style Implementation

Implement a CrewAI-inspired task delegation system:

python
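# eq=False keeps identity-based hashing, so Task instances can be used as
# dict keys in Crew.results (a plain @dataclass would make them unhashable)
@dataclass(eq=False)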
class Task:
    """A task to be completed by an agent."""
    description: str
    agent: Optional[Agent] = None
    expected_output: Optional[str] = None
    context: Optional[List['Task']] = None


class Crew:
    """Manages a crew of agents working on tasks."""

    def __init__(
        self,
        agents: List[Agent],
        tasks: List[Task],
        process: str = "sequential"
    ):
        self.agents = agents
        self.tasks = tasks
        self.process = process
        self.results: Dict[Task, str] = {}

    def kickoff(self) -> Dict[Task, str]:
        """Start the crew's work."""
        if self.process == "sequential":
            return self._run_sequential()
        elif self.process == "hierarchical":
            return self._run_hierarchical()
        else:
            raise ValueError(f"Unknown process: {self.process}")

    def _run_sequential(self) -> Dict[Task, str]:
        """Run tasks sequentially."""
        for task in self.tasks:
            print(f"\n{'='*60}")
            print(f"Task: {task.description}")
            print(f"{'='*60}\n")

            # Get context from previous tasks
            context = {}
            if task.context:
                context = {
                    t.description: self.results[t]
                    for t in task.context
                    if t in self.results
                }

            # Execute task
            agent = task.agent or self.agents[0]
            result = agent.process(task.description, context)

            print(f"Result:\n{result[:200]}...\n")

            self.results[task] = result

        return self.results

    def _run_hierarchical(self) -> Dict[Task, str]:
        """Run tasks with a manager delegating work."""
        # Create manager agent
        manager = Agent(
            AgentConfig(
                role=AgentRole.PLANNER,
                name="Manager",
                description="Coordinates other agents",
                system_prompt="""You are a manager coordinating a team.
Delegate tasks to appropriate agents and synthesize results.""",
                tools=["delegate"],
                temperature=0.7
            )
        )

        # Manager coordinates execution
        for task in self.tasks:
            # Manager decides which agent should handle task
            decision_prompt = f"""
Task: {task.description}

Available agents:
{chr(10).join(f"- {a.config.name}: {a.config.description}" for a in self.agents)}

Which agent should handle this task and what specific instructions should they receive?
"""

            decision = manager.process(decision_prompt)

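            # Execute with the chosen agent (simplified: pick the first agent
            # whose name appears in the manager's decision, else the first agent)
            agent = task.agent or next(
                (a for a in self.agents if a.config.name in decision),
                self.agents[0]
            )
            result = agent.process(task.description)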

            self.results[task] = result

        return self.results


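# Create the agents first so each task can be assigned to a specialist
# (a task without an agent falls back to the first agent in the crew)
crew_researcher = Agent(researcher_config)
crew_writer = Agent(writer_config)
crew_critic = Agent(critic_config)

# Create crew
task1 = Task(
    description="Research the latest developments in multi-agent AI systems",
    agent=crew_researcher
)

task2 = Task(
    description="Write a summary of the research findings",
    agent=crew_writer,
    context=[task1]
)

task3 = Task(
    description="Create recommendations based on the summary",
    agent=crew_critic,
    context=[task2]
)

crew = Crew(
    agents=[crew_researcher, crew_writer, crew_critic],
    tasks=[task1, task2, task3],
    process="sequential"
)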

results = crew.kickoff()

Best Practice: Assign tasks to agents based on their specializations. A research agent for gathering information, a writer for content creation, and a critic for review.
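
One lightweight way to apply this is to route each task description to a role before creating the task. The sketch below scores roles by keyword matches. The keyword table and the `route_task` function are hypothetical, and a real system might ask an LLM to classify the task instead.

```python
from typing import Dict, List

# Hypothetical keyword table mapping roles to words that suggest them.
ROLE_KEYWORDS: Dict[str, List[str]] = {
    "researcher": ["research", "gather", "sources"],
    "writer": ["write", "draft", "summary"],
    "critic": ["review", "critique", "errors"],
}


def route_task(description: str, default: str = "researcher") -> str:
    """Return the role whose keywords appear most often in the description."""
    text = description.lower()
    scores = {
        role: sum(text.count(word) for word in words)
        for role, words in ROLE_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    # Fall back to the default role when nothing matches
    return best if scores[best] > 0 else default


task_descriptions = [
    "Research the latest developments in multi-agent AI systems",
    "Write a summary of the research findings",
    "Review the draft for errors",
]
assignments = {d: route_task(d) for d in task_descriptions}
for description, role in assignments.items():
    print(f"{role:<10} <- {description}")
```

Keyword scoring is brittle (ties go to the first role in the table), so treat it as a starting point rather than a classifier.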

Advanced: Dynamic Agent Creation

Create agents dynamically based on task requirements:

python
class AgentFactory:
    """Factory for creating specialized agents."""

    @staticmethod
    def create_specialized_agent(
        task_description: str,
        model: str = "gpt-4"
    ) -> Agent:
        """Create an agent specialized for a specific task."""
        # Use LLM to determine optimal agent configuration
        analysis_prompt = f"""
Analyze this task and determine the optimal agent configuration:

Task: {task_description}

Provide:
1. Agent role/specialization
2. Key capabilities needed
3. Appropriate system prompt
4. Recommended temperature (0.0-1.0)
"""

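        # openai>=1.0 client; reads OPENAI_API_KEY from the environment.
        # The older openai.ChatCompletion.create call was removed in openai 1.0.
        from openai import OpenAI
        client = OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": analysis_prompt}],
            temperature=0.3
        )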

        analysis = response.choices[0].message.content

        # Parse and create agent (simplified)
        config = AgentConfig(
            role=AgentRole.EXECUTOR,
            name=f"Specialist-{hash(task_description) % 1000}",
            description=f"Specialized for: {task_description[:50]}",
            system_prompt=analysis,
            tools=[],
            temperature=0.7
        )

        return Agent(config, model)


# Use factory
factory = AgentFactory()
custom_agent = factory.create_specialized_agent(
    "Analyze financial data and create investment recommendations"
)
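
The factory above uses the raw analysis text as the system prompt and leaves the parsing step simplified. One possible way to fill that step in, assuming the analysis prompt were changed to request a JSON object with `role`, `system_prompt`, and `temperature` keys (an assumption, since the prompt above asks for free text), is sketched below. The `parse_agent_spec` function is hypothetical.

```python
import json
from typing import Any, Dict


def parse_agent_spec(raw: str) -> Dict[str, Any]:
    """Parse an LLM reply into config fields, falling back to safe defaults."""
    defaults = {"role": "executor", "system_prompt": raw.strip(), "temperature": 0.7}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return defaults  # not JSON: treat the whole reply as the system prompt
    if not isinstance(data, dict):
        return defaults
    # Keep only the known keys, filling any missing ones from the defaults
    spec = {**defaults, **{k: data[k] for k in defaults if k in data}}
    # Clamp temperature into the 0.0-1.0 range requested in the prompt
    try:
        spec["temperature"] = min(1.0, max(0.0, float(spec["temperature"])))
    except (TypeError, ValueError):
        spec["temperature"] = defaults["temperature"]
    return spec


reply = (
    '{"role": "analyst", '
    '"system_prompt": "You analyze financial data.", '
    '"temperature": 1.4}'
)
spec = parse_agent_spec(reply)
print(spec)
```

Validating model output before building a config keeps a malformed reply from producing an agent with an out-of-range temperature or an empty prompt.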

Coordination Overhead: More agents means more coordination complexity. Start simple and add agents only when specialization provides clear benefits.
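
A rough way to see this growth, assuming every agent may talk directly to every other agent, is to count the distinct pairs: n agents have n(n-1)/2 possible channels. The short sketch below computes that for a few team sizes.

```python
def pairwise_channels(num_agents: int) -> int:
    """Number of distinct agent pairs that could exchange messages."""
    return num_agents * (num_agents - 1) // 2


for n in (2, 3, 5, 10):
    print(f"{n} agents -> {pairwise_channels(n)} possible channels")
```

Going from 3 to 10 agents raises the count from 3 to 45, which is one reason routing messages through a bus or a manager agent is common.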

Key Takeaways

  1. Specialization improves performance - dedicated agents for specific roles
  2. Communication is critical - clear message passing between agents
  3. Workflows structure collaboration - sequential vs. collaborative patterns
  4. Frameworks reduce complexity - AutoGen, CrewAI provide proven patterns
  5. Dynamic creation enables flexibility - create agents as needed for tasks
