Building Custom Swarms: A Comprehensive Guide for Swarm Engineers¶
Introduction¶
As artificial intelligence and machine learning continue to grow in complexity and applicability, building systems that can harness multiple agents to solve complex tasks becomes more critical. Swarm engineering enables AI agents to collaborate and solve problems autonomously in diverse fields such as finance, marketing, operations, and even creative industries.
This comprehensive guide covers how to build a custom swarm system that integrates multiple agents into a cohesive system capable of solving tasks collaboratively. We'll cover everything from basic swarm structure to advanced features like conversation management, logging, error handling, and scalability.
By the end of this guide, you will have a complete understanding of:
-
What swarms are and how they can be built
-
How to create agents and integrate them into swarms
-
How to implement proper conversation management for message storage
-
Best practices for error handling, logging, and optimization
-
How to make swarms scalable and production-ready
Overview of Swarm Architecture¶
A Swarm refers to a collection of agents that collaborate to solve a problem. Each agent in the swarm performs part of the task, either independently or by communicating with other agents. Swarms are ideal for:
-
Scalability: You can add or remove agents dynamically based on the task's complexity
-
Flexibility: Each agent can be designed to specialize in different parts of the problem, offering modularity
-
Autonomy: Agents in a swarm can operate autonomously, reducing the need for constant supervision
-
Conversation Management: All interactions are tracked and stored for analysis and continuity
Core Requirements for Swarm Classes¶
Every Swarm class must adhere to these fundamental requirements:
Required Methods and Attributes¶
-
run(task: str, img: str, *args, **kwargs)
method: The primary execution method for tasks -
name
: A descriptive name for the swarm -
description
: A clear description of the swarm's purpose -
agents
: A list of callables representing the agents -
conversation
: A conversation structure for message storage and history management
Required Agent Structure¶
Each Agent within the swarm must contain:
-
agent_name
: Unique identifier for the agent -
system_prompt
: Instructions that guide the agent's behavior -
run
method: Method to execute tasks assigned to the agent
Setting Up the Foundation¶
Required Dependencies¶
from typing import List, Union, Any, Optional, Callable
from loguru import logger
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.agent import Agent
import concurrent.futures
import os
Custom Exception Handling¶
class SwarmExecutionError(Exception):
"""Custom exception for handling swarm execution errors."""
pass
class AgentValidationError(Exception):
"""Custom exception for agent validation errors."""
pass
Building the Custom Swarm Class¶
Basic Swarm Structure¶
class CustomSwarm(BaseSwarm):
"""
A custom swarm class to manage and execute tasks with multiple agents.
This swarm integrates conversation management for tracking all agent interactions,
provides error handling, and supports both sequential and concurrent execution.
Attributes:
name (str): The name of the swarm.
description (str): A brief description of the swarm's purpose.
agents (List[Callable]): A list of callables representing the agents.
conversation (Conversation): Conversation management for message storage.
max_workers (int): Maximum number of concurrent workers for parallel execution.
autosave_conversation (bool): Whether to automatically save conversation history.
"""
def __init__(
self,
name: str,
description: str,
agents: List[Callable],
max_workers: int = 4,
autosave_conversation: bool = True,
conversation_config: Optional[dict] = None,
):
"""
Initialize the CustomSwarm with its name, description, and agents.
Args:
name (str): The name of the swarm.
description (str): A description of the swarm.
agents (List[Callable]): A list of callables that provide the agents for the swarm.
max_workers (int): Maximum number of concurrent workers.
autosave_conversation (bool): Whether to automatically save conversations.
conversation_config (dict): Configuration for conversation management.
"""
super().__init__(name=name, description=description, agents=agents)
self.name = name
self.description = description
self.agents = agents
self.max_workers = max_workers
self.autosave_conversation = autosave_conversation
# Initialize conversation management
# See: https://docs.swarms.world/swarms/structs/conversation/
conversation_config = conversation_config or {}
self.conversation = Conversation(
id=f"swarm_{name}_{int(time.time())}",
name=f"{name}_conversation",
autosave=autosave_conversation,
save_enabled=True,
time_enabled=True,
**conversation_config
)
# Validate agents and log initialization
self.validate_agents()
logger.info(f"🚀 CustomSwarm '{self.name}' initialized with {len(self.agents)} agents")
# Add swarm initialization to conversation history
self.conversation.add(
role="System",
content=f"Swarm '{self.name}' initialized with {len(self.agents)} agents: {[getattr(agent, 'agent_name', 'Unknown') for agent in self.agents]}"
)
def validate_agents(self):
"""
Validates that each agent has the required methods and attributes.
Raises:
AgentValidationError: If any agent fails validation.
"""
for i, agent in enumerate(self.agents):
# Check for required run method
if not hasattr(agent, 'run'):
raise AgentValidationError(f"Agent at index {i} does not have a 'run' method.")
# Check for agent_name attribute
if not hasattr(agent, 'agent_name'):
logger.warning(f"Agent at index {i} does not have 'agent_name' attribute. Using 'Agent_{i}'")
agent.agent_name = f"Agent_{i}"
logger.info(f"✅ Agent '{agent.agent_name}' validated successfully.")
def run(self, task: str, img: str = None, *args: Any, **kwargs: Any) -> Any:
"""
Execute a task using the swarm and its agents with conversation tracking.
Args:
task (str): The task description.
img (str): The image input (optional).
*args: Additional positional arguments for customization.
**kwargs: Additional keyword arguments for fine-tuning behavior.
Returns:
Any: The result of the task execution, aggregated from all agents.
"""
logger.info(f"🎯 Running task '{task}' across {len(self.agents)} agents in swarm '{self.name}'")
# Add task to conversation history
self.conversation.add(
role="User",
content=f"Task: {task}" + (f" | Image: {img}" if img else ""),
category="input"
)
try:
# Execute task across all agents
results = self._execute_agents(task, img, *args, **kwargs)
# Add results to conversation
self.conversation.add(
role="Swarm",
content=f"Task completed successfully. Processed by {len(results)} agents.",
category="output"
)
logger.success(f"✅ Task completed successfully by swarm '{self.name}'")
return results
except Exception as e:
error_msg = f"❌ Task execution failed in swarm '{self.name}': {str(e)}"
logger.error(error_msg)
# Add error to conversation
self.conversation.add(
role="System",
content=f"Error: {error_msg}",
category="error"
)
raise SwarmExecutionError(error_msg)
def _execute_agents(self, task: str, img: str = None, *args, **kwargs) -> List[Any]:
"""
Execute the task across all agents with proper conversation tracking.
Args:
task (str): The task to execute.
img (str): Optional image input.
Returns:
List[Any]: Results from all agents.
"""
results = []
for agent in self.agents:
try:
# Execute agent task
result = agent.run(task, img, *args, **kwargs)
results.append(result)
# Add agent response to conversation
self.conversation.add(
role=agent.agent_name,
content=result,
category="agent_output"
)
logger.info(f"✅ Agent '{agent.agent_name}' completed task successfully")
except Exception as e:
error_msg = f"Agent '{agent.agent_name}' failed: {str(e)}"
logger.error(error_msg)
# Add agent error to conversation
self.conversation.add(
role=agent.agent_name,
content=f"Error: {error_msg}",
category="agent_error"
)
# Continue with other agents but log the failure
results.append(f"FAILED: {error_msg}")
return results
Enhanced Swarm with Concurrent Execution¶
def run_concurrent(self, task: str, img: str = None, *args: Any, **kwargs: Any) -> List[Any]:
"""
Execute a task using concurrent execution for better performance.
Args:
task (str): The task description.
img (str): The image input (optional).
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
List[Any]: Results from all agents executed concurrently.
"""
logger.info(f"🚀 Running task concurrently across {len(self.agents)} agents")
# Add task to conversation
self.conversation.add(
role="User",
content=f"Concurrent Task: {task}" + (f" | Image: {img}" if img else ""),
category="input"
)
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all agent tasks
future_to_agent = {
executor.submit(self._run_single_agent, agent, task, img, *args, **kwargs): agent
for agent in self.agents
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_agent):
agent = future_to_agent[future]
try:
result = future.result()
results.append(result)
# Add to conversation
self.conversation.add(
role=agent.agent_name,
content=result,
category="agent_output"
)
except Exception as e:
error_msg = f"Concurrent execution failed for agent '{agent.agent_name}': {str(e)}"
logger.error(error_msg)
results.append(f"FAILED: {error_msg}")
# Add error to conversation
self.conversation.add(
role=agent.agent_name,
content=f"Error: {error_msg}",
category="agent_error"
)
# Add completion summary
self.conversation.add(
role="Swarm",
content=f"Concurrent task completed. {len(results)} agents processed.",
category="output"
)
return results
def _run_single_agent(self, agent: Callable, task: str, img: str = None, *args, **kwargs) -> Any:
"""
Execute a single agent with error handling.
Args:
agent: The agent to execute.
task (str): The task to execute.
img (str): Optional image input.
Returns:
Any: The agent's result.
"""
try:
return agent.run(task, img, *args, **kwargs)
except Exception as e:
logger.error(f"Agent '{getattr(agent, 'agent_name', 'Unknown')}' execution failed: {str(e)}")
raise
Advanced Features¶
def run_with_retries(self, task: str, img: str = None, retries: int = 3, *args, **kwargs) -> List[Any]:
"""
Execute a task with retry logic for failed agents.
Args:
task (str): The task to execute.
img (str): Optional image input.
retries (int): Number of retries for failed agents.
Returns:
List[Any]: Results from all agents with retry attempts.
"""
logger.info(f"🔄 Running task with {retries} retries per agent")
# Add task to conversation
self.conversation.add(
role="User",
content=f"Task with retries ({retries}): {task}",
category="input"
)
results = []
for agent in self.agents:
attempt = 0
success = False
while attempt <= retries and not success:
try:
result = agent.run(task, img, *args, **kwargs)
results.append(result)
success = True
# Add successful result to conversation
self.conversation.add(
role=agent.agent_name,
content=result,
category="agent_output"
)
if attempt > 0:
logger.success(f"✅ Agent '{agent.agent_name}' succeeded on attempt {attempt + 1}")
except Exception as e:
attempt += 1
error_msg = f"Agent '{agent.agent_name}' failed on attempt {attempt}: {str(e)}"
logger.warning(error_msg)
# Add retry attempt to conversation
self.conversation.add(
role=agent.agent_name,
content=f"Retry attempt {attempt}: {error_msg}",
category="agent_retry"
)
if attempt > retries:
final_error = f"Agent '{agent.agent_name}' exhausted all {retries} retries"
logger.error(final_error)
results.append(f"FAILED: {final_error}")
# Add final failure to conversation
self.conversation.add(
role=agent.agent_name,
content=final_error,
category="agent_error"
)
return results
def get_conversation_summary(self) -> dict:
"""
Get a summary of the conversation history and agent performance.
Returns:
dict: Summary of conversation statistics and agent performance.
"""
# Get conversation statistics
message_counts = self.conversation.count_messages_by_role()
# Count categories
category_counts = {}
for message in self.conversation.conversation_history:
category = message.get("category", "uncategorized")
category_counts[category] = category_counts.get(category, 0) + 1
# Get token counts if available
token_summary = self.conversation.export_and_count_categories()
return {
"swarm_name": self.name,
"total_messages": len(self.conversation.conversation_history),
"messages_by_role": message_counts,
"messages_by_category": category_counts,
"token_summary": token_summary,
"conversation_id": self.conversation.id,
}
def export_conversation(self, filepath: str = None) -> str:
"""
Export the conversation history to a file.
Args:
filepath (str): Optional custom filepath for export.
Returns:
str: The filepath where the conversation was saved.
"""
if filepath is None:
filepath = f"conversations/{self.name}_{self.conversation.id}.json"
self.conversation.export_conversation(filepath)
logger.info(f"📄 Conversation exported to: {filepath}")
return filepath
def display_conversation(self, detailed: bool = True):
"""
Display the conversation history in a formatted way.
Args:
detailed (bool): Whether to show detailed information.
"""
logger.info(f"💬 Displaying conversation for swarm: {self.name}")
self.conversation.display_conversation(detailed=detailed)
Creating Agents for Your Swarm¶
Basic Agent Structure¶
class CustomAgent:
"""
A custom agent class that integrates with the swarm conversation system.
Attributes:
agent_name (str): The name of the agent.
system_prompt (str): The system prompt guiding the agent's behavior.
conversation (Optional[Conversation]): Shared conversation for context.
"""
def __init__(
self,
agent_name: str,
system_prompt: str,
conversation: Optional[Conversation] = None
):
"""
Initialize the agent with its name and system prompt.
Args:
agent_name (str): The name of the agent.
system_prompt (str): The guiding prompt for the agent.
conversation (Optional[Conversation]): Shared conversation context.
"""
self.agent_name = agent_name
self.system_prompt = system_prompt
self.conversation = conversation
def run(self, task: str, img: str = None, *args: Any, **kwargs: Any) -> Any:
"""
Execute a specific task assigned to the agent.
Args:
task (str): The task description.
img (str): The image input for processing.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Any: The result of the task execution.
"""
# Add context from shared conversation if available
context = ""
if self.conversation:
context = f"Previous context: {self.conversation.get_last_message_as_string()}\n\n"
# Process the task (implement your custom logic here)
result = f"Agent {self.agent_name} processed: {context}{task}"
logger.info(f"🤖 Agent '{self.agent_name}' completed task")
return result
Using Swarms Framework Agents¶
You can also use the built-in Agent class from the Swarms framework:
from swarms.structs.agent import Agent
def create_financial_agent() -> Agent:
"""Create a financial analysis agent."""
return Agent(
agent_name="FinancialAnalyst",
system_prompt="You are a financial analyst specializing in market analysis and risk assessment.",
model_name="gpt-4o-mini",
max_loops=1,
)
def create_marketing_agent() -> Agent:
"""Create a marketing analysis agent."""
return Agent(
agent_name="MarketingSpecialist",
system_prompt="You are a marketing specialist focused on campaign analysis and customer insights.",
model_name="gpt-4o-mini",
max_loops=1,
)
Complete Implementation Example¶
Setting Up Your Swarm¶
import time
from typing import List
def create_multi_domain_swarm() -> CustomSwarm:
"""
Create a comprehensive multi-domain analysis swarm.
Returns:
CustomSwarm: A configured swarm with multiple specialized agents.
"""
# Create agents
agents = [
create_financial_agent(),
create_marketing_agent(),
Agent(
agent_name="OperationsAnalyst",
system_prompt="You are an operations analyst specializing in process optimization and efficiency.",
model_name="gpt-4o-mini",
max_loops=1,
),
]
# Configure conversation settings
conversation_config = {
"backend": "sqlite", # Use SQLite for persistent storage
"db_path": f"conversations/swarm_conversations.db",
"time_enabled": True,
"token_count": True,
}
# Create the swarm
swarm = CustomSwarm(
name="MultiDomainAnalysisSwarm",
description="A comprehensive swarm for financial, marketing, and operations analysis",
agents=agents,
max_workers=3,
autosave_conversation=True,
conversation_config=conversation_config,
)
return swarm
# Usage example
if __name__ == "__main__":
# Create and initialize the swarm
swarm = create_multi_domain_swarm()
# Execute a complex analysis task
task = """
Analyze the Q3 2024 performance data for our company:
- Revenue: $2.5M (up 15% from Q2)
- Customer acquisition: 1,200 new customers
- Marketing spend: $150K
- Operational costs: $800K
Provide insights from financial, marketing, and operations perspectives.
"""
# Run the analysis
results = swarm.run(task)
# Display results
print("\n" + "="*50)
print("SWARM ANALYSIS RESULTS")
print("="*50)
for i, result in enumerate(results):
agent_name = swarm.agents[i].agent_name
print(f"\n🤖 {agent_name}:")
print(f"📊 {result}")
# Get conversation summary
summary = swarm.get_conversation_summary()
print(f"\n📈 Conversation Summary:")
print(f" Total messages: {summary['total_messages']}")
print(f" Total tokens: {summary['token_summary']['total_tokens']}")
# Export conversation for later analysis
export_path = swarm.export_conversation()
print(f"💾 Conversation saved to: {export_path}")
Advanced Usage with Concurrent Execution¶
def run_batch_analysis():
"""Example of running multiple tasks concurrently."""
swarm = create_multi_domain_swarm()
tasks = [
"Analyze Q1 financial performance",
"Evaluate marketing campaign effectiveness",
"Review operational efficiency metrics",
"Assess customer satisfaction trends",
]
# Process all tasks concurrently
all_results = []
for task in tasks:
results = swarm.run_concurrent(task)
all_results.append({"task": task, "results": results})
return all_results
Conversation Management Integration¶
The swarm uses the Swarms framework's Conversation structure for comprehensive message storage and management. This provides:
Key Features¶
-
Persistent Storage: Multiple backend options (SQLite, Redis, Supabase, etc.)
-
Message Categorization: Organize messages by type (input, output, error, etc.)
-
Token Tracking: Monitor token usage across conversations
-
Export/Import: Save and load conversation histories
-
Search Capabilities: Find specific messages or content
Conversation Configuration Options¶
conversation_config = {
# Backend storage options
"backend": "sqlite", # or "redis", "supabase", "duckdb", "in-memory"
# File-based storage
"db_path": "conversations/swarm_data.db",
# Redis configuration (if using Redis backend)
"redis_host": "localhost",
"redis_port": 6379,
# Features
"time_enabled": True, # Add timestamps to messages
"token_count": True, # Track token usage
"autosave": True, # Automatically save conversations
"save_enabled": True, # Enable saving functionality
}
Accessing Conversation Data¶
# Get conversation history
history = swarm.conversation.return_history_as_string()
# Search for specific content
financial_messages = swarm.conversation.search("financial")
# Export conversation data
swarm.conversation.export_conversation("analysis_session.json")
# Get conversation statistics
stats = swarm.conversation.count_messages_by_role()
token_usage = swarm.conversation.export_and_count_categories()
For complete documentation on conversation management, see the Conversation Structure Documentation.
Conclusion¶
Building custom swarms with proper conversation management enables you to create powerful, scalable, and maintainable multi-agent systems. The integration with the Swarms framework's conversation structure provides:
-
Complete audit trail of all agent interactions
-
Persistent storage options for different deployment scenarios
-
Performance monitoring through token and message tracking
-
Easy debugging with searchable conversation history
-
Scalable architecture that grows with your needs
By following the patterns and best practices outlined in this guide, you can create robust swarms that handle complex tasks efficiently while maintaining full visibility into their operations.
Key Takeaways¶
- Always implement conversation management for tracking and auditing
- Use proper error handling and retries for production resilience
- Implement monitoring and logging for observability
- Design for scalability with concurrent execution patterns
- Test thoroughly with unit tests and integration tests
- Configure appropriately for your deployment environment
For more advanced patterns and examples, explore the Swarms Examples and consider contributing your custom swarms back to the community by submitting a pull request to the Swarms repository.
Additional Resources¶
-
Conversation Structure Documentation - Complete guide to conversation management
-
Agent Documentation - Learn about creating and configuring agents
-
Multi-Agent Architectures - Explore other swarm patterns and architectures
-
Examples Repository - Real-world swarm implementations
-
Swarms Framework GitHub - Source code and contributions