Building Custom Chatbots with Atomic-Agents and OpenAI

Introduction

Atomic-agents are modular AI components designed to perform specific tasks within a larger conversational system. Following the Unix philosophy of “do one thing and do it well,” they offer a structured approach to chatbot development that’s both powerful and maintainable.

In this guide, we’ll build a custom chatbot from atomic-agents backed by OpenAI’s language models. The result is a scalable system that can handle complex conversational tasks while remaining easy to debug and extend.

Project Setup and Installation

Before diving into the implementation, let’s set up our project environment:

# Create a new project directory
mkdir atomic-chatbot
cd atomic-chatbot

# Create a virtual environment
python3 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Create the project structure
mkdir -p agents/specialized_agents core config tests examples
touch agents/__init__.py agents/base_agent.py 
touch agents/specialized_agents/__init__.py agents/specialized_agents/greeting_agent.py agents/specialized_agents/support_agent.py agents/specialized_agents/custom_agent.py
touch core/__init__.py core/agent_manager.py core/router.py core/context_handler.py core/error_handler.py core/response_formatter.py core/response_cache.py
touch config/__init__.py config/settings.py config/openai_config.py
touch tests/__init__.py tests/agent_tester.py
touch examples/__init__.py examples/basic_usage.py examples/advanced_usage.py
touch atomic_chatbot.py requirements.txt .env.example

Let’s define our dependencies in the requirements.txt file:

openai>=1.0.0
python-dotenv>=0.19.0
pydantic>=2.0.0

Install the dependencies:

pip install -r requirements.txt

Create an environment variables template in .env.example, then copy it to .env and fill in your real values:

OPENAI_API_KEY=your_api_key_here
OPENAI_ORGANIZATION=your_organization_id_here  # Optional
MODEL_NAME=gpt-3.5-turbo
MAX_TOKENS=2000

Understanding Atomic-Agents

Core Concept and Philosophy

The atomic-agents approach is built on three fundamental principles:

  1. Single-purpose agents: Each agent is designed to excel at one specific task, whether it’s handling user authentication, processing natural language queries, or managing database operations.
  2. Modular design: Agents are independent units that can be developed, tested, and deployed separately, making the system easier to maintain and update.
  3. Composability: Multiple atomic-agents can be combined to create sophisticated conversational flows, similar to building blocks that snap together to form complex structures.

Key Benefits for Chatbot Development

The atomic-agents architecture offers several advantages:

  • Maintainability: Isolating functionality makes it easier to update or fix individual components without affecting the entire system.
  • Scalability: New capabilities can be added by creating new agents rather than modifying existing code.
  • Reusability: Agents can be reused across different projects or chatbot implementations.

Architecture Overview

Core Components

The atomic-agents chatbot architecture consists of four essential components:

  1. Agent Manager: Orchestrates communication between agents and manages their lifecycle.
  2. Context Handler: Maintains conversation context and ensures coherent interactions.
  3. Router: Directs incoming requests to appropriate agents based on intent and context.
  4. Memory System: Stores and retrieves conversation history and user preferences.
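
To make this division of labor concrete, here is a minimal, self-contained sketch of how one request could flow through these components. Everything in it is a stand-in: the keyword-based route function, the AGENTS registry, and the handle helper are hypothetical names used only for illustration, and no OpenAI call is made.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Hypothetical single-purpose agents: each one does exactly one job.
async def greeting_agent(text: str) -> str:
    return "Hello! How can I help you today?"

async def support_agent(text: str) -> str:
    return "Let's sort that out. Can you describe the problem?"

# Agent Manager stand-in: a registry of named agents.
AGENTS: Dict[str, Callable[[str], Awaitable[str]]] = {
    "greeting": greeting_agent,
    "support": support_agent,
}

# Context/Memory stand-in: a plain list of past turns.
history: List[Dict[str, str]] = []

def route(text: str) -> str:
    """Router stand-in: naive keyword matching instead of an LLM classifier."""
    greetings = ("hello", "hi", "hey")
    words = [word.strip("!,.?") for word in text.lower().split()]
    return "greeting" if any(word in greetings for word in words) else "support"

async def handle(text: str) -> str:
    """Route the input, run the chosen agent, then record the turn."""
    agent_name = route(text)
    reply = await AGENTS[agent_name](text)
    history.append({"user": text, "system": reply, "agent": agent_name})
    return reply

if __name__ == "__main__":
    print(asyncio.run(handle("Hi there!")))
    print(asyncio.run(handle("My order never arrived")))
```

The real components built later in this guide replace each stand-in, but the order of operations (route, execute, record) stays the same.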

Integration with OpenAI

Let’s create a configuration module to set up OpenAI integration:

# config/openai_config.py
from openai import OpenAI
from dotenv import load_dotenv
import os

def configure_openai():
    """Configure OpenAI API settings from environment variables"""
    load_dotenv()
    
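    # load_dotenv() has already copied OPENAI_API_KEY from .env into the
    # environment, where the OpenAI client picks it up automatically.
    # Fail early with a clear message if the key is missing.
    if not os.getenv("OPENAI_API_KEY"):
        raise RuntimeError("OPENAI_API_KEY is not set; add it to your .env file")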
    
    # Configure default parameters
    return {
        "model": os.getenv("MODEL_NAME", "gpt-3.5-turbo"),
        "max_tokens": int(os.getenv("MAX_TOKENS", 2000)),
        "temperature": float(os.getenv("TEMPERATURE", 0.7)),
        "timeout": int(os.getenv("API_TIMEOUT", 30))
    }

This code loads environment variables from the .env file, which makes your API key available to the OpenAI client, and returns a dictionary of default parameters (model, token limit, temperature, and timeout) that can be used throughout your application.
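
If you prefer typed settings over a plain dictionary, the same values could be wrapped in a small dataclass. The sketch below is one possible approach, not part of the project files above: ChatSettings and from_env are hypothetical names, and the 0 to 2 temperature bound reflects the range the OpenAI chat API accepts.

```python
import os
from dataclasses import dataclass
from typing import Mapping

@dataclass(frozen=True)
class ChatSettings:
    """Hypothetical typed wrapper around the configuration values."""
    model: str = "gpt-3.5-turbo"
    max_tokens: int = 2000
    temperature: float = 0.7
    timeout: int = 30

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "ChatSettings":
        """Build settings from a mapping of environment variables."""
        settings = cls(
            model=env.get("MODEL_NAME", cls.model),
            max_tokens=int(env.get("MAX_TOKENS", cls.max_tokens)),
            temperature=float(env.get("TEMPERATURE", cls.temperature)),
            timeout=int(env.get("API_TIMEOUT", cls.timeout)),
        )
        # Reject values that would only fail later, at request time
        if settings.max_tokens <= 0:
            raise ValueError("MAX_TOKENS must be a positive integer")
        if not 0.0 <= settings.temperature <= 2.0:
            raise ValueError("TEMPERATURE must be between 0 and 2")
        return settings

if __name__ == "__main__":
    print(ChatSettings.from_env({"MODEL_NAME": "gpt-4", "MAX_TOKENS": "512"}))
```

Passing a plain dictionary instead of os.environ, as in the last line, keeps the parsing logic easy to test without touching the real environment.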

Hands-on Implementation

Setting Up the Project Structure

Our project structure now looks like this. Note that .env and demo.py are not created by the setup commands above:

atomic-chatbot/
├── agents/
│   ├── __init__.py
│   ├── base_agent.py
│   └── specialized_agents/
│       ├── __init__.py
│       ├── greeting_agent.py
│       ├── support_agent.py
│       └── custom_agent.py
├── core/
│   ├── __init__.py
│   ├── agent_manager.py
│   ├── router.py
│   ├── context_handler.py
│   ├── error_handler.py
│   ├── response_formatter.py
│   └── response_cache.py
├── config/
│   ├── __init__.py
│   ├── settings.py
│   └── openai_config.py
├── tests/
│   ├── __init__.py
│   └── agent_tester.py
├── examples/
│   ├── __init__.py
│   ├── basic_usage.py
│   └── advanced_usage.py
├── .env
├── .env.example
├── atomic_chatbot.py
├── demo.py
└── requirements.txt

Creating the Base Atomic-Agent

Let’s implement the base agent class that all specialized agents will inherit from:

# filepath agents/base_agent.py
from abc import ABC, abstractmethod
from openai import AsyncOpenAI
from typing import Dict, Any, Optional

class AtomicAgent(ABC):
    """
    Base class for all atomic agents in the system.
    
    Each atomic agent is designed to perform a specific task and can be
    combined with other agents to create complex conversational flows.
    """
    
    def __init__(self, 
                 task_description: str,
                 model: str = "gpt-3.5-turbo",
                 temperature: float = 0.7):
        """
        Initialize the atomic agent.
        
        Args:
            task_description: Clear description of the agent's specific task
            model: The OpenAI model to use for this agent
            temperature: Controls randomness in responses (0-1)
        """
        self.task = task_description
        self.model = model
        self.temperature = temperature
        self.client = AsyncOpenAI()  # Async client, so API calls can be awaited
    
    @abstractmethod
    async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
        """
        Process the input data and return a response.
        
        Args:
            input_data: The user input to process
            context: Optional conversation context
            
        Returns:
            The processed response as a string
        """
        pass
    
    async def _call_openai_api(self, messages: list) -> str:
        """
        Helper method to call the OpenAI API.
        
        Args:
            messages: List of message objects for the API call
            
        Returns:
            The model's response as a string
        """
        try:
            # Await the async client so the event loop is not blocked
            # while the request is in flight
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=self.temperature
            )
            return response.choices[0].message.content
        except Exception as e:
            # Log error and return graceful error message
            print(f"Error calling OpenAI API: {str(e)}")
            return f"I encountered an issue while processing your request. ({str(e)})"

This base class provides a foundation for all agents in the system. It defines a common interface and provides helper methods for interacting with the OpenAI API. The process method is marked as abstract, meaning all subclasses must implement it.
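
The effect of the abstract method can be seen in isolation with a small offline sketch. AgentContract below is a hypothetical stand-in that mirrors only the abstract interface of AtomicAgent (no OpenAI client), and EchoAgent and IncompleteAgent are illustrative names.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

class AgentContract(ABC):
    """Hypothetical stand-in mirroring only the abstract interface."""

    @abstractmethod
    async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
        ...

class EchoAgent(AgentContract):
    """Offline agent that returns its input, handy for wiring tests."""

    async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
        return f"echo: {input_data}"

class IncompleteAgent(AgentContract):
    """Forgets to implement process(), so it cannot be instantiated."""

if __name__ == "__main__":
    print(asyncio.run(EchoAgent().process("ping")))
    try:
        IncompleteAgent()
    except TypeError as exc:
        # Python refuses to instantiate a class with unimplemented abstract methods
        print(f"Rejected: {exc}")
```

An agent like EchoAgent is also useful as a test double when you want to exercise the manager and router without spending API calls.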

Creating Your First Specialized Atomic-Agent

Now, let’s create a simple greeting agent:

# filepath agents/specialized_agents/greeting_agent.py
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional

class GreetingAgent(AtomicAgent):
    """
    A specialized agent for handling greetings and introductions.
    """
    
    def __init__(self):
        super().__init__(
            task_description="You are a friendly assistant that specializes in greeting users and introducing the system. Keep responses brief and welcoming.",
            model="gpt-3.5-turbo",
            temperature=0.5  # Lower temperature for more consistent greetings
        )
    
    async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
        """
        Process greeting-related input and generate appropriate response.
        
        Args:
            input_data: User's greeting message
            context: Optional conversation context
            
        Returns:
            A friendly greeting response
        """
        # For greetings, we want to keep the context minimal
        messages = [
            {"role": "system", "content": self.task},
            {"role": "user", "content": input_data}
        ]
        
        # If this isn't the first interaction, include that info
        if context and context.get('interaction_count', 0) > 1:
            messages[0]["content"] += " This is a returning user, acknowledge that."
            
        return await self._call_openai_api(messages)

This specialized agent handles greeting interactions. It uses a lower temperature setting for more consistent responses and includes logic to recognize returning users.

Building the Agent Manager

Let’s create the agent manager that will coordinate our atomic-agents:

# filepath core/agent_manager.py
from typing import Dict, Any
from agents.base_agent import AtomicAgent

class AgentManager:
    """
    Manages a collection of atomic agents and coordinates their execution.
    
    The AgentManager is responsible for registering agents, tracking their
    state, and executing them when requested by the router.
    """
    
    def __init__(self):
        """Initialize the agent manager with empty collections."""
        self.agents: Dict[str, AtomicAgent] = {}
        self.context: Dict[str, Any] = {}
        self.active_agents: Dict[str, bool] = {}
    
    def register_agent(self, name: str, agent: AtomicAgent) -> None:
        """
        Register a new agent with the manager.
        
        Args:
            name: Unique identifier for the agent
            agent: The agent instance to register
        """
        if name in self.agents:
            raise ValueError(f"Agent with name '{name}' already exists")
        
        self.agents[name] = agent
        self.active_agents[name] = True
        print(f"Registered agent: {name}")
    
    def deactivate_agent(self, name: str) -> None:
        """
        Temporarily disable an agent without removing it.
        
        Args:
            name: The name of the agent to deactivate
        """
        if name not in self.agents:
            raise ValueError(f"Agent '{name}' not found")
        
        self.active_agents[name] = False
        print(f"Deactivated agent: {name}")
    
    def activate_agent(self, name: str) -> None:
        """
        Reactivate a previously deactivated agent.
        
        Args:
            name: The name of the agent to activate
        """
        if name not in self.agents:
            raise ValueError(f"Agent '{name}' not found")
        
        self.active_agents[name] = True
        print(f"Activated agent: {name}")
        
    async def execute_agent(self, agent_name: str, input_data: str) -> str:
        """
        Execute an agent with the given input.
        
        Args:
            agent_name: The name of the agent to execute
            input_data: The input data to process
            
        Returns:
            The agent's response as a string
        """
        if agent_name not in self.agents:
            raise ValueError(f"Agent '{agent_name}' not found")
        
        if not self.active_agents.get(agent_name, False):
            raise ValueError(f"Agent '{agent_name}' is not active")
        
        agent = self.agents[agent_name]
        return await agent.process(input_data, self.context)
    
    def update_context(self, new_context: Dict[str, Any]) -> None:
        """
        Update the shared context available to all agents.
        
        Args:
            new_context: Dictionary of context values to update
        """
        self.context.update(new_context)

The agent manager handles agent registration, activation/deactivation, and execution. It also maintains a shared context that all agents can access, allowing them to share information.

Implementing the Router

Now, let’s build the router that determines which agent should handle each request:

# filepath core/router.py
from core.agent_manager import AgentManager
from typing import Dict, Any
from openai import AsyncOpenAI

class Router:
    """
    Routes incoming user requests to the appropriate atomic agent.
    
    The router analyzes user input to determine intent and selects
    the most appropriate agent to handle the request.
    """
    
    def __init__(self, agent_manager: AgentManager, classifier_model: str = "gpt-3.5-turbo"):
        """
        Initialize the router.
        
        Args:
            agent_manager: The agent manager instance
            classifier_model: The model to use for intent classification
        """
        self.agent_manager = agent_manager
        self.classifier_model = classifier_model
        self.intent_map = {}  # Maps intents to agent names
        self.client = AsyncOpenAI()  # Async client, so API calls can be awaited
        
    # ... intent registration and routing methods are omitted here ...
        
    async def analyze_intent(self, user_input: str) -> str:
        """
        Analyze the user input to determine the intent.
        
        Uses a language model to classify the input into one of the
        registered intents.
        
        Args:
            user_input: The user's input text
            
        Returns:
            The classified intent as a string
        """
        # Get all possible intents
        possible_intents = list(self.intent_map.keys())
        
        # With zero or one registered intent there is nothing to classify
        if len(possible_intents) <= 1:
            return possible_intents[0] if possible_intents else "general"
            
        # Use OpenAI to classify
        try:
            # Await the async client so the event loop is not blocked
            response = await self.client.chat.completions.create(
                model=self.classifier_model,
                messages=[
                    {"role": "system", "content": f"Classify the following user input into exactly one of these categories: {', '.join(possible_intents)}. Respond with only the category name."},
                    {"role": "user", "content": user_input}
                ],
                temperature=0.3  # Low temperature for more consistent classification
            )
            intent = response.choices[0].message.content.strip().lower()
            
            # Ensure the returned intent is in our map
            return intent if intent in self.intent_map else "general"
            
        except Exception as e:
            print(f"Error in intent classification: {str(e)}")
            return "general"  # Default intent on error

The router analyzes user input to determine intent and routes it to the appropriate agent. It uses OpenAI’s models for intent classification and maintains a mapping of intents to agent names.
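
The classifier replies with free text, so a reply such as "Support." (with a trailing period) would fail the exact-match check above and fall through to "general". One way to make the lookup more forgiving is a small normalization helper. This is only a sketch: normalize_intent and the sample intent_map are hypothetical and not part of the Router class shown above.

```python
import string
from typing import Dict

def normalize_intent(raw: str, intent_map: Dict[str, str], default: str = "general") -> str:
    """Map a raw model reply onto a registered intent, or fall back to default."""
    # Drop surrounding whitespace, quotes, and punctuation, then lowercase
    cleaned = raw.strip(string.punctuation + string.whitespace).lower()
    # Compare case-insensitively so "Support" matches a key registered as "support"
    lookup = {intent.lower(): intent for intent in intent_map}
    return lookup.get(cleaned, default)

# Hypothetical intent-to-agent mapping
intent_map = {"greeting": "greeting_agent", "support": "support_agent"}

if __name__ == "__main__":
    for reply in (" Support.\n", '"greeting"', "billing"):
        print(repr(reply), "->", normalize_intent(reply, intent_map))
```

Unknown categories still map to the default intent, so the fallback behavior of analyze_intent is preserved.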

Context Handling

Let’s create a context handler to maintain conversation history and user information:

# filepath core/context_handler.py
from datetime import datetime
from typing import Dict, Any, List, Optional

class ContextHandler:
    """
    Manages conversation context and user information.
    
    The context handler is responsible for maintaining the conversation history,
    tracking user information, and providing relevant context to agents.
    """
    
    def __init__(self, max_history: int = 10):
        """
        Initialize the context handler.
        
        Args:
            max_history: Maximum number of conversation turns to store
        """
        self.conversation_history: List[Dict[str, Any]] = []
        self.user_context: Dict[str, Any] = {}
        self.max_history = max_history
        
    def update_context(self, user_input: str, response: str) -> None:
        """
        Update the context with a new conversation turn.
        
        Args:
            user_input: The user's input message
            response: The system's response
        """
        # Add the new turn to conversation history
        self.conversation_history.append({
            'user': user_input,
            'system': response,
            'timestamp': datetime.now().isoformat()
        })
        
        # Trim history if it exceeds max length
        if len(self.conversation_history) > self.max_history:
            self.conversation_history = self.conversation_history[-self.max_history:]
            
    def set_user_info(self, key: str, value: Any) -> None:
        """
        Set user-specific information.
        
        Args:
            key: The information key
            value: The information value
        """
        self.user_context[key] = value
        
    def get_user_info(self, key: str, default: Any = None) -> Any:
        """
        Get user-specific information.
        
        Args:
            key: The information key
            default: Default value if key not found
            
        Returns:
            The stored value or default
        """
        return self.user_context.get(key, default)
        
    def get_relevant_context(self, query: Optional[str] = None) -> Dict[str, Any]:
        """
        Get context relevant to the current conversation.
        
        Args:
            query: Optional query to filter relevant context
            
        Returns:
            Dictionary containing relevant context information
        """
        # For basic implementation, return recent history and all user context
        context = {
            'recent_history': self.conversation_history[-3:],  # Last 3 interactions
            'user_info': self.user_context,
            'interaction_count': len(self.conversation_history)
        }
        
        return context
        
    def clear_history(self) -> None:
        """Clear the conversation history."""
        self.conversation_history = []

The context handler maintains conversation history and user-specific information. It provides methods to update the context with new conversation turns, set and retrieve user information, and get relevant context for the current conversation.
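
As an alternative to trimming the list by hand, the history could be stored in a collections.deque with a maxlen, which discards the oldest turn automatically. The BoundedHistory class below is a hypothetical sketch of that idea, not a drop-in replacement for ContextHandler.

```python
from collections import deque
from datetime import datetime
from typing import Any, Deque, Dict, List

class BoundedHistory:
    """Hypothetical helper that keeps only the newest max_history turns."""

    def __init__(self, max_history: int = 10):
        # A deque with maxlen drops the oldest entry automatically on append
        self._turns: Deque[Dict[str, Any]] = deque(maxlen=max_history)

    def add_turn(self, user_input: str, response: str) -> None:
        self._turns.append({
            "user": user_input,
            "system": response,
            "timestamp": datetime.now().isoformat(),
        })

    def recent(self, count: int = 3) -> List[Dict[str, Any]]:
        """Return up to count of the most recent turns, oldest first."""
        if count <= 0:
            return []
        return list(self._turns)[-count:]

    def __len__(self) -> int:
        return len(self._turns)

if __name__ == "__main__":
    history = BoundedHistory(max_history=2)
    for i in range(5):
        history.add_turn(f"question {i}", f"answer {i}")
    print(len(history), [turn["user"] for turn in history.recent()])
```

The trade-off is that a deque does not support slicing directly, which is why recent converts it to a list first.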

Error Handling

Let’s create an error handler to manage errors in a graceful way:

# filepath core/error_handler.py
import functools
from typing import Any, Callable, TypeVar, Awaitable
import traceback
from openai import OpenAIError, RateLimitError, AuthenticationError, APIError

# Type for async functions
F = TypeVar('F', bound=Callable[..., Awaitable[Any]])

class ErrorHandler:
    """
    Provides error handling utilities for chatbot operations.
    
    Handles common errors like API rate limits, authentication issues,
    and unexpected exceptions with graceful fallbacks.
    """
    
    @staticmethod
    def async_error_handler(func: F) -> F:
        """
        Decorator for handling errors in async functions.
        
        Args:
            func: The async function to wrap
            
        Returns:
            Wrapped function with error handling
        """
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except RateLimitError:
                # Handle rate limit errors
                return "I'm processing too many requests right now. Please try again in a moment."
            except AuthenticationError:
                # Handle authentication errors
                return "I'm having trouble authenticating with my language service. Please contact support."
            except APIError:
                # Handle API errors
                return "I'm having trouble connecting to my language service. Please try again later."
            except Exception as e:
                # Handle unexpected errors
                print(f"Unexpected error: {str(e)}")
                print(traceback.format_exc())
                return "I encountered an unexpected error. Please try again or contact support if the issue persists."
        
        return wrapper  # type: ignore
    
    @staticmethod
    def format_error_response(error: Exception) -> dict:
        """
        Format an error into a standardized response.
        
        Args:
            error: The exception to format
            
        Returns:
            Formatted error response dictionary
        """
        # Determine error type and create appropriate message
        if isinstance(error, RateLimitError):
            error_type = "rate_limit"
            user_message = "The service is experiencing high demand. Please try again shortly."
        elif isinstance(error, AuthenticationError):
            error_type = "authentication"
            user_message = "There's an issue with service authentication. Please contact support."
        elif isinstance(error, APIError):
            error_type = "api"
            user_message = "The service is temporarily unavailable. Please try again later."
        else:
            error_type = "unknown"
            user_message = "An unexpected error occurred. Please try again."
            
        return {
            "success": False,
            "error": {
                "type": error_type,
                "message": user_message,
                "technical_details": str(error)
            }
        }

The error handler provides a decorator for adding consistent error handling to async functions and a method for formatting errors into standardized responses. This ensures that the chatbot can gracefully handle common error conditions.
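
The decorator pattern is easier to follow in a condensed, offline form. In the sketch below, FakeRateLimitError stands in for openai.RateLimitError so that the example runs without the OpenAI package, and with_fallback and answer are hypothetical names used only for illustration.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

class FakeRateLimitError(Exception):
    """Stand-in for openai.RateLimitError so the sketch runs offline."""

def with_fallback(func: Callable[..., Awaitable[str]]) -> Callable[..., Awaitable[str]]:
    """Condensed version of the decorator pattern: map exceptions to messages."""
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> str:
        try:
            return await func(*args, **kwargs)
        except FakeRateLimitError:
            # Specific handlers come first, the catch-all comes last
            return "I'm processing too many requests right now. Please try again in a moment."
        except Exception:
            return "I encountered an unexpected error. Please try again."
    return wrapper

@with_fallback
async def answer(question: str) -> str:
    """Hypothetical handler that can fail in two different ways."""
    if "busy" in question:
        raise FakeRateLimitError()
    if not question:
        raise ValueError("empty question")
    return f"You asked: {question}"

if __name__ == "__main__":
    for question in ("what is an agent?", "busy", ""):
        print(asyncio.run(answer(question)))
```

A decorator like this only sees exceptions that actually propagate. The _call_openai_api helper shown earlier catches exceptions itself and returns a string, so errors raised inside it would not reach these handlers.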

Response Formatting

Create a utility for standardizing responses:

# filepath core/response_formatter.py
from datetime import datetime
from typing import Any, Dict, Optional

class ResponseFormatter:
    """
    Standardizes response formatting across the system.
    
    Ensures that all responses have a consistent structure,
    making it easier to process and display them.
    """
    
    @staticmethod
    def format_response(
        content: str, 
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Format a response with standard metadata.
        
        Args:
            content: The response text content
            metadata: Optional additional metadata
            
        Returns:
            Formatted response dictionary
        """
        response = {
            'content': content,
            'timestamp': datetime.now().isoformat(),
            'status': 'success'
        }
        
        # Add any additional metadata
        if metadata:
            response['metadata'] = metadata
            
        return response
    
    @staticmethod
    def enrich_response(
        response: Dict[str, Any], 
        enrichment_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Enrich an existing response with additional data.
        
        Args:
            response: The original response dictionary
            enrichment_data: Additional data to add
            
        Returns:
            Enriched response dictionary
        """
        # Shallow-copy the response, then build a fresh 'enrichment' dict so
        # the caller's original response is never mutated
        enriched = response.copy()
        enriched['enrichment'] = {
            **response.get('enrichment', {}),
            **enrichment_data
        }
        
        return enriched

The response formatter provides methods to standardize the structure of responses across the system. This makes it easier to process and display responses consistently.

Support Agent Implementation

Let’s create a support agent to handle customer support queries:

# filepath agents/specialized_agents/support_agent.py
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional

class SupportAgent(AtomicAgent):
    """
    A specialized agent for handling customer support queries.
    """
    
    def __init__(self):
        super().__init__(
            task_description="""You are a helpful customer support assistant. 
            Your goal is to help users solve problems with their accounts, 
            orders, or services. Provide clear step-by-step instructions 
            when appropriate. If you don't know the answer to a specific 
            technical question, acknowledge that and offer to escalate 
            to a human support agent.""",
            model="gpt-4",  # Using more capable model for support queries
            temperature=0.3  # Lower temperature for more accurate support
        )
        
        # Knowledge base for common issues
        self.knowledge_base = {
            "password_reset": "To reset your password, go to the login page and click 'Forgot Password'.",
            "account_locked": "If your account is locked, wait 30 minutes and try again or contact support.",
            "refund_policy": "Refunds are processed within 5-7 business days after approval."
        }
    
    async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
        """
        Process support-related queries and provide helpful responses.
        
        Args:
            input_data: User's support question or issue
            context: Optional conversation context
            
        Returns:
            A helpful support response
        """
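        # Check if we can answer from knowledge base first.
        # Keys use underscores ("password_reset"), so compare against the
        # spaced form ("password reset") that users actually type.
        normalized_input = input_data.lower()
        for keyword, response in self.knowledge_base.items():
            if keyword.replace("_", " ") in normalized_input:
                # If we have a direct match in our knowledge base
                return response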
        
        # Prepare messages for the API call
        messages = [
            {"role": "system", "content": self.task}
        ]
        
        # Add conversation history for context if available
        if context and 'recent_history' in context:
            for exchange in context['recent_history']:
                messages.append({"role": "user", "content": exchange['user']})
                messages.append({"role": "assistant", "content": exchange['system']})
        
        # Add current query
        messages.append({"role": "user", "content": input_data})
        
        # Get response from OpenAI
        return await self._call_openai_api(messages)
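
The message assembly inside process can also be pulled out into a pure function, which makes it testable without any API call. The build_messages helper below is a hypothetical refactoring sketch. It assumes the context dictionary has the recent_history shape produced by ContextHandler.get_relevant_context.

```python
from typing import Any, Dict, List, Optional

def build_messages(system_prompt: str, user_input: str,
                   context: Optional[Dict[str, Any]] = None) -> List[Dict[str, str]]:
    """Assemble a chat message list: system prompt, prior turns, current query."""
    messages = [{"role": "system", "content": system_prompt}]
    # Replay earlier turns so the model sees the conversation so far
    for exchange in (context or {}).get("recent_history", []):
        messages.append({"role": "user", "content": exchange["user"]})
        messages.append({"role": "assistant", "content": exchange["system"]})
    messages.append({"role": "user", "content": user_input})
    return messages

if __name__ == "__main__":
    sample_context = {"recent_history": [{"user": "hi", "system": "hello"}]}
    for message in build_messages("You are helpful.", "reset my password", sample_context):
        print(message)
```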

Response Caching Implementation

Here’s the implementation of response caching for improved performance:

# filepath core/response_cache.py
import time
from typing import Dict, Any, Optional, Tuple

class ResponseCache:
    """
    Simple cache for storing and retrieving responses.
    
    Helps improve performance by avoiding redundant API calls
    for similar or repeated queries.
    """
    
    def __init__(self, ttl_seconds: int = 3600):
        """
        Initialize the cache.
        
        Args:
            ttl_seconds: Time-to-live for cache entries in seconds
        """
        self.cache: Dict[str, Tuple[Any, float]] = {}
        self.ttl_seconds = ttl_seconds
        
    def get(self, key: str) -> Optional[Any]:
        """
        Get a value from the cache if it exists and is not expired.
        
        Args:
            key: Cache key to look up
            
        Returns:
            Cached value or None if not found/expired
        """
        if key not in self.cache:
            return None
            
        value, timestamp = self.cache[key]
        
        # Check if entry has expired
        if time.time() - timestamp > self.ttl_seconds:
            # Remove expired entry
            del self.cache[key]
            return None
            
        return value
        
    def set(self, key: str, value: Any) -> None:
        """
        Store a value in the cache.
        
        Args:
            key: Cache key
            value: Value to store
        """
        self.cache[key] = (value, time.time())
        
    def clear(self) -> None:
        """Clear the entire cache."""
        self.cache.clear()
        
    def clean_expired(self) -> int:
        """
        Remove all expired entries from the cache.
        
        Returns:
            Number of entries removed
        """
        now = time.time()
        expired_keys = [
            k for k, (_, timestamp) in self.cache.items() 
            if now - timestamp > self.ttl_seconds
        ]
        
        for key in expired_keys:
            del self.cache[key]
            
        return len(expired_keys)
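
The cache stores values under string keys but leaves key construction to the caller. One possible scheme, shown here as a sketch, hashes the normalized user input together with the agent name so that trivially different spellings of the same question share an entry. The make_cache_key function is a hypothetical helper, not part of ResponseCache.

```python
import hashlib

def make_cache_key(agent_name: str, user_input: str) -> str:
    """Build a stable cache key from the agent name and normalized input."""
    # Collapse whitespace and ignore case so trivially different inputs match
    normalized = " ".join(user_input.lower().split())
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return f"{agent_name}:{digest}"

if __name__ == "__main__":
    print(make_cache_key("support", "  How do I   reset my password? "))
    print(make_cache_key("support", "how do i reset my password?"))
```

A key built this way ignores conversation context, so it suits agents whose answers depend only on the current input. Context-dependent responses would need the relevant context folded into the key as well.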

Agent Tester Implementation

For testing your agents:

# filepath tests/agent_tester.py
from agents.base_agent import AtomicAgent
from typing import List, Dict, Any
import asyncio
import json
import time

class AgentTester:
    """
    Test harness for evaluating atomic agents.
    
    Provides tools for running tests against agents and
    evaluating their performance.
    """
    
    @staticmethod
    async def test_agent(agent: AtomicAgent, test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run a set of test cases against an agent and evaluate the results.
        
        Args:
            agent: The agent to test
            test_cases: List of test case dictionaries
            
        Returns:
            Test results summary
        """
        results = []
        start_time = time.time()
        
        for i, test in enumerate(test_cases):
            print(f"Running test case {i+1}/{len(test_cases)}")
            
            try:
                # Process the input with the agent
                response = await agent.process(test['input'])
                
                # Validate against expected output if provided
                if 'expected' in test:
                    success = test['expected'] in response
                else:
                    # If no expected output, consider successful if we got a response
                    success = bool(response)
                    
                results.append({
                    'test_case': i + 1,
                    'input': test['input'],
                    'response': response,
                    'expected': test.get('expected', 'No expected output specified'),
                    'success': success
                })
                
            except Exception as e:
                results.append({
                    'test_case': i + 1,
                    'input': test['input'],
                    'error': str(e),
                    'success': False
                })
        
        # Calculate test duration and success rate
        total_time = time.time() - start_time
        successful_tests = sum(1 for r in results if r['success'])
        
        summary = {
            'total_tests': len(test_cases),
            'successful_tests': successful_tests,
            'success_rate': successful_tests / len(test_cases) if test_cases else 0,
            'total_time_seconds': total_time,
            'average_time_per_test': total_time / len(test_cases) if test_cases else 0,
            'results': results
        }
        
        return summary
        
    @staticmethod
    def print_test_report(summary: Dict[str, Any]) -> None:
        """
        Print a human-readable test report.
        
        Args:
            summary: Test summary dictionary
        """
        print("\n=== AGENT TEST REPORT ===")
        print(f"Total Tests: {summary['total_tests']}")
        print(f"Successful Tests: {summary['successful_tests']}")
        print(f"Success Rate: {summary['success_rate']:.2%}")
        print(f"Total Time: {summary['total_time_seconds']:.2f} seconds")
        print(f"Average Time: {summary['average_time_per_test']:.2f} seconds per test")
        print("\nDetailed Results:")
        
        for i, result in enumerate(summary['results']):
            print(f"\nTest Case {i+1}:")
            print(f"  Input: {result['input']}")
            if 'error' in result:
                print(f"  ERROR: {result['error']}")
            else:
                # Truncate long responses so the report stays readable
                response_text = result['response']
                if len(response_text) > 100:
                    response_text = f"{response_text[:100]}..."
                print(f"  Response: {response_text}")
                print(f"  Expected: {result['expected']}")
                print(f"  Success: {'✅' if result['success'] else '❌'}")
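
The test cases consumed by the loop above are plain dictionaries with an `input` key and an optional `expected` key, and the check is a case-sensitive substring match. The minimal sketch below illustrates that behaviour offline with a stand-in agent. `EchoAgent` and `check_case` are hypothetical names used only for illustration; they are not part of the project.

```python
import asyncio
from typing import Any, Dict, List


class EchoAgent:
    """Hypothetical stand-in agent: returns a canned reply without calling any API."""

    async def process(self, input_data: str) -> str:
        return f"Hello! You said: {input_data}"


async def check_case(agent: EchoAgent, test: Dict[str, Any]) -> bool:
    """Apply the same pass/fail rule as the tester loop to one test case."""
    response = await agent.process(test["input"])
    if "expected" in test:
        # Case-sensitive substring match, as in the tester
        return test["expected"] in response
    # Without an expected value, any non-empty response passes
    return bool(response)


test_cases: List[Dict[str, Any]] = [
    {"input": "hi", "expected": "Hello"},  # passes: "Hello" is in the reply
    {"input": "hi", "expected": "hello"},  # fails: the match is case-sensitive
    {"input": "anything"},                 # passes: the reply is non-empty
]


async def run_all() -> List[bool]:
    agent = EchoAgent()
    return [await check_case(agent, test) for test in test_cases]


outcomes = asyncio.run(run_all())
print(outcomes)  # [True, False, True]
```

Because the match is a plain substring test, short `expected` values can pass by accident; longer, more distinctive phrases make for stricter tests.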

Main Chatbot Class

The main ChatBot class ties everything together:

# atomic_chatbot.py
from core.agent_manager import AgentManager
from core.router import Router
from core.context_handler import ContextHandler
from core.error_handler import ErrorHandler
from core.response_formatter import ResponseFormatter
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional
import config.openai_config as openai_config

class ChatBot:
    """
    Main chatbot class that orchestrates the entire system.
    
    Integrates the agent manager, router, context handler, and other
    components to create a complete conversational system.
    """
    
    def __init__(self):
        """Initialize the chatbot and its components."""
        # Configure OpenAI
        openai_config.configure_openai()
        
        # Initialize components
        self.agent_manager = AgentManager()
        self.context_handler = ContextHandler()
        self.router = Router(self.agent_manager)
        self.response_formatter = ResponseFormatter()
        
        # Track conversation state
        self.conversation_active = False
        
    def add_agent(self, name: str, agent: AtomicAgent, routing_rules: Optional[Dict[str, Any]] = None) -> None:
        """
        Add an agent to the chatbot.
        
        Args:
            name: Unique name for the agent
            agent: The agent instance
            routing_rules: Optional rules for routing to this agent
        """
        # Register the agent with the agent manager
        self.agent_manager.register_agent(name, agent)
        
        # Register intent with the router
        self.router.register_intent(name, name)
        
        # Apply any custom routing rules
        if routing_rules:
            if 'intent' in routing_rules:
                self.router.register_intent(routing_rules['intent'], name)
                
    @ErrorHandler.async_error_handler
    async def process_message(self, user_input: str) -> Dict[str, Any]:
        """
        Process a user message and return a response.
        
        Args:
            user_input: The user's input text
            
        Returns:
            Formatted response dictionary
        """
        # Start conversation if not active
        if not self.conversation_active:
            self.conversation_active = True
            
        # Provide context to agent manager
        self.agent_manager.update_context(
            self.context_handler.get_relevant_context(user_input)
        )
        
        # Route request to appropriate agent
        response = await self.router.route_request(user_input)
        
        # Update context with new interaction
        self.context_handler.update_context(user_input, response)
        
        # Format and return response
        return self.response_formatter.format_response(
            content=response,
            metadata={
                'conversation_turns': len(self.context_handler.conversation_history)
            }
        )
        
    def start_new_conversation(self) -> None:
        """
        Reset the conversation state to start a new conversation.
        """
        self.context_handler.clear_history()
        self.conversation_active = False
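
To see the order of operations in `process_message` without an API key, the following sketch reproduces the route, record, and format steps with stand-in components. `StubContext` and `StubBot` are hypothetical, and the response shape (a dictionary with `content` and `metadata` keys) is an assumption inferred from how the usage examples read the result; it is not the actual output of `ResponseFormatter`.

```python
import asyncio
from typing import Any, Dict, List


class StubContext:
    """Hypothetical stand-in for ContextHandler: stores turns in a list."""

    def __init__(self) -> None:
        self.conversation_history: List[Dict[str, str]] = []

    def update_context(self, user_input: str, response: str) -> None:
        self.conversation_history.append({"user": user_input, "assistant": response})

    def clear_history(self) -> None:
        self.conversation_history.clear()


class StubBot:
    """Mirrors the route -> record -> format order of process_message."""

    def __init__(self) -> None:
        self.context = StubContext()

    async def _route(self, user_input: str) -> str:
        # Stand-in for the router call; no model is contacted
        return f"echo: {user_input}"

    async def process_message(self, user_input: str) -> Dict[str, Any]:
        response = await self._route(user_input)
        self.context.update_context(user_input, response)
        return {
            "content": response,
            "metadata": {"conversation_turns": len(self.context.conversation_history)},
        }


async def demo() -> List[Dict[str, Any]]:
    bot = StubBot()
    first = await bot.process_message("hello")
    second = await bot.process_message("help me")
    bot.context.clear_history()  # same effect as starting a new conversation
    third = await bot.process_message("new topic")
    return [first, second, third]


replies = asyncio.run(demo())
for reply in replies:
    print(reply["content"], reply["metadata"]["conversation_turns"])
```

The turn counter resets after the history is cleared, which is the behaviour `start_new_conversation` is meant to provide.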

Settings Configuration

Let’s update our settings file with default configuration options:

# config/settings.py
"""
Application-wide settings and configuration values.
"""

# Default models for different agent types
DEFAULT_MODELS = {
    "general": "gpt-3.5-turbo",
    "support": "gpt-4",
    "routing": "gpt-3.5-turbo"
}

# Token budget settings
TOKEN_BUDGET = {
    "max_per_request": 4000,
    "max_per_conversation": 20000
}

# Rate limiting settings
RATE_LIMITS = {
    "requests_per_minute": 60,
    "tokens_per_minute": 90000
}

# Context settings
CONTEXT_SETTINGS = {
    "max_history_turns": 10,
    "max_history_tokens": 2000
}

# System prompt components for reuse
SYSTEM_PROMPTS = {
    "base": "You are an AI assistant helping users with their queries.",
    "greeting": "You are a friendly assistant that greets users and introduces them to the system.",
    "support": "You are a technical support agent helping users solve problems.",
    "routing": "You are a classifier determining the best agent to handle a user query."
}
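
These values only take effect once other modules read them. As a minimal sketch of how `CONTEXT_SETTINGS["max_history_turns"]` might be applied, the hypothetical helper `trim_history` below keeps only the most recent turns. The settings dictionary is copied inline so the snippet runs standalone, and the history shape (a list of dictionaries with `user` and `assistant` keys) is an assumption, not necessarily the format used by `ContextHandler`.

```python
from typing import Dict, List

# Copied from config/settings.py so the sketch is self-contained
CONTEXT_SETTINGS = {
    "max_history_turns": 10,
    "max_history_tokens": 2000,
}


def trim_history(
    history: List[Dict[str, str]],
    max_turns: int = CONTEXT_SETTINGS["max_history_turns"],
) -> List[Dict[str, str]]:
    """Return the most recent `max_turns` turns (hypothetical helper)."""
    if max_turns <= 0:
        return []
    return history[-max_turns:]


# Assumed history shape: one dictionary per conversation turn
history = [{"user": f"question {i}", "assistant": f"answer {i}"} for i in range(25)]
trimmed = trim_history(history)
print(len(trimmed), trimmed[0]["user"])  # 10 question 15
```

Limiting by turn count is the simplest policy; enforcing `max_history_tokens` would additionally require a tokenizer, which is outside the scope of this sketch.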

Custom Agent Implementation

Let’s create a custom agent for domain-specific tasks:

# agents/specialized_agents/custom_agent.py
from agents.base_agent import AtomicAgent
from typing import Dict, Any, Optional

class CustomAgent(AtomicAgent):
    """A custom agent for handling specific domain queries."""
    
    def __init__(self, domain: str, knowledge_base: Dict[str, str]):
        super().__init__(
            task_description=f"You are a specialized assistant for {domain} queries. Provide detailed and accurate information.",
            model="gpt-4",
            temperature=0.2
        )
        self.domain = domain
        self.knowledge_base = knowledge_base
        
    async def process(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> str:
        # Custom processing logic here
        # First check knowledge base
        for key, info in self.knowledge_base.items():
            # Compare case-insensitively so mixed-case keys still match
            if key.lower() in input_data.lower():
                return f"{info} (From knowledge base)"
        
        # If no match in knowledge base, use the LLM
        messages = [
            {"role": "system", "content": f"{self.task} Answer as a {self.domain} expert."},
            {"role": "user", "content": input_data}
        ]
        
        return await self._call_openai_api(messages)
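
The knowledge-base check returns the first key found as a substring of the lowercased input, so dictionary order decides which entry wins when several keys match. The standalone sketch below isolates that lookup; `lookup` is a hypothetical helper written for illustration, not a method of `CustomAgent`.

```python
from typing import Dict, Optional


def lookup(knowledge_base: Dict[str, str], user_input: str) -> Optional[str]:
    """Return the first entry whose key appears in the input, or None (hypothetical helper)."""
    text = user_input.lower()
    for key, info in knowledge_base.items():
        if key.lower() in text:
            return info
    return None


kb = {
    "401k": "A 401(k) is a tax-advantaged retirement savings plan offered by employers.",
    "inflation": "Inflation is the rate at which prices increase over time.",
}

# Both keys occur in this input; "401k" wins because it comes first in the dict
hit = lookup(kb, "How does INFLATION affect my 401k?")
# No key occurs here, so the agent would fall through to the LLM call
miss = lookup(kb, "What is a bond?")
print(hit)
print(miss)
```

Substring matching is cheap but coarse: a key such as "stock" would also match "stockpile", so short keys deserve some care.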

Basic Usage Example

Here’s a simple example of using the chatbot:

# examples/basic_usage.py
import asyncio
import sys
import os

# Add the parent directory to the path to import the chatbot
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from atomic_chatbot import ChatBot
from agents.specialized_agents.greeting_agent import GreetingAgent
from agents.specialized_agents.support_agent import SupportAgent

async def main():
    # Initialize chatbot
    chatbot = ChatBot()
    
    # Add specialized agents
    chatbot.add_agent('greeting', GreetingAgent())
    chatbot.add_agent('support', SupportAgent(), 
                     routing_rules={'intent': 'help'})
    
    print("Chatbot initialized! Type 'exit' to quit.\n")
    
    # Simple chat loop
    while True:
        user_input = input("\nYou: ")
        
        if user_input.lower() == 'exit':
            print("Goodbye!")
            break
            
        # Process the user input
        response = await chatbot.process_message(user_input)
        
        # Print the response
        print(f"\nBot: {response['content']}")

if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage Example

A more advanced example adds a domain-specific agent and runs a scripted conversation:

# examples/advanced_usage.py
import asyncio
import sys
import os
import json

# Add the parent directory to the path to import the chatbot
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from atomic_chatbot import ChatBot
from agents.specialized_agents.greeting_agent import GreetingAgent
from agents.specialized_agents.support_agent import SupportAgent
from agents.specialized_agents.custom_agent import CustomAgent

async def main():
    # Initialize chatbot
    chatbot = ChatBot()
    
    # Add specialized agents
    chatbot.add_agent('greeting', GreetingAgent())
    chatbot.add_agent('support', SupportAgent())
    
    # Add custom domain-specific agent
    finance_knowledge = {
        "stock market": "The stock market refers to public markets for trading company stocks and derivatives.",
        "401k": "A 401(k) is a tax-advantaged retirement savings plan offered by employers.",
        "inflation": "Inflation is the rate at which prices increase over time, resulting in a fall in the purchasing value of money."
    }
    
    finance_agent = CustomAgent("finance", finance_knowledge)
    chatbot.add_agent('finance', finance_agent, 
                     routing_rules={'intent': 'money'})
    
    # Process a conversation
    conversation = [
        "Hello there!",
        "I need help with my 401k options",
        "What about stock market investments?",
        "Can you help me reset my password?"
    ]
    
    print("Starting conversation simulation...\n")
    
    for message in conversation:
        print(f"User: {message}")
        response = await chatbot.process_message(message)
        print(f"Bot: {response['content']}")
        print(f"Metadata: {json.dumps(response.get('metadata', {}), indent=2)}\n")
        
        # Add a small delay to simulate a real conversation
        await asyncio.sleep(1)
        
    print("Conversation simulation completed.")

if __name__ == "__main__":
    asyncio.run(main())

Conclusion

Building chatbots with atomic-agents and OpenAI offers a powerful, maintainable approach to conversational AI. The modular nature of atomic-agents makes it easier to develop, test, and scale your chatbot while maintaining code quality and system reliability.

Key takeaways:

  • Atomic-agents provide a modular, maintainable approach to chatbot development
  • OpenAI integration enables powerful language understanding capabilities
  • Keeping routing, context handling, error handling, and response formatting in separate components makes each one easier to debug and replace
  • Regular testing and monitoring help keep performance reliable
