System Design: Building a Production-Ready AI Chatbot (End-to-End)

Param Harrison
23 min read

The Challenge

"Build a chatbot" is the new "Build a URL shortener." It sounds like a junior interview question, but in 2024, it is a distributed systems minefield.

If you just write a Python script that hits the OpenAI API, you are building a toy. A Production AI Chatbot needs to handle:

  • Streaming: No one waits 10 seconds for a spinner.
  • Memory: "Wait, who am I?" (State Management).
  • Concurrency: 1,000 users chatting simultaneously without blocking threads.
  • Reliability: What happens when the LLM hallucinates or times out?

Discussion: How do you design a system that feels instant, remembers context, scales to thousands of concurrent users, and gracefully handles failures?

This post breaks down the end-to-end architecture of a scalable, enterprise-grade AI chatbot.

1. The High-Level Architecture: Why Not Just a Script?

Think of building a chatbot like building a restaurant. You could just set up a food truck (a simple script), but if you want to serve thousands of customers, you need a proper restaurant with different stations: the front door (entry point), the kitchen (processing), the storage (memory), and the delivery system (streaming).

We are not building a monolithic script. We are building an Event-Driven, Asynchronous System — meaning different parts work independently and communicate through events.

graph TD
    subgraph ClientSide["Client Side"]
        Browser[React / Next.js UI]
    end

    subgraph EdgeLayer["The Edge Entry Point"]
        LB[Load Balancer / Cloudflare]
        API[API Gateway]
    end

    subgraph AppLayer["The Application Layer FastAPI"]
        ChatService[Chat Service Async]
    end

    subgraph AILayer["The Brain AI Layer"]
        Orchestrator[Agent Orchestrator LangGraph]
        RAG[Vector Search]
    end

    subgraph PersistenceLayer["The Memory Persistence"]
        Postgres[(Postgres Chat History)]
        Redis[(Redis Cache Rate Limits)]
        VectorDB[(Qdrant/Pinecone)]
    end

    Browser -->|"1. POST /message"| LB
    LB --> API
    API --> ChatService
    
    ChatService -->|"2. Retrieve Context"| Postgres
    ChatService -->|"3. Check Limits"| Redis
    
    ChatService -->|"4. Invoke Agent"| Orchestrator
    Orchestrator -->|"5. Search Knowledge"| RAG
    RAG --> VectorDB
    
    Orchestrator -->|"6. Stream Response"| ChatService
    ChatService -->|"7. SSE Stream"| Browser

What Each Layer Does (In Simple Terms)

  • Client Side (Browser): The chat window you see. It shows messages instantly and handles errors gracefully.
  • Edge Layer (Load Balancer): The front door. Routes requests, checks authentication, and protects against attacks.
  • Application Layer (Chat Service): The brain that coordinates everything. Manages state and orchestrates the flow.
  • AI Layer (Orchestrator + RAG): The actual AI. Decides what to do and searches knowledge when needed.
  • Persistence Layer (Databases): The memory. Stores chat history, caches data, and tracks rate limits.

Why This Matters: If one part breaks or gets slow, the others keep working. It's like having separate stations in a restaurant — if the dessert station is slow, the main course can still be served.

2. The Tech Stack: Choosing Your Tools

When building a chatbot, you need to make choices about what technologies to use. Let's understand why we pick each tool, not just what it is.

Frontend: React + Tailwind + Vercel AI SDK

Why React? React lets you build reusable components. Think of it like LEGO blocks — you build a message bubble once, then reuse it everywhere.

Why Tailwind? Tailwind is like having a design system built-in. Instead of writing custom CSS, you use pre-made classes. It's faster and more consistent.

The Secret Weapon: Vercel AI SDK

Here's the thing: streaming text from a server is tricky. You need to:

  • Parse incoming chunks
  • Handle reconnections if the connection drops
  • Manage message state
  • Show errors gracefully

The Vercel AI SDK does all of this for you. Don't write your own stream parser — it's like building your own HTTP client when you could just use fetch().

import { useChat } from 'ai/react';

function ChatComponent() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: '/api/chat',
    onError: (error) => {
      // Handle errors gracefully
      console.error('Chat error:', error);
    },
  });

  return (
    <div>
      {messages.map((message) => (
        <div key={message.id}>{message.content}</div>
      ))}
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
      </form>
    </div>
  );
}

Why not build your own? The SDK handles:

  • SSE event parsing
  • Automatic reconnection
  • Message state management
  • Error recovery
  • Token accumulation

Communication: Server-Sent Events (SSE) vs WebSockets

Imagine you're at a restaurant. There are two ways the waiter could bring you food:

  1. SSE (Server-Sent Events): Like a conveyor belt sushi restaurant. Food comes to you automatically, one piece at a time. You can't send food back, but you get a steady stream.

  2. WebSockets: Like a regular restaurant where you can call the waiter back anytime. Two-way communication, but more complex to set up.

For chatbots, SSE is usually better. Here's why:

| Feature      | WebSockets                                   | SSE                                 |
| ------------ | -------------------------------------------- | ----------------------------------- |
| Direction    | Both ways (you can send messages back)       | One way (server → you)              |
| Complexity   | More complex, harder to debug                | Simple, uses regular HTTP           |
| Firewall     | Sometimes blocked                            | Works everywhere                    |
| Reconnection | You handle it manually                       | Automatic                           |
| Best For     | Gaming, video calls, real-time collaboration | Chat, notifications, streaming text |

Think About It: ChatGPT uses SSE. If it's good enough for them, it's probably good enough for you.
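Under the hood, SSE is just plain text over HTTP: each event is one or more `field: value` lines followed by a blank line. A minimal formatter (a hypothetical helper, not part of any library) shows the whole wire format:

```python
import json

def format_sse(data: dict, event: str = "") -> str:
    """Serialize a payload into the SSE wire format: an optional
    'event:' line, a 'data:' line, then a blank line as terminator."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"

# Each streamed token becomes one tiny event on the wire
print(format_sse({"text": "H"}, event="message"))
```

The browser's built-in EventSource parses exactly this format and reconnects automatically, which is why SSE needs so little client code.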

When you'd use WebSockets instead:

  • You need the user to send control messages (like "stop generating")
  • Multiple people editing the same document
  • Voice or video streaming

Backend: Python (FastAPI)

Why Python? The entire AI ecosystem lives in Python. LangChain, LlamaIndex, PyTorch — they're all Python. You could use Go or Node.js, but you'd be fighting against the ecosystem.

Why FastAPI? FastAPI is modern Python. It has:

  • Built-in async support (critical for chatbots)
  • Automatic API documentation
  • Type checking with Pydantic
  • Great performance for I/O-bound tasks (like waiting for LLM responses)

The Critical Rule: Always Use async

Here's what happens if you don't:

# ❌ BAD: This blocks everything
@app.post("/chat")
async def chat_sync(request: ChatRequest):
    response = client.chat.completions.create(...)  # Blocking call stalls the event loop for ~20s
    return response
    # While waiting, NO other users on this worker can chat!

If one request blocks the event loop for 20 seconds, every other user on that worker has to wait. That's terrible.

# ✅ GOOD: This handles multiple users
from fastapi.responses import StreamingResponse

@app.post("/chat")
async def chat_async(request: ChatRequest):
    async def stream():
        async with aiohttp.ClientSession() as session:
            async with session.post(...) as resp:
                async for chunk in resp.content.iter_chunked(1024):
                    yield chunk  # Stream each chunk immediately
    return StreamingResponse(stream())
    # Other users can chat while this one waits on the LLM!

With async, your server can handle hundreds of users chatting simultaneously.
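A toy benchmark makes the point concrete: two simulated "LLM calls" of 0.2 seconds each finish in roughly 0.2 seconds total when awaited concurrently, not 0.4 (the `asyncio.sleep` stands in for network I/O):

```python
import asyncio
import time

async def fake_llm_call(user: str) -> str:
    # Stands in for an I/O-bound API call; the event loop is free while waiting
    await asyncio.sleep(0.2)
    return f"response for {user}"

async def main() -> float:
    start = time.perf_counter()
    # Both "requests" run on a single thread, interleaved by the event loop
    await asyncio.gather(fake_llm_call("alice"), fake_llm_call("bob"))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"2 concurrent calls took {elapsed:.2f}s")  # ~0.2s, not 0.4s
```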

Database: Postgres + Redis (Two Tools, Two Jobs)

Think of databases like different types of storage:

  • Postgres = Your filing cabinet (permanent storage, organized)
  • Redis = Your sticky notes (temporary, super fast)

Postgres: The Filing Cabinet

Postgres stores your chat history permanently. It's like a filing cabinet where you can:

  • Find all messages from a specific conversation
  • Search through old chats
  • Keep everything organized with relationships

Here's a simple schema to get started:

-- Each conversation gets a session
CREATE TABLE chat_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL,
    thread_id VARCHAR(255) NOT NULL,  -- Unique ID for this conversation
    created_at TIMESTAMP DEFAULT NOW(),
    metadata JSONB  -- Flexible storage for extra data
);

-- Each message in a conversation
CREATE TABLE chat_messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id UUID REFERENCES chat_sessions(id),
    role VARCHAR(20) NOT NULL,  -- 'user' or 'assistant'
    content TEXT NOT NULL,
    tokens_used INTEGER,  -- Track costs
    created_at TIMESTAMP DEFAULT NOW()
);

-- Indexes make queries fast
CREATE INDEX idx_thread_id ON chat_sessions(thread_id);
CREATE INDEX idx_session_messages ON chat_messages(session_id, created_at);

Redis: The Sticky Notes

Redis is in-memory (super fast) but temporary. We use it for:

  • Rate limiting: "Has this user sent too many messages?"
  • Caching: "We just looked this up, don't look it up again"
  • Session data: "What's the current state of this conversation?"

Why Two Databases? Postgres is slow for "how many requests did this user make in the last hour?" Redis is perfect for that. Use the right tool for the job.

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def check_rate_limit(user_id: str, limit: int = 50, window: int = 3600) -> bool:
    key = f"rate_limit:{user_id}"
    current = redis_client.incr(key)
    
    if current == 1:
        redis_client.expire(key, window)
    
    return current <= limit

3. Making It Feel Instant: The Streaming Flow

Have you noticed how ChatGPT starts typing immediately, even before it finishes thinking? That's not magic — it's streaming.

The Problem: Waiting Feels Terrible

Imagine you ask a question and wait 10 seconds with a spinner. That feels broken. Users expect to see something happen immediately.

The Solution: Show Progress As It Happens

We use a Dual-Path Strategy:

  1. Optimistic UI: Show the user's message instantly (don't wait for the server)
  2. Streaming Response: Show the bot's response word-by-word as it's generated

Think of it like a live sports broadcast vs. a recorded game. Live feels more engaging, even if the quality is the same.

sequenceDiagram
    participant User
    participant Server
    participant LLM
    participant DB

    User->>Server: POST /chat (msg="Hello")
    
    par Async Save
        Server->>DB: INSERT User Message
    and Async Generation
        Server->>LLM: Stream Completion
    end
    
    loop Every Chunk
        LLM-->>Server: "H" ... "e" ... "l" ... "l" ... "o"
        Server-->>User: SSE Event: data: {"text": "H"}
    end
    
    Server->>DB: UPDATE Bot Message (Full Text)
    Server-->>User: SSE Event: [DONE]

Key Insight: Notice the Async Save — we don't wait for the database to finish saving before calling the LLM. We do both at the same time. Every millisecond counts when you want to feel instant.

How to Implement Streaming (The Code)

Here's a simple FastAPI endpoint that streams responses:

from fastapi import FastAPI
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
import asyncio
import json

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    thread_id: str

@app.post("/chat")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        # Save user message in background (don't wait)
        asyncio.create_task(save_user_message(request.message, request.thread_id))
        
        # Start streaming immediately
        async for chunk in stream_llm_response(request.message, request.thread_id):
            yield {
                "event": "message",
                "data": json.dumps({"text": chunk})
            }
        
        # Tell the client we're done
        yield {
            "event": "done",
            "data": json.dumps({"status": "complete"})
        }
    
    return EventSourceResponse(event_generator())

Making It Even Faster: Time To First Token

The Goal: User sees the first word in less than 300ms.

How to Achieve It:

  1. Don't wait for the database — save messages in the background
  2. Start the LLM call immediately — don't wait for anything
  3. Send tokens as soon as you get them — don't buffer too much
  4. Keep connections alive — reuse HTTP connections

Think of it like a race. Every millisecond you save makes the experience feel faster.

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_llm_response(message: str, thread_id: str):
    # Retrieve context in parallel with LLM initialization
    context_task = asyncio.create_task(get_context(thread_id))
    
    # Start LLM stream immediately
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=await context_task,  # Wait only when needed
        stream=True,
    )
    
    buffer = []
    async for chunk in stream:
        token = chunk.choices[0].delta.content or ""
        if token:
            buffer.append(token)
            
            # Flush the buffer every 3 tokens to balance latency and overhead
            if len(buffer) >= 3:
                yield "".join(buffer)
                buffer = []
    
    # Flush remaining
    if buffer:
        yield "".join(buffer)

4. Memory: Making the Bot Remember

A chatbot without memory is like talking to someone with amnesia. Every message, they forget everything you said before.

The Problem: Context Windows Are Limited

LLMs have a limit on how much text they can process at once (called a "context window"). GPT-4 can handle about 8,000 tokens (roughly 6,000 words). If your conversation is longer, you need a strategy.

The Solution: Thread IDs + Smart Context Management

Instead of sending the entire conversation history every time, we use a Thread ID — a unique identifier for each conversation.

How It Works:

  1. Each conversation gets a unique thread_id (like "conv_abc123")
  2. When the user sends a message, we look up the last N messages for that thread
  3. We send only those recent messages to the LLM

For Short Conversations: Send everything. Simple.

For Long Conversations: Use a "sliding window" — keep the last 20 messages, and summarize everything older.

Think of it like a conversation summary: "Earlier, the user mentioned they're a Python developer and asked about APIs. Now they're asking about authentication."
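The snippets below call a `count_tokens` helper. In production you'd use the model's real tokenizer (the `tiktoken` library, for OpenAI models); as a rough, dependency-free stand-in, the common ~4-characters-per-token approximation works:

```python
def count_tokens(text: str) -> int:
    """Rough estimate: ~4 characters per token for English text.
    Swap in the model's real tokenizer (e.g. tiktoken) for exact counts."""
    return max(1, len(text) // 4)

print(count_tokens("Hello, how are you?"))  # 19 chars ≈ 4 tokens
```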

Here's how you'd implement this in code:

async def get_conversation_context(thread_id: str, max_tokens: int = 4000):
    # Step 1: Get the last 20 messages for this conversation
    recent_messages = await db.fetch(
        """
        SELECT role, content
        FROM chat_messages
        WHERE session_id = (
            SELECT id FROM chat_sessions WHERE thread_id = $1
        )
        ORDER BY created_at DESC
        LIMIT 20
        """,
        thread_id
    )
    
    # Step 2: Count how many tokens this would use
    total_tokens = sum(count_tokens(msg['content']) for msg in recent_messages)
    
    # Step 3: If it fits, use it. If not, summarize.
    if total_tokens <= max_tokens:
        # Fits! Return all messages
        return [{"role": m['role'], "content": m['content']} for m in reversed(recent_messages)]
    else:
        # Too long! Summarize old messages, keep recent ones
        return await get_summarized_context(thread_id, recent_messages)

async def get_summarized_context(thread_id: str, messages: list):
    # Get existing summary (if we've summarized before)
    summary = await db.fetchval(
        "SELECT summary FROM conversation_summaries WHERE thread_id = $1",
        thread_id
    )
    
    # If no summary exists, create one from old messages
    if not summary:
        old_messages = messages[:-10]  # Everything except last 10
        summary = await summarize_messages(old_messages)  # Use LLM to summarize
        await db.execute(
            "INSERT INTO conversation_summaries (thread_id, summary) VALUES ($1, $2)",
            thread_id, summary
        )
    
    # Combine: summary of old stuff + last 10 actual messages
    recent = messages[-10:]
    return [
        {"role": "system", "content": f"Previous conversation: {summary}"},
        *[{"role": m['role'], "content": m['content']} for m in recent]
    ]

In Plain English: If the conversation is short, send everything. If it's long, summarize the old parts and keep the recent messages.

Sliding Window Strategy

graph LR
    A[Full History] -->|"Check Token Count"| B{Under Limit?}
    B -->|Yes| C[Use All Messages]
    B -->|No| D[Summarize Old Messages]
    D --> E[Combine Summary + Recent N Messages]
    E --> F[Send to LLM]
    C --> F

Implementation Details:

  • Window Size: Last 20 messages (configurable)
  • Token Budget: 4000 tokens for context (leaves room for response)
  • Summarization Trigger: When context exceeds 3500 tokens
  • Summary Storage: Cache summaries in Postgres with TTL
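Stripped of the database calls, the sliding-window decision is a small pure function (names here are illustrative, not from any library): walk backwards from the newest message and keep whatever fits the budget.

```python
def fit_messages(messages: list[dict], max_tokens: int) -> list[dict]:
    """Keep the most recent messages that fit the token budget.
    Returns them in chronological (oldest-first) order."""
    kept, used = [], 0
    for msg in reversed(messages):  # newest first
        cost = max(1, len(msg["content"]) // 4)  # rough ~4 chars/token estimate
        if used + cost > max_tokens:
            break
        kept.append(msg)
        used += cost
    return list(reversed(kept))  # restore chronological order

history = [
    {"role": "user", "content": "x" * 400},       # ~100 tokens, oldest
    {"role": "assistant", "content": "y" * 400},  # ~100 tokens
    {"role": "user", "content": "z" * 400},       # ~100 tokens, newest
]
print(len(fit_messages(history, max_tokens=250)))  # only the 2 newest fit
```

Anything this function drops is what you'd feed to the summarizer instead of discarding outright.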

5. Giving the Bot "Hands": RAG and Tools

An LLM by itself is like a smart person locked in a room with no access to the outside world. They can talk, but they can't look things up or do things.

The Problem: LLMs Hallucinate

If you ask an LLM "What's the weather?" it might make something up because it doesn't have access to real-time data. We need to give it tools to access real information.

The Solution: Let the LLM Choose When to Use Tools

Instead of always searching your knowledge base (which is slow and expensive), we let the LLM decide: "Do I need to look something up, or can I answer this directly?"

Example Tools:

  1. search_knowledge_base: Search your company docs/PDFs
  2. get_user_data: Look up the user's account info
  3. calculator: Do math (LLMs are bad at math)

How It Works:

  • User: "Hi" → Bot: "Hello!" (no tools needed)
  • User: "Where is my order?" → Bot: Uses get_user_data → "Your order #123 is shipping"
  • User: "How does your API work?" → Bot: Uses search_knowledge_base → Finds docs → Explains

The Smart Part: The bot doesn't search your knowledge base for "Thanks" — it just responds directly. This saves time and money.

Tool Selection Flow

flowchart TD
    A[User Query] --> B[LLM with Tool Definitions]
    B --> C{Tool Needed?}
    C -->|No| D[Direct Response]
    C -->|Yes| E[Select Tool]
    E --> F{Tool Type?}
    F -->|RAG| G[Vector Search]
    F -->|API| H[External API Call]
    F -->|Function| I[Execute Function]
    G --> J[Combine Results]
    H --> J
    I --> J
    J --> K[LLM Generates Final Response]
    D --> L[Stream to User]
    K --> L

How Tools Work (Simple Example)

Here's a simplified version of how tool calling works:

# Step 1: Define what tools are available
tools = [
    {
        "name": "search_knowledge_base",
        "description": "Search company documentation",
        "parameters": {"query": "string"}
    },
    {
        "name": "get_user_data",
        "description": "Get user account info",
        "parameters": {"user_id": "string"}
    }
]

# Step 2: Send user message + tool definitions to LLM
response = await llm.chat(
    messages=[{"role": "user", "content": "Where is my order?"}],
    tools=tools
)

# Step 3: Check if LLM wants to use a tool
if response.tool_calls:
    # LLM said: "I need to call get_user_data"
    for tool_call in response.tool_calls:
        if tool_call["name"] == "get_user_data":
            # Actually call the tool
            user_data = db_query(tool_call["args"]["user_id"])
            
            # Send tool result back to LLM
            final_response = await llm.chat(
                messages=[
                    {"role": "user", "content": "Where is my order?"},
                    {"role": "assistant", "content": None, "tool_calls": response.tool_calls},
                    {"role": "tool", "content": user_data}
                ]
            )
            return final_response
else:
    # LLM can answer directly, no tools needed
    return response

The Flow: User asks → LLM decides if it needs tools → If yes, call tool → Send result back to LLM → LLM generates final answer.

Tool Definition Schema

tools = [
    {
        "type": "function",
        "function": {
            "name": "search_knowledge_base",
            "description": "Search company documentation and knowledge base",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    },
                    "top_k": {
                        "type": "integer",
                        "description": "Number of results",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_user_data",
            "description": "Get user account information",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {
                        "type": "string",
                        "description": "User ID"
                    }
                },
                "required": ["user_id"]
            }
        }
    }
]

6. Production Reality: What Will Break

When you launch, things will go wrong. Here's what to expect and how to handle it:

1. Rate Limits & Cost: Protecting Your Budget

The Problem: One user could write a script that sends 10,000 messages per minute, draining your API budget in hours.

The Solution: Rate limiting — like a bouncer at a club, you limit how many requests each user can make.

Simple Approach: "Each user gets 50 requests per hour. After that, they wait."

Here's a simple rate limiter:

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def check_rate_limit(user_id: str, limit: int = 50, window: int = 3600):
    """
    Check if user has exceeded their rate limit.
    limit = max requests (e.g., 50)
    window = time window in seconds (e.g., 3600 = 1 hour)
    """
    key = f"rate_limit:{user_id}"
    
    # Increment counter (starts at 0, becomes 1, 2, 3...)
    current = redis_client.incr(key)
    
    # On first request, set expiration (delete key after 1 hour)
    if current == 1:
        redis_client.expire(key, window)
    
    # Check if over limit
    if current > limit:
        return False, 0  # Blocked, 0 remaining
    
    remaining = limit - current
    return True, remaining  # Allowed, with remaining count

# Usage in your API endpoint
@app.post("/chat")
async def chat(request: ChatRequest):
    allowed, remaining = check_rate_limit(request.user_id, limit=50, window=3600)
    
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Try again in an hour."
        )
    
    # Continue with chat...

How It Works:

  1. User makes request → increment counter in Redis
  2. If counter > limit → block the request
  3. After 1 hour → counter resets (key expires)
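Here is the same fixed-window logic with the Redis calls swapped for an in-memory dict, purely to make the counting easy to see and test; in production you'd keep the Redis version, since an in-memory counter doesn't survive restarts or span multiple workers:

```python
import time

class FixedWindowLimiter:
    """In-memory equivalent of the Redis INCR + EXPIRE pattern."""

    def __init__(self, limit: int = 50, window: int = 3600):
        self.limit = limit
        self.window = window
        self.counters: dict[str, tuple[int, float]] = {}  # user -> (count, window start)

    def check(self, user_id: str) -> bool:
        now = time.time()
        count, started = self.counters.get(user_id, (0, now))
        if now - started >= self.window:  # window expired: start fresh
            count, started = 0, now
        count += 1
        self.counters[user_id] = (count, started)
        return count <= self.limit

limiter = FixedWindowLimiter(limit=3, window=3600)
results = [limiter.check("alice") for _ in range(4)]
print(results)  # first 3 allowed, 4th blocked
```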

Advanced: Track Token Usage Too

You can also limit by tokens (not just requests):

from datetime import date

def check_token_budget(user_id: str, tokens_used: int, daily_budget: int = 100000):
    """Limit total tokens per day, not just requests"""
    key = f"token_budget:{user_id}:{date.today()}"
    current = redis_client.incrby(key, tokens_used)
    
    if current == tokens_used:  # First request today
        redis_client.expire(key, 86400)  # Expire after 24 hours
    
    return current <= daily_budget

This prevents one user from using all your tokens in a single day.

2. When OpenAI Goes Down: Circuit Breakers

The Problem: OpenAI (or any external service) can go down. If you keep trying to call it, you waste time and frustrate users.

The Solution: Circuit Breaker Pattern

Think of it like a fuse in your house. If something keeps failing, you "blow the fuse" and stop trying for a while.

How It Works:

  1. Normal (CLOSED): Everything works, calls go through
  2. Failing (OPEN): After 3 failures, stop trying for 60 seconds
  3. Testing (HALF_OPEN): After 60 seconds, try once to see if it's fixed

Simple Implementation:

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=3, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.state = "CLOSED"  # CLOSED, OPEN, or HALF_OPEN
        self.last_failure_time = None
    
    def call(self, func, *args, **kwargs):
        # If we're in "OPEN" state, don't even try (unless the cooldown is over)
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"  # Allow one trial call
            else:
                raise Exception("Service is down, try later")
        
        try:
            result = func(*args, **kwargs)
            # Success! Reset
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
            self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"  # Stop trying
            
            raise

With Fallback:

async def call_llm_with_fallback(prompt: str):
    try:
        return await breaker.call(openai_call, prompt)
    except Exception:
        # Try backup provider
        return await anthropic_call(prompt)
        # Or show a friendly error: "I'm having trouble. Try again in a minute."

Think About It: What's better — showing an error immediately, or making the user wait 30 seconds for a timeout?

3. Observability: Seeing What's Actually Happening

The Problem: The bot gives a weird answer. Why? Did it retrieve the wrong document? Did the LLM misunderstand? You have no idea.

The Solution: Tracing

Tracing is like having a security camera for your chatbot. You can see:

  • What the user asked
  • What documents were retrieved
  • What the LLM was thinking
  • How long each step took
  • What went wrong

Why This Matters: When a user complains "the bot gave me wrong information," you can trace back and see: "Oh, it retrieved document #5 instead of document #2. Let me fix the search."

Simple Approach: Log Everything

At minimum, log these things for every request:

import time
import uuid

async def handle_chat_request(message: str, thread_id: str):
    request_id = str(uuid.uuid4())
    start_time = time.time()
    
    # Log the request
    logger.info(f"[{request_id}] User message: {message}")
    
    # Step 1: Get context
    context_start = time.time()
    context = await get_conversation_context(thread_id)
    context_time = (time.time() - context_start) * 1000  # milliseconds
    
    # Step 2: Search if needed
    rag_start = time.time()
    rag_results = await vector_search(message)
    rag_time = (time.time() - rag_start) * 1000
    
    # Step 3: Generate response
    llm_start = time.time()
    response = await llm.generate(messages=context + [{"role": "user", "content": message}])
    llm_time = (time.time() - llm_start) * 1000
    
    total_time = (time.time() - start_time) * 1000
    
    # Log everything
    logger.info(f"""
    [{request_id}] Request complete
    - Context retrieval: {context_time}ms
    - RAG search: {rag_time}ms
    - LLM generation: {llm_time}ms
    - Total: {total_time}ms
    - Tokens used: {response.usage.total_tokens}
    """)
    
    return response

What to Track:

  • Request ID: So you can trace a specific conversation
  • Timing: How long each step takes (helps find bottlenecks)
  • Tokens: Track costs
  • Errors: What failed and why

Better Approach: Use a Tool

Tools like LangSmith or Arize Phoenix make this easier — they automatically track everything and give you a nice dashboard. But you can start with simple logging and upgrade later.

Think About It: If a user complains about a bad answer, can you find their request ID and see exactly what happened? If not, you need better observability.

7. Summary: What You Need for Production

Here's the checklist for a production-ready chatbot:

  1. Async Backend: Use async/await so multiple users can chat simultaneously
  2. Streaming (SSE): Show responses word-by-word, not all at once
  3. Memory: Store chat history and retrieve it efficiently
  4. Tools/RAG: Give the bot access to real data when needed
  5. Rate Limiting: Protect your budget from abuse
  6. Circuit Breakers: Don't keep trying when services are down
  7. Observability: Know what's happening so you can fix issues
  8. Error Handling: Fail gracefully, don't crash

Production Readiness Checklist

  • Async/await throughout (no blocking I/O)
  • SSE streaming implemented with proper event formatting
  • Thread-based conversation context management
  • Sliding window or summarization for long conversations
  • Tool/function calling for RAG and external APIs
  • Rate limiting per user (requests and tokens)
  • Circuit breakers for external LLM providers
  • Fallback models or graceful error messages
  • Distributed tracing (LangSmith/Phoenix)
  • Cost tracking per request/user
  • Error logging with request IDs
  • Health checks and monitoring endpoints
  • Load testing completed (1000+ concurrent users)

Challenge for You

Scenario: A user uploads a 50-page PDF and asks questions about it. Constraint: The PDF parsing takes 10 seconds. You cannot block the chat window. Question: How do you modify the architecture to handle "Document Ingestion" asynchronously while keeping the user updated in the chat?

Hint: Think about Task Queues like Celery/BullMQ.

Solution Approach

sequenceDiagram
    participant User
    participant API
    participant Queue
    participant Worker
    participant VectorDB
    participant Chat

    User->>API: Upload PDF + Question
    API->>Queue: Enqueue Ingestion Task
    API->>User: "Processing your document..."
    
    Queue->>Worker: Process PDF Task
    Worker->>Worker: Parse & Chunk PDF (10s)
    Worker->>VectorDB: Store Embeddings
    
    User->>Chat: "Is my document ready?"
    Chat->>Queue: Check Task Status
    Queue-->>Chat: "Processing... 60%"
    Chat-->>User: "Almost done! 60% processed"
    
    Worker->>Queue: Task Complete
    Queue->>Chat: Notify Completion
    Chat-->>User: "Document ready! Ask me anything."

Implementation Pattern:

  1. Task Queue: Use Celery (Python) or BullMQ (Node.js)
  2. Status Updates: Store task status in Redis with progress percentage
  3. SSE Notifications: Stream status updates to the user
  4. Polling Fallback: Client polls for status if SSE disconnects

A Celery worker that reports progress as it goes:
from celery import Celery

celery_app = Celery('chatbot', broker='redis://localhost:6379')

@celery_app.task(bind=True)
def process_document(self, file_path: str, user_id: str, thread_id: str):
    # Update progress
    self.update_state(state='PROGRESS', meta={'progress': 0})
    
    # Parse PDF
    chunks = parse_pdf(file_path)
    self.update_state(state='PROGRESS', meta={'progress': 50})
    
    # Generate embeddings
    embeddings = generate_embeddings(chunks)
    self.update_state(state='PROGRESS', meta={'progress': 80})
    
    # Store in vector DB
    store_in_vector_db(embeddings, thread_id)
    self.update_state(state='PROGRESS', meta={'progress': 100})
    
    return {'status': 'complete', 'chunks': len(chunks)}

Discussion Prompts for Engineers

  • How would you design rate limiting for a freemium model (different limits for free vs. paid users)?
  • What's your strategy for handling context window limits when conversations exceed 100 messages?
  • How do you ensure idempotency when a user's request times out but the LLM call succeeds?
  • Would you use a single Redis instance or shard rate limiting by user_id? Why?
  • How do you balance between streaming speed (smaller chunks) and network efficiency (larger chunks)?
  • What's your fallback strategy when both primary and backup LLM providers are down?
  • How would you implement A/B testing for different prompt strategies without affecting user experience?
  • What metrics would you alert on in production? (e.g., p95 latency > 5s, error rate > 1%)

Takeaway

Building a production AI chatbot isn't about calling an API. It's about designing a system that:

  • Feels fast — users see responses immediately through streaming
  • Remembers — context management so conversations make sense
  • Scales — handles thousands of users without breaking
  • Fails gracefully — when things go wrong, users get helpful errors
  • Is observable — you can see what's happening and fix issues

The difference between a demo and a product? Architecture. Not the AI model, not the prompts — the system design around it.

Start simple, add complexity as you need it. You don't need all of this on day one, but you should know where you're heading.


For more on building production AI systems, check out our AI Bootcamp for Software Engineers.
