Introduction
Atomic-agents are modular AI components designed to perform specific tasks within a larger conversational system. Following the Unix philosophy of “do one thing and do it well,” they offer a structured approach to chatbot development that’s both powerful and maintainable.
In this guide, we’ll build a custom chatbot from atomic-agents backed by OpenAI’s language models. You’ll learn to create a scalable, maintainable system that can handle complex conversational tasks while remaining easy to debug and extend.
Project Setup and Installation
Before diving into the implementation, let’s set up our project environment:
# Create a new project directory
mkdir atomic-chatbot
cd atomic-chatbot
# Create a virtual environment
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Create the project structure
mkdir -p agents/specialized_agents core config tests examples
touch agents/__init__.py agents/base_agent.py
touch agents/specialized_agents/__init__.py agents/specialized_agents/greeting_agent.py agents/specialized_agents/support_agent.py agents/specialized_agents/custom_agent.py
touch core/__init__.py core/agent_manager.py core/router.py core/context_handler.py core/error_handler.py core/response_formatter.py core/response_cache.py
touch config/__init__.py config/settings.py config/openai_config.py
touch tests/__init__.py tests/agent_tester.py
touch examples/__init__.py examples/basic_usage.py examples/advanced_usage.py
touch atomic_chatbot.py requirements.txt .env.example
Let’s define our dependencies in the requirements.txt file:
openai>=1.0.0
python-dotenv>=0.19.0
pydantic>=2.0.0
Install the dependencies:
pip install -r requirements.txt
Create an environment variables template in .env.example, then copy it to .env and fill in your real values:
OPENAI_API_KEY=your_api_key_here
OPENAI_ORGANIZATION=your_organization_id_here # Optional
MODEL_NAME=gpt-3.5-turbo
MAX_TOKENS=2000
TEMPERATURE=0.7
API_TIMEOUT=30
Understanding Atomic-Agents
Core Concept and Philosophy
The atomic-agents approach is built on three fundamental principles:
- Single-purpose agents: Each agent is designed to excel at one specific task, whether it’s handling user authentication, processing natural language queries, or managing database operations.
- Modular design: Agents are independent units that can be developed, tested, and deployed separately, making the system easier to maintain and update.
- Composability: Multiple atomic-agents can be combined to create sophisticated conversational flows, similar to building blocks that snap together to form complex structures (see the sketch after this list).
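To make composability concrete, here’s a minimal sketch (using the AtomicAgent interface we define later in this guide) of a pipeline where each agent’s output becomes the next agent’s input:
# Sketch: composing agents sequentially; AtomicAgent is defined later in this guide
async def compose(agents, user_input, context=None):
    data = user_input
    for agent in agents:
        data = await agent.process(data, context)  # each output feeds the next agent
    return data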
Key Benefits for Chatbot Development
The atomic-agents architecture offers several advantages:
- Maintainability: Isolating functionality makes it easier to update or fix individual components without affecting the entire system.
- Scalability: New capabilities can be added by creating new agents rather than modifying existing code.
- Reusability: Agents can be reused across different projects or chatbot implementations.
Architecture Overview
Core Components
The atomic-agents chatbot architecture consists of four essential components:
- Agent Manager: Orchestrates communication between agents and manages their lifecycle.
- Context Handler: Maintains conversation context and ensures coherent interactions.
- Router: Directs incoming requests to appropriate agents based on intent and context.
- Memory System: Stores and retrieves conversation history and user preferences (in this guide, this role is filled by the context handler’s history store).
Integration with OpenAI
Let’s create a configuration module to set up OpenAI integration:
# config/openai_config.py
from dotenv import load_dotenv
import os
def configure_openai():
"""Configure OpenAI API settings from environment variables"""
load_dotenv()
    # load_dotenv() has already placed OPENAI_API_KEY into os.environ;
    # the OpenAI client reads it automatically, so no extra setup is needed here
# Configure default parameters
return {
"model": os.getenv("MODEL_NAME", "gpt-3.5-turbo"),
"max_tokens": int(os.getenv("MAX_TOKENS", 2000)),
"temperature": float(os.getenv("TEMPERATURE", 0.7)),
"timeout": int(os.getenv("API_TIMEOUT", 30))
}
This code loads environment variables so the OpenAI client can read your API key, and assembles default parameters (model, token limit, temperature, timeout) for API calls. It returns a dictionary of configuration values that can be used throughout your application.
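As a quick, purely illustrative check, you can inspect the defaults the function returns:
# Illustrative: inspecting the defaults returned by configure_openai()
from config.openai_config import configure_openai

settings = configure_openai()
print(settings["model"], settings["max_tokens"])  # e.g. gpt-3.5-turbo 2000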
Hands-on Implementation
Setting Up the Project Structure
Our project structure is now:
atomic-chatbot/
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ └── specialized_agents/
│ ├── __init__.py
│ ├── greeting_agent.py
│ ├── support_agent.py
│ └── custom_agent.py
├── core/
│ ├── __init__.py
│ ├── agent_manager.py
│ ├── router.py
│ ├── context_handler.py
│ ├── error_handler.py
│ ├── response_formatter.py
│ └── response_cache.py
├── config/
│ ├── __init__.py
│ ├── settings.py
│ └── openai_config.py
├── tests/
│ ├── __init__.py
│ └── agent_tester.py
├── examples/
│ ├── __init__.py
│ ├── basic_usage.py
│ └── advanced_usage.py
├── .env
├── .env.example
├── atomic_chatbot.py
└── requirements.txt
Creating the Base Atomic-Agent
Let’s implement the base agent class that all specialized agents will inherit from:
# filepath agents/base_agent.py
from abc import ABC, abstractmethod
from openai import AsyncOpenAI
from typing import Dict, Any, Optional
class AtomicAgent(ABC):
"""
Base class for all atomic agents in the system.
Each atomic agent is designed to perform a specific task and can be
combined with other agents to create complex conversational flows.
"""
def __init__(self,
task_description: str,
model: str = "gpt-3.5-turbo",
temperature: float = 0.7):
"""
Initialize the atomic agent.
Args:
task_description: Clear description of the agent's specific task
model: The OpenAI model to use for this agent
temperature: Controls randomness in responses (0-1)
"""
self.task = task_description
self.model = model
self.temperature = temperature
        self.client = AsyncOpenAI()  # Async client so awaited API calls don't block the event loop
@abstractmethod
async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
"""
Process the input data and return a response.
Args:
input_data: The user input to process
context: Optional conversation context
Returns:
The processed response as a string
"""
pass
async def _call_openai_api(self, messages: list) -> str:
"""
Helper method to call the OpenAI API.
Args:
messages: List of message objects for the API call
Returns:
The model's response as a string
"""
try:
            response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature
)
return response.choices[0].message.content
except Exception as e:
# Log error and return graceful error message
print(f"Error calling OpenAI API: {str(e)}")
return f"I encountered an issue while processing your request. ({str(e)})"
This base class provides a foundation for all agents in the system. It defines a common interface and a helper method for calling the OpenAI API. The process method is marked as abstract, meaning every subclass must implement it.
Creating Your First Specialized Atomic-Agent
Now, let’s create a simple greeting agent:
# filepath agents/specialized_agents/greeting_agent.py
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional
class GreetingAgent(AtomicAgent):
"""
A specialized agent for handling greetings and introductions.
"""
def __init__(self):
super().__init__(
task_description="You are a friendly assistant that specializes in greeting users and introducing the system. Keep responses brief and welcoming.",
model="gpt-3.5-turbo",
temperature=0.5 # Lower temperature for more consistent greetings
)
async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
"""
Process greeting-related input and generate appropriate response.
Args:
input_data: User's greeting message
context: Optional conversation context
Returns:
A friendly greeting response
"""
# For greetings, we want to keep the context minimal
messages = [
{"role": "system", "content": self.task},
{"role": "user", "content": input_data}
]
# If this isn't the first interaction, include that info
if context and context.get('interaction_count', 0) > 1:
messages[0]["content"] += " This is a returning user, acknowledge that."
return await self._call_openai_api(messages)
This specialized agent handles greeting interactions. It uses a lower temperature setting for more consistent responses and includes logic to recognize returning users.
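You can smoke-test the agent in isolation with a small script like the following (illustrative only; it assumes OPENAI_API_KEY is set in your environment):
# Illustrative: running the greeting agent on its own
import asyncio
from agents.specialized_agents.greeting_agent import GreetingAgent

async def main():
    agent = GreetingAgent()
    print(await agent.process("Hi there!"))

asyncio.run(main())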
Building the Agent Manager
Let’s create the agent manager that will coordinate our atomic-agents:
# filepath core/agent_manager.py
from typing import Dict, Any
from agents.base_agent import AtomicAgent
class AgentManager:
"""
Manages a collection of atomic agents and coordinates their execution.
The AgentManager is responsible for registering agents, tracking their
state, and executing them when requested by the router.
"""
def __init__(self):
"""Initialize the agent manager with empty collections."""
self.agents: Dict[str, AtomicAgent] = {}
self.context: Dict[str, Any] = {}
self.active_agents: Dict[str, bool] = {}
def register_agent(self, name: str, agent: AtomicAgent) -> None:
"""
Register a new agent with the manager.
Args:
name: Unique identifier for the agent
agent: The agent instance to register
"""
if name in self.agents:
raise ValueError(f"Agent with name '{name}' already exists")
self.agents[name] = agent
self.active_agents[name] = True
print(f"Registered agent: {name}")
def deactivate_agent(self, name: str) -> None:
"""
Temporarily disable an agent without removing it.
Args:
name: The name of the agent to deactivate
"""
if name not in self.agents:
raise ValueError(f"Agent '{name}' not found")
self.active_agents[name] = False
print(f"Deactivated agent: {name}")
def activate_agent(self, name: str) -> None:
"""
Reactivate a previously deactivated agent.
Args:
name: The name of the agent to activate
"""
if name not in self.agents:
raise ValueError(f"Agent '{name}' not found")
self.active_agents[name] = True
print(f"Activated agent: {name}")
async def execute_agent(self, agent_name: str, input_data: str) -> str:
"""
Execute an agent with the given input.
Args:
agent_name: The name of the agent to execute
input_data: The input data to process
Returns:
The agent's response as a string
"""
if agent_name not in self.agents:
raise ValueError(f"Agent '{agent_name}' not found")
if not self.active_agents.get(agent_name, False):
raise ValueError(f"Agent '{agent_name}' is not active")
agent = self.agents[agent_name]
return await agent.process(input_data, self.context)
def update_context(self, new_context: Dict[str, Any]) -> None:
"""
Update the shared context available to all agents.
Args:
new_context: Dictionary of context values to update
"""
self.context.update(new_context)
The agent manager handles agent registration, activation/deactivation, and execution. It also maintains a shared context that all agents can access, allowing them to share information.
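As a quick illustration (run from inside an async function, and assuming the GreetingAgent defined earlier), the manager can be exercised on its own:
# Illustrative: driving the manager directly
from core.agent_manager import AgentManager
from agents.specialized_agents.greeting_agent import GreetingAgent

async def demo_manager():
    manager = AgentManager()
    manager.register_agent("greeting", GreetingAgent())
    manager.update_context({"interaction_count": 1})
    return await manager.execute_agent("greeting", "Hello!")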
Implementing the Router
Now, let’s build the router that determines which agent should handle each request:
# filepath core/router.py
from core.agent_manager import AgentManager
from typing import Dict, Any
from openai import AsyncOpenAI
class Router:
"""
Routes incoming user requests to the appropriate atomic agent.
The router analyzes user input to determine intent and selects
the most appropriate agent to handle the request.
"""
def __init__(self, agent_manager: AgentManager, classifier_model: str = "gpt-3.5-turbo"):
"""
Initialize the router.
Args:
agent_manager: The agent manager instance
classifier_model: The model to use for intent classification
"""
self.agent_manager = agent_manager
self.classifier_model = classifier_model
self.intent_map = {} # Maps intents to agent names
        self.client = AsyncOpenAI()  # Async client for non-blocking classification calls
    # Minimal implementations of the registration and routing methods the ChatBot class relies on:
    def register_intent(self, intent: str, agent_name: str) -> None:
        """
        Map an intent label to the agent that should handle it.
        Args:
            intent: The intent label used by the classifier
            agent_name: The name of a registered agent
        """
        self.intent_map[intent] = agent_name
    async def route_request(self, user_input: str) -> str:
        """
        Classify the input and dispatch it to the matching agent.
        Args:
            user_input: The user's input text
        Returns:
            The selected agent's response
        """
        intent = await self.analyze_intent(user_input)
        agent_name = self.intent_map.get(intent)
        if agent_name is None:
            # Fall back to the first registered agent when the intent is unmapped
            if not self.intent_map:
                return "I don't have an agent available to handle that yet."
            agent_name = next(iter(self.intent_map.values()))
        return await self.agent_manager.execute_agent(agent_name, user_input)
async def analyze_intent(self, user_input: str) -> str:
"""
Analyze the user input to determine the intent.
Uses a language model to classify the input into one of the
registered intents.
Args:
user_input: The user's input text
Returns:
The classified intent as a string
"""
# Get all possible intents
possible_intents = list(self.intent_map.keys())
# If we have only one intent registered, use it by default
if len(possible_intents) <= 1:
return possible_intents[0] if possible_intents else "general"
# Use OpenAI to classify
try:
            response = await self.client.chat.completions.create(
model=self.classifier_model,
messages=[
{"role": "system", "content": f"Classify the following user input into exactly one of these categories: {', '.join(possible_intents)}. Respond with only the category name."},
{"role": "user", "content": user_input}
],
temperature=0.3 # Low temperature for more consistent classification
)
intent = response.choices[0].message.content.strip().lower()
# Ensure the returned intent is in our map
return intent if intent in self.intent_map else "general"
except Exception as e:
print(f"Error in intent classification: {str(e)}")
return "general" # Default intent on error
The router analyzes user input to determine intent and routes it to the appropriate agent. It uses OpenAI’s models for intent classification and maintains a mapping of intents to agent names.
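Here’s a short sketch of the router wired up by hand (it assumes an AgentManager already populated with the greeting and support agents from this guide):
# Sketch: driving the router directly (assumes a populated AgentManager)
from core.router import Router

async def demo_router(agent_manager):
    router = Router(agent_manager)
    router.register_intent("greeting", "greeting")
    router.register_intent("support", "support")
    return await router.route_request("My order never arrived")  # likely routed to "support"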
Context Handling
Let’s create a context handler to maintain conversation history and user information:
# filepath core/context_handler.py
from datetime import datetime
from typing import Dict, Any, List, Optional
class ContextHandler:
"""
Manages conversation context and user information.
The context handler is responsible for maintaining the conversation history,
tracking user information, and providing relevant context to agents.
"""
def __init__(self, max_history: int = 10):
"""
Initialize the context handler.
Args:
max_history: Maximum number of conversation turns to store
"""
self.conversation_history: List[Dict[str, Any]] = []
self.user_context: Dict[str, Any] = {}
self.max_history = max_history
def update_context(self, user_input: str, response: str) -> None:
"""
Update the context with a new conversation turn.
Args:
user_input: The user's input message
response: The system's response
"""
# Add the new turn to conversation history
self.conversation_history.append({
'user': user_input,
'system': response,
'timestamp': datetime.now().isoformat()
})
# Trim history if it exceeds max length
if len(self.conversation_history) > self.max_history:
self.conversation_history = self.conversation_history[-self.max_history:]
def set_user_info(self, key: str, value: Any) -> None:
"""
Set user-specific information.
Args:
key: The information key
value: The information value
"""
self.user_context[key] = value
def get_user_info(self, key: str, default: Any = None) -> Any:
"""
Get user-specific information.
Args:
key: The information key
default: Default value if key not found
Returns:
The stored value or default
"""
return self.user_context.get(key, default)
def get_relevant_context(self, query: Optional[str] = None) -> Dict[str, Any]:
"""
Get context relevant to the current conversation.
Args:
query: Optional query to filter relevant context
Returns:
Dictionary containing relevant context information
"""
# For basic implementation, return recent history and all user context
context = {
'recent_history': self.conversation_history[-3:], # Last 3 interactions
'user_info': self.user_context,
'interaction_count': len(self.conversation_history)
}
return context
def clear_history(self) -> None:
"""Clear the conversation history."""
self.conversation_history = []
The context handler maintains conversation history and user-specific information. It provides methods to update the context with new conversation turns, set and retrieve user information, and get relevant context for the current conversation.
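A small illustrative snippet shows exactly what agents receive as context:
# Illustrative: inspecting what agents receive as context
from core.context_handler import ContextHandler

ctx = ContextHandler(max_history=5)
ctx.update_context("Hi", "Hello! How can I help?")
ctx.set_user_info("name", "Alex")
print(ctx.get_relevant_context())
# -> {'recent_history': [...], 'user_info': {'name': 'Alex'}, 'interaction_count': 1}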
Error Handling
Let’s create an error handler to manage errors in a graceful way:
# filepath core/error_handler.py
import functools
from typing import Any, Callable, TypeVar, Awaitable
import traceback
from openai import RateLimitError, AuthenticationError, APIError
# Type for async functions
F = TypeVar('F', bound=Callable[..., Awaitable[Any]])
class ErrorHandler:
"""
Provides error handling utilities for chatbot operations.
Handles common errors like API rate limits, authentication issues,
and unexpected exceptions with graceful fallbacks.
"""
@staticmethod
def async_error_handler(func: F) -> F:
"""
Decorator for handling errors in async functions.
Args:
func: The async function to wrap
Returns:
Wrapped function with error handling
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except RateLimitError:
# Handle rate limit errors
return "I'm processing too many requests right now. Please try again in a moment."
except AuthenticationError:
# Handle authentication errors
return "I'm having trouble authenticating with my language service. Please contact support."
except APIError:
# Handle API errors
return "I'm having trouble connecting to my language service. Please try again later."
except Exception as e:
# Handle unexpected errors
print(f"Unexpected error: {str(e)}")
print(traceback.format_exc())
return "I encountered an unexpected error. Please try again or contact support if the issue persists."
return wrapper # type: ignore
@staticmethod
def format_error_response(error: Exception) -> dict:
"""
Format an error into a standardized response.
Args:
error: The exception to format
Returns:
Formatted error response dictionary
"""
# Determine error type and create appropriate message
if isinstance(error, RateLimitError):
error_type = "rate_limit"
user_message = "The service is experiencing high demand. Please try again shortly."
elif isinstance(error, AuthenticationError):
error_type = "authentication"
user_message = "There's an issue with service authentication. Please contact support."
elif isinstance(error, APIError):
error_type = "api"
user_message = "The service is temporarily unavailable. Please try again later."
else:
error_type = "unknown"
user_message = "An unexpected error occurred. Please try again."
return {
"success": False,
"error": {
"type": error_type,
"message": user_message,
"technical_details": str(error)
}
}
The error handler provides a decorator for adding consistent error handling to async functions and a method for formatting errors into standardized responses. This ensures that the chatbot can gracefully handle common error conditions.
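For example (purely illustrative), the decorator converts an exception into a friendly string instead of letting it propagate:
# Illustrative: the decorator returns a fallback message instead of raising
import asyncio
from core.error_handler import ErrorHandler

@ErrorHandler.async_error_handler
async def fragile():
    raise RuntimeError("boom")

print(asyncio.run(fragile()))
# -> "I encountered an unexpected error. Please try again or contact support if the issue persists."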
Response Formatting
Create a utility for standardizing responses:
# filepath core/response_formatter.py
from datetime import datetime
from typing import Any, Dict, Optional
class ResponseFormatter:
"""
Standardizes response formatting across the system.
Ensures that all responses have a consistent structure,
making it easier to process and display them.
"""
@staticmethod
def format_response(
content: str,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Format a response with standard metadata.
Args:
content: The response text content
metadata: Optional additional metadata
Returns:
Formatted response dictionary
"""
response = {
'content': content,
'timestamp': datetime.now().isoformat(),
'status': 'success'
}
# Add any additional metadata
if metadata:
response['metadata'] = metadata
return response
@staticmethod
def enrich_response(
response: Dict[str, Any],
enrichment_data: Dict[str, Any]
) -> Dict[str, Any]:
"""
Enrich an existing response with additional data.
Args:
response: The original response dictionary
enrichment_data: Additional data to add
Returns:
Enriched response dictionary
"""
enriched = response.copy()
# Add enrichment data under the 'enrichment' key
if 'enrichment' not in enriched:
enriched['enrichment'] = {}
enriched['enrichment'].update(enrichment_data)
return enriched
The response formatter provides methods to standardize the structure of responses across the system. This makes it easier to process and display responses consistently.
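A quick illustrative usage of both methods:
# Illustrative: formatting and then enriching a response
from core.response_formatter import ResponseFormatter

resp = ResponseFormatter.format_response("Hello!", metadata={"agent": "greeting"})
resp = ResponseFormatter.enrich_response(resp, {"latency_ms": 120})
# resp now has 'content', 'timestamp', 'status', 'metadata', and 'enrichment' keys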
Support Agent Implementation
Let’s create a support agent to handle customer support queries:
# filepath agents/specialized_agents/support_agent.py
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional
class SupportAgent(AtomicAgent):
"""
A specialized agent for handling customer support queries.
"""
def __init__(self):
super().__init__(
task_description="""You are a helpful customer support assistant.
Your goal is to help users solve problems with their accounts,
orders, or services. Provide clear step-by-step instructions
when appropriate. If you don't know the answer to a specific
technical question, acknowledge that and offer to escalate
to a human support agent.""",
model="gpt-4", # Using more capable model for support queries
temperature=0.3 # Lower temperature for more accurate support
)
# Knowledge base for common issues
self.knowledge_base = {
"password_reset": "To reset your password, go to the login page and click 'Forgot Password'.",
"account_locked": "If your account is locked, wait 30 minutes and try again or contact support.",
"refund_policy": "Refunds are processed within 5-7 business days after approval."
}
async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
"""
Process support-related queries and provide helpful responses.
Args:
input_data: User's support question or issue
context: Optional conversation context
Returns:
A helpful support response
"""
        # Check the knowledge base first; keys use underscores (e.g. "password_reset"),
        # so match on each key's individual words rather than the literal key
        for keyword, response in self.knowledge_base.items():
            if all(word in input_data.lower() for word in keyword.split('_')):
                return response
# Prepare messages for the API call
messages = [
{"role": "system", "content": self.task}
]
# Add conversation history for context if available
if context and 'recent_history' in context:
for exchange in context['recent_history']:
messages.append({"role": "user", "content": exchange['user']})
messages.append({"role": "assistant", "content": exchange['system']})
# Add current query
messages.append({"role": "user", "content": input_data})
# Get response from OpenAI
return await self._call_openai_api(messages)
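The support agent answers common issues straight from its in-memory knowledge base and only falls back to the (more expensive) GPT-4 call when nothing matches, passing recent conversation history along for context.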
Response Caching Implementation
Here’s the implementation of response caching for improved performance:
# filepath core/response_cache.py
import time
from typing import Dict, Any, Optional, Tuple
class ResponseCache:
"""
Simple cache for storing and retrieving responses.
Helps improve performance by avoiding redundant API calls
for similar or repeated queries.
"""
def __init__(self, ttl_seconds: int = 3600):
"""
Initialize the cache.
Args:
ttl_seconds: Time-to-live for cache entries in seconds
"""
self.cache: Dict[str, Tuple[Any, float]] = {}
self.ttl_seconds = ttl_seconds
def get(self, key: str) -> Optional[Any]:
"""
Get a value from the cache if it exists and is not expired.
Args:
key: Cache key to look up
Returns:
Cached value or None if not found/expired
"""
if key not in self.cache:
return None
value, timestamp = self.cache[key]
# Check if entry has expired
if time.time() - timestamp > self.ttl_seconds:
# Remove expired entry
del self.cache[key]
return None
return value
def set(self, key: str, value: Any) -> None:
"""
Store a value in the cache.
Args:
key: Cache key
value: Value to store
"""
self.cache[key] = (value, time.time())
def clear(self) -> None:
"""Clear the entire cache."""
self.cache.clear()
def clean_expired(self) -> int:
"""
Remove all expired entries from the cache.
Returns:
Number of entries removed
"""
now = time.time()
expired_keys = [
k for k, (_, timestamp) in self.cache.items()
if now - timestamp > self.ttl_seconds
]
for key in expired_keys:
del self.cache[key]
return len(expired_keys)
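The cache isn’t wired into the base agent above; one way to use it (a sketch, with a hypothetical key scheme built from the model name and message list) is to check it before each API call:
# Sketch: caching completions keyed by model + messages (hypothetical integration)
import json
from core.response_cache import ResponseCache

cache = ResponseCache(ttl_seconds=600)

async def cached_call(agent, messages):
    key = f"{agent.model}:{json.dumps(messages, sort_keys=True)}"
    hit = cache.get(key)
    if hit is not None:
        return hit  # reuse the cached response instead of calling the API
    result = await agent._call_openai_api(messages)
    cache.set(key, result)
    return result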
Agent Tester Implementation
For testing your agents:
# filepath tests/agent_tester.py
from agents.base_agent import AtomicAgent
from typing import List, Dict, Any
import time
class AgentTester:
"""
Test harness for evaluating atomic agents.
Provides tools for running tests against agents and
evaluating their performance.
"""
@staticmethod
async def test_agent(agent: AtomicAgent, test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Run a set of test cases against an agent and evaluate the results.
Args:
agent: The agent to test
test_cases: List of test case dictionaries
Returns:
Test results summary
"""
results = []
start_time = time.time()
for i, test in enumerate(test_cases):
print(f"Running test case {i+1}/{len(test_cases)}")
try:
# Process the input with the agent
response = await agent.process(test['input'])
# Validate against expected output if provided
if 'expected' in test:
success = test['expected'] in response
else:
# If no expected output, consider successful if we got a response
success = bool(response)
results.append({
'test_case': i + 1,
'input': test['input'],
'response': response,
'expected': test.get('expected', 'No expected output specified'),
'success': success
})
except Exception as e:
results.append({
'test_case': i + 1,
'input': test['input'],
'error': str(e),
'success': False
})
# Calculate test duration and success rate
total_time = time.time() - start_time
successful_tests = sum(1 for r in results if r['success'])
summary = {
'total_tests': len(test_cases),
'successful_tests': successful_tests,
'success_rate': successful_tests / len(test_cases) if test_cases else 0,
'total_time_seconds': total_time,
'average_time_per_test': total_time / len(test_cases) if test_cases else 0,
'results': results
}
return summary
@staticmethod
def print_test_report(summary: Dict[str, Any]) -> None:
"""
Print a human-readable test report.
Args:
summary: Test summary dictionary
"""
print("\n=== AGENT TEST REPORT ===")
print(f"Total Tests: {summary['total_tests']}")
print(f"Successful Tests: {summary['successful_tests']}")
print(f"Success Rate: {summary['success_rate']:.2%}")
print(f"Total Time: {summary['total_time_seconds']:.2f} seconds")
print(f"Average Time: {summary['average_time_per_test']:.2f} seconds per test")
print("\nDetailed Results:")
for i, result in enumerate(summary['results']):
print(f"\nTest Case {i+1}:")
print(f" Input: {result['input']}")
if 'error' in result:
print(f" ERROR: {result['error']}")
else:
print(f" Response: {result['response'][:100]}..." if len(result['response']) > 100 else f" Response: {result['response']}")
print(f" Expected: {result['expected']}")
print(f" Success: {'✅' if result['success'] else '❌'}")
Main Chatbot Class
The main ChatBot class that ties everything together:
# atomic_chatbot.py
from core.agent_manager import AgentManager
from core.router import Router
from core.context_handler import ContextHandler
from core.error_handler import ErrorHandler
from core.response_formatter import ResponseFormatter
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional
import config.openai_config as openai_config
class ChatBot:
"""
Main chatbot class that orchestrates the entire system.
Integrates the agent manager, router, context handler, and other
components to create a complete conversational system.
"""
def __init__(self):
"""Initialize the chatbot and its components."""
# Configure OpenAI
        self.openai_settings = openai_config.configure_openai()  # Keep defaults (model, tokens, timeout) for later use
# Initialize components
self.agent_manager = AgentManager()
self.context_handler = ContextHandler()
self.router = Router(self.agent_manager)
self.response_formatter = ResponseFormatter()
# Track conversation state
self.conversation_active = False
def add_agent(self, name: str, agent: AtomicAgent, routing_rules: Optional[Dict[str, Any]] = None) -> None:
"""
Add an agent to the chatbot.
Args:
name: Unique name for the agent
agent: The agent instance
routing_rules: Optional rules for routing to this agent
"""
# Register the agent with the agent manager
self.agent_manager.register_agent(name, agent)
# Register intent with the router
self.router.register_intent(name, name)
# Apply any custom routing rules
if routing_rules:
if 'intent' in routing_rules:
self.router.register_intent(routing_rules['intent'], name)
@ErrorHandler.async_error_handler
async def process_message(self, user_input: str) -> Dict[str, Any]:
"""
Process a user message and return a response.
Args:
user_input: The user's input text
Returns:
Formatted response dictionary
"""
        # Mark the conversation as active
        self.conversation_active = True
# Provide context to agent manager
self.agent_manager.update_context(
self.context_handler.get_relevant_context(user_input)
)
# Route request to appropriate agent
response = await self.router.route_request(user_input)
# Update context with new interaction
self.context_handler.update_context(user_input, response)
# Format and return response
return self.response_formatter.format_response(
content=response,
metadata={
'conversation_turns': len(self.context_handler.conversation_history)
}
)
def start_new_conversation(self) -> None:
"""
Reset the conversation state to start a new conversation.
"""
self.context_handler.clear_history()
self.conversation_active = False
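With this class in place, a single process_message call runs the full pipeline: context is pushed to the agent manager, the router classifies the input and dispatches it to an agent, the exchange is recorded by the context handler, and the result is wrapped in a standard response envelope. The usage examples below show it end to end.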
Settings Configuration
Let’s update our settings file with default configuration options:
# filepath config/settings.py
"""
Application-wide settings and configuration values.
"""
# Default models for different agent types
DEFAULT_MODELS = {
"general": "gpt-3.5-turbo",
"support": "gpt-4",
"routing": "gpt-3.5-turbo"
}
# Token budget settings
TOKEN_BUDGET = {
"max_per_request": 4000,
"max_per_conversation": 20000
}
# Rate limiting settings
RATE_LIMITS = {
"requests_per_minute": 60,
"tokens_per_minute": 90000
}
# Context settings
CONTEXT_SETTINGS = {
"max_history_turns": 10,
"max_history_tokens": 2000
}
# System prompt components for reuse
SYSTEM_PROMPTS = {
"base": "You are an AI assistant helping users with their queries.",
"greeting": "You are a friendly assistant that greets users and introduces them to the system.",
"support": "You are a technical support agent helping users solve problems.",
"routing": "You are a classifier determining the best agent to handle a user query."
}
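These constants aren’t consumed automatically; the idea is to import them instead of hard-coding values, for example (illustrative):
# Illustrative: reading shared settings instead of hard-coding values
from config.settings import DEFAULT_MODELS, SYSTEM_PROMPTS

model = DEFAULT_MODELS["support"]    # "gpt-4"
prompt = SYSTEM_PROMPTS["support"]   # reusable system prompt text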
Custom Agent Implementation
Let’s create a custom agent for domain-specific tasks:
# agents/specialized_agents/custom_agent.py
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional
class CustomAgent(AtomicAgent):
"""A custom agent for handling specific domain queries."""
def __init__(self, domain: str, knowledge_base: Dict[str, str]):
super().__init__(
task_description=f"You are a specialized assistant for {domain} queries. Provide detailed and accurate information.",
model="gpt-4",
temperature=0.2
)
self.domain = domain
self.knowledge_base = knowledge_base
async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
# Custom processing logic here
# First check knowledge base
for key, info in self.knowledge_base.items():
if key in input_data.lower():
return f"{info} (From knowledge base)"
# If no match in knowledge base, use the LLM
messages = [
{"role": "system", "content": f"{self.task} Answer as a {self.domain} expert."},
{"role": "user", "content": input_data}
]
return await self._call_openai_api(messages)
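Like the support agent, the custom agent consults its injected knowledge base first and only falls back to the model when nothing matches, so standing up a new domain expert is just a matter of supplying a domain name and a dictionary of canned answers; the advanced usage example below instantiates it as a finance expert.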
Basic Usage Example
Here’s a simple example of using the chatbot:
# filepath examples/basic_usage.py
import asyncio
import sys
import os
# Add the parent directory to the path to import the chatbot
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from atomic_chatbot import ChatBot
from agents.specialized_agents.greeting_agent import GreetingAgent
from agents.specialized_agents.support_agent import SupportAgent
async def main():
# Initialize chatbot
chatbot = ChatBot()
# Add specialized agents
chatbot.add_agent('greeting', GreetingAgent())
chatbot.add_agent('support', SupportAgent(),
routing_rules={'intent': 'help'})
print("Chatbot initialized! Type 'exit' to quit.\n")
# Simple chat loop
while True:
user_input = input("\nYou: ")
if user_input.lower() == 'exit':
print("Goodbye!")
break
# Process the user input
response = await chatbot.process_message(user_input)
# Print the response
print(f"\nBot: {response['content']}")
if __name__ == "__main__":
asyncio.run(main())
Advanced Usage Example
And a more advanced usage example:
# filepath examples/advanced_usage.py
import asyncio
import sys
import os
import json
# Add the parent directory to the path to import the chatbot
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from atomic_chatbot import ChatBot
from agents.specialized_agents.greeting_agent import GreetingAgent
from agents.specialized_agents.support_agent import SupportAgent
from agents.specialized_agents.custom_agent import CustomAgent
async def main():
# Initialize chatbot
chatbot = ChatBot()
# Add specialized agents
chatbot.add_agent('greeting', GreetingAgent())
chatbot.add_agent('support', SupportAgent())
# Add custom domain-specific agent
finance_knowledge = {
"stock market": "The stock market refers to public markets for trading company stocks and derivatives.",
"401k": "A 401(k) is a tax-advantaged retirement savings plan offered by employers.",
"inflation": "Inflation is the rate at which prices increase over time, resulting in a fall in the purchasing value of money."
}
finance_agent = CustomAgent("finance", finance_knowledge)
chatbot.add_agent('finance', finance_agent,
routing_rules={'intent': 'money'})
# Process a conversation
conversation = [
"Hello there!",
"I need help with my 401k options",
"What about stock market investments?",
"Can you help me reset my password?"
]
print("Starting conversation simulation...\n")
for message in conversation:
print(f"User: {message}")
response = await chatbot.process_message(message)
print(f"Bot: {response['content']}")
print(f"Metadata: {json.dumps(response.get('metadata', {}), indent=2)}\n")
# Add a small delay to simulate a real conversation
await asyncio.sleep(1)
print("Conversation simulation completed.")
if __name__ == "__main__":
asyncio.run(main())
Conclusion
Building chatbots with atomic-agents and OpenAI offers a powerful, maintainable approach to conversational AI. The modular nature of atomic-agents makes it easier to develop, test, and scale your chatbot while maintaining code quality and system reliability.
Key takeaways:
- Atomic-agents provide a modular, maintainable approach to chatbot development
- OpenAI integration enables powerful language understanding capabilities
- Proper architecture and implementation patterns are crucial for success
- Regular testing and monitoring ensure reliable performance