How to Build an AI Agent From Scratch: The Complete 2026 Guide
Step-by-step guide to building a production AI agent — from the ReAct loop and tool calling to MCP integration, memory, and deployment.
TL;DR: Building an AI agent is not complicated — but most tutorials show you toys, not production systems. This guide takes you from first principles through a working ReAct agent with tool calling, then layers on MCP integration, persistent memory, error handling, and a deployment architecture that handles real traffic. Every code block runs. Every pattern comes from systems we've deployed. By the end, you will have a complete agent that can search the web, read files, execute code, and remember things across sessions — plus the mental model to build any agent from here.
Before we write a single line of code, we need a precise definition — because "AI agent" is one of the most overloaded terms in tech right now. A chatbot that answers questions is not an agent. A RAG pipeline is not an agent. A classifier running in a Lambda function is not an agent.
An AI agent is a system that pursues a goal autonomously: it perceives its context, reasons about what to do next, takes actions through tools, observes the results, and repeats until the goal is met.
The key word is "loop." An LLM call that generates a response is not an agent. An LLM call that generates a response, executes an action, observes the result, and decides what to do next — that is an agent.
This matters because the engineering problems are completely different. With a single LLM call, you have one shot, one output, one latency hit. With an agent loop, you have compounding latency and cost at every step, errors that cascade from one step into the next, and non-deterministic execution paths that resist traditional testing.
Understanding these challenges upfront saves you weeks of debugging later. The AI agent startup opportunity is real precisely because these engineering challenges have only recently become tractable — and most engineering teams are still learning how to navigate them.
Every agent, regardless of framework or LLM, follows the same fundamental loop. Here is the complete flow:
flowchart TD
START([User Goal / Task]) --> PERCEIVE
PERCEIVE[Perception\nGather inputs: task, memory,\ntool results, context] --> REASON
REASON[Reasoning\nLLM decides: is goal complete?\nIf not, which tool to call next?] --> DONE_CHECK
DONE_CHECK{Goal\nComplete?} -->|Yes| OUTPUT([Return Final Result])
DONE_CHECK -->|No| ACT
ACT[Action\nExecute tool call:\nsearch, read, write, call API] --> OBSERVE
OBSERVE[Observation\nCapture tool result,\nadd to context window] --> ERROR_CHECK
ERROR_CHECK{Error?} -->|Yes| RETRY_CHECK
ERROR_CHECK -->|No| PERCEIVE
RETRY_CHECK{Retries\nLeft?} -->|Yes| ACT
RETRY_CHECK -->|No| FAIL([Return Error + Partial Result])
style START fill:#4f46e5,color:#fff
style OUTPUT fill:#059669,color:#fff
style FAIL fill:#dc2626,color:#fff
Let us walk through each node:
Perception — the agent ingests everything it knows: the original task, any prior conversation, results from tool calls it already made, and anything it retrieved from memory. This becomes the prompt context for the reasoning step.
Reasoning — the LLM analyzes the current state and decides: is the goal achieved? If yes, it returns a final answer. If not, it decides which tool to call and with what arguments. This is where the ReAct pattern (Reasoning + Acting) lives.
Action — the agent actually executes the tool. This is synchronous I/O in most implementations — the agent waits for the tool result before continuing.
Observation — the tool result is captured and appended to the context as an "observation" message. This is what closes the loop. Without observation, you just have a chain of actions with no feedback.
Error handling — every real agent needs retry logic at the action layer. Tools fail. Networks time out. APIs return unexpected status codes. We will cover this in depth in the error handling section.
The loop runs until one of three conditions: the model decides the goal is complete and returns a final answer, the max_steps guard trips, or an error exhausts its retries.
The ReAct pattern (Reasoning + Acting) is the foundational architecture for most agents in production today. The model interleaves natural language reasoning with structured tool calls. Here is a minimal but production-quality implementation:
# agent.py
import json
import time
from typing import Any, Callable
from anthropic import Anthropic
client = Anthropic()
# --- Tool registry ---
def web_search(query: str) -> str:
"""Search the web for current information."""
# In production: use Brave Search API, Exa, or Serper
return f"Search results for '{query}': [mock results — replace with real API call]"
def read_file(path: str) -> str:
"""Read the contents of a file."""
try:
with open(path, "r") as f:
return f.read()
except FileNotFoundError:
return f"Error: File not found at {path}"
except Exception as e:
return f"Error reading file: {str(e)}"
def run_python(code: str) -> str:
    """Execute Python code and return stdout."""
    import subprocess
    try:
        result = subprocess.run(
            ["python3", "-c", code],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "Error: code execution timed out after 10 seconds"
    if result.returncode != 0:
        return f"Error: {result.stderr}"
    return result.stdout
TOOLS: dict[str, Callable] = {
"web_search": web_search,
"read_file": read_file,
"run_python": run_python,
}
# --- Tool definitions for the model ---
TOOL_DEFINITIONS = [
{
"name": "web_search",
"description": "Search the web for current information about a topic.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query",
}
},
"required": ["query"],
},
},
{
"name": "read_file",
"description": "Read the contents of a local file.",
"input_schema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file",
}
},
"required": ["path"],
},
},
{
"name": "run_python",
"description": "Execute Python code and return the output.",
"input_schema": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Python code to execute",
}
},
"required": ["code"],
},
},
]
# --- The agent loop ---
class Agent:
def __init__(
self,
model: str = "claude-opus-4-5",
max_steps: int = 20,
system: str = "You are a helpful AI assistant with access to tools. Use tools to complete the user's task.",
):
self.model = model
self.max_steps = max_steps
self.system = system
def run(self, task: str) -> str:
messages = [{"role": "user", "content": task}]
steps = 0
while steps < self.max_steps:
steps += 1
print(f"\n[Step {steps}]")
# Reasoning: ask the model what to do next
response = client.messages.create(
model=self.model,
max_tokens=4096,
system=self.system,
tools=TOOL_DEFINITIONS,
messages=messages,
)
# Check stop condition
if response.stop_reason == "end_turn":
# Extract final text response
final_text = next(
(block.text for block in response.content if hasattr(block, "text")),
"Task completed.",
)
print(f"[Done] {final_text[:100]}...")
return final_text
# Process tool calls (action + observation)
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
tool_name = block.name
tool_input = block.input
print(f"[Tool] {tool_name}({json.dumps(tool_input)[:80]})")
# Execute the tool
if tool_name in TOOLS:
try:
result = TOOLS[tool_name](**tool_input)
except Exception as e:
result = f"Tool execution error: {str(e)}"
else:
result = f"Unknown tool: {tool_name}"
print(f"[Result] {str(result)[:100]}")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(result),
})
# Add assistant response and tool results to message history
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
return "Max steps reached. Partial result may be available."
# --- Entry point ---
if __name__ == "__main__":
agent = Agent()
result = agent.run(
"Search for the latest news about AI agents in 2026, "
"then write a 3-sentence summary and save it to /tmp/summary.txt"
)
print(f"\nFinal result:\n{result}")
This is roughly 100 lines and handles the complete agent loop. A few things worth calling out:
stop_reason == "end_turn" — this is how Anthropic's API signals the model has finished. When the model has no more tool calls to make and believes it has completed the task, it returns end_turn.
max_steps guard — this is non-negotiable in production. Without it, a confused agent will loop forever and run up a $500 bill. Set it low in development (10-15 steps), higher in production for complex tasks (50-100 steps).
Error handling in tool execution — every tool call is wrapped in a try/except. When a tool fails, we return the error as a string result. This lets the model see what went wrong and either retry, use a different tool, or abort gracefully.
Message history format — the messages array alternates user and assistant roles. Tool results are sent back as user messages with type tool_result. This is the Anthropic tool use format; OpenAI has a slightly different structure.
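To make the alternation concrete, here is what the messages array looks like after one tool-call round in the Anthropic format (the IDs and content are illustrative):

```python
# A sketch of the messages array after one tool-call round.
# The tool_use_id "toolu_abc123" is a hypothetical placeholder.
messages = [
    # Turn 1: the user's task
    {"role": "user", "content": "What's the weather in SF?"},
    # Turn 2: the assistant's reply, containing a tool_use block
    {
        "role": "assistant",
        "content": [
            {"type": "text", "text": "I'll check the weather."},
            {
                "type": "tool_use",
                "id": "toolu_abc123",
                "name": "get_weather",
                "input": {"city": "San Francisco"},
            },
        ],
    },
    # Turn 3: the tool result goes back as a *user* message whose
    # tool_use_id matches the tool_use block above
    {
        "role": "user",
        "content": [
            {
                "type": "tool_result",
                "tool_use_id": "toolu_abc123",
                "content": "62°F, Foggy",
            }
        ],
    },
]

# Roles strictly alternate: user, assistant, user, assistant, ...
assert [m["role"] for m in messages] == ["user", "assistant", "user"]
```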
Tool calling (also called function calling) is the mechanism by which a language model requests an external action. The model does not actually execute anything — it outputs a structured specification of what it wants to do, and your code executes it. Here is the complete sequence:
sequenceDiagram
participant User
participant Agent
participant LLM as LLM (Claude/GPT)
participant Tool as Tool Executor
participant API as External API/System
User->>Agent: "Find the weather in SF and email it to me"
Agent->>LLM: Messages + Tool Definitions
LLM-->>Agent: tool_use: get_weather(city="San Francisco")
Agent->>Tool: Execute get_weather("San Francisco")
Tool->>API: GET weather.api.com/sf
API-->>Tool: {temp: 62, condition: "Foggy"}
Tool-->>Agent: "62°F, Foggy"
Agent->>LLM: Messages + tool_result: "62°F, Foggy"
LLM-->>Agent: tool_use: send_email(to="...", body="SF weather: 62°F, Foggy")
Agent->>Tool: Execute send_email(...)
Tool->>API: POST mail.api.com/send
API-->>Tool: {status: "sent", message_id: "abc123"}
Tool-->>Agent: "Email sent. ID: abc123"
Agent->>LLM: Messages + tool_result: "Email sent"
LLM-->>Agent: end_turn: "Done! I sent the SF weather report."
Agent-->>User: "Done! I sent the SF weather report."
The quality of your tool definitions directly determines agent performance. The model uses the description and schema to decide when and how to call each tool. A vague description leads to misuse; an overly strict schema leads to the model failing to call the tool at all.
Good tool definition:
{
"name": "search_documents",
"description": (
"Search through the company's internal document store. "
"Use this when the user asks about internal policies, procedures, "
"product specs, or any information that would be in our documentation. "
"Returns up to 5 relevant document excerpts with titles and URLs."
),
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language search query. Be specific."
},
"limit": {
"type": "integer",
"description": "Max results to return (1-10, default 5)",
"default": 5
}
},
"required": ["query"]
}
}
Bad tool definition:
{
"name": "search",
"description": "Search for things.",
"input_schema": {
"type": "object",
"properties": {
"q": {"type": "string"}
},
"required": ["q"]
}
}
The bad version will cause the model to use this tool for every query, including ones that should go to other tools. It also does not tell the model what it returns, so the model cannot reason about whether the result was useful.
Here is the same agent in TypeScript, written against the Anthropic SDK; the structure maps almost one-to-one onto OpenAI's function calling format, the de-facto standard across most LLM providers:
// agent.ts
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
interface ToolResult {
type: "tool_result";
tool_use_id: string;
content: string;
}
// Tool implementations
const tools = {
async webSearch(query: string): Promise<string> {
// Replace with real search API
return `Search results for "${query}": [mock results]`;
},
async readFile(path: string): Promise<string> {
const { readFile } = await import("fs/promises");
try {
return await readFile(path, "utf-8");
} catch (e) {
return `Error reading file: ${(e as Error).message}`;
}
},
};
const toolDefinitions: Anthropic.Tool[] = [
{
name: "web_search",
description: "Search the web for current information.",
input_schema: {
type: "object" as const,
properties: {
query: { type: "string", description: "The search query" },
},
required: ["query"],
},
},
{
name: "read_file",
description: "Read the contents of a local file.",
input_schema: {
type: "object" as const,
properties: {
path: { type: "string", description: "Absolute path to the file" },
},
required: ["path"],
},
},
];
async function runAgent(task: string, maxSteps = 20): Promise<string> {
const messages: Anthropic.MessageParam[] = [
{ role: "user", content: task },
];
for (let step = 0; step < maxSteps; step++) {
const response = await client.messages.create({
model: "claude-opus-4-5",
max_tokens: 4096,
tools: toolDefinitions,
messages,
});
if (response.stop_reason === "end_turn") {
const textBlock = response.content.find((b) => b.type === "text");
return textBlock?.type === "text" ? textBlock.text : "Task completed.";
}
const toolResults: ToolResult[] = [];
for (const block of response.content) {
if (block.type !== "tool_use") continue;
const input = block.input as Record<string, string>;
let result: string;
try {
if (block.name === "web_search") {
result = await tools.webSearch(input.query);
} else if (block.name === "read_file") {
result = await tools.readFile(input.path);
} else {
result = `Unknown tool: ${block.name}`;
}
} catch (e) {
result = `Tool error: ${(e as Error).message}`;
}
toolResults.push({
type: "tool_result",
tool_use_id: block.id,
content: result,
});
}
messages.push({ role: "assistant", content: response.content });
messages.push({ role: "user", content: toolResults });
}
return "Max steps reached.";
}
// Run it
runAgent("What is the latest news about Claude 4?").then(console.log);
Writing bespoke tool implementations for every agent is not scalable. If you build 20 agents that all need web search, you end up with 20 slightly different implementations, all with different error handling, different rate limiting logic, and different schemas.
Model Context Protocol (MCP) solves this. MCP is an open standard from Anthropic that defines a universal protocol for connecting language models to tools and data sources. Think of it as USB-C for agent integrations: any MCP-compatible client can connect to any MCP-compatible server without custom glue code.
The MCP ecosystem is growing fast. As of March 2026, there are over 500 MCP servers available covering GitHub, Slack, Google Drive, Postgres, web search, browser automation, and more. If you are building SaaS with MCP integration, this changes your architecture significantly.
Here is how to connect a TypeScript agent to MCP tools:
// mcp-agent.ts
import Anthropic from "@anthropic-ai/sdk";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
interface MCPTool {
name: string;
description?: string;
inputSchema: Record<string, unknown>;
}
async function buildMCPAgent(serverCommand: string, serverArgs: string[]) {
// Connect to the MCP server
const transport = new StdioClientTransport({
command: serverCommand,
args: serverArgs,
});
const mcpClient = new Client(
{ name: "my-agent", version: "1.0.0" },
{ capabilities: {} }
);
await mcpClient.connect(transport);
// Discover available tools from the server
const { tools: mcpTools } = await mcpClient.listTools();
console.log(`Connected to MCP server. Found ${mcpTools.length} tools:`);
mcpTools.forEach((t) => console.log(` - ${t.name}: ${t.description}`));
// Convert MCP tools to Anthropic tool format
const anthropicTools: Anthropic.Tool[] = mcpTools.map((tool: MCPTool) => ({
name: tool.name,
description: tool.description ?? "",
input_schema: tool.inputSchema as Anthropic.Tool["input_schema"],
}));
// Tool executor that delegates to MCP
async function executeTool(
name: string,
input: Record<string, unknown>
): Promise<string> {
const result = await mcpClient.callTool({ name, arguments: input });
// MCP returns content array — extract text
const textContent = result.content
.filter((c: { type: string }) => c.type === "text")
.map((c: { type: string; text?: string }) => c.text ?? "")
.join("\n");
return textContent || JSON.stringify(result.content);
}
// Run the agent
const anthropicClient = new Anthropic();
async function run(task: string): Promise<string> {
const messages: Anthropic.MessageParam[] = [
{ role: "user", content: task },
];
for (let step = 0; step < 30; step++) {
const response = await anthropicClient.messages.create({
model: "claude-opus-4-5",
max_tokens: 4096,
tools: anthropicTools,
messages,
});
if (response.stop_reason === "end_turn") {
const text = response.content.find((b) => b.type === "text");
return text?.type === "text" ? text.text : "Done.";
}
const results: Anthropic.ToolResultBlockParam[] = [];
for (const block of response.content) {
if (block.type !== "tool_use") continue;
const result = await executeTool(
block.name,
block.input as Record<string, unknown>
);
results.push({
type: "tool_result",
tool_use_id: block.id,
content: result,
});
}
messages.push({ role: "assistant", content: response.content });
messages.push({ role: "user", content: results });
}
return "Max steps reached.";
}
return { run, disconnect: () => mcpClient.close() };
}
// Usage: connect to the filesystem MCP server
const agent = await buildMCPAgent("npx", [
"-y",
"@modelcontextprotocol/server-filesystem",
"/tmp",
]);
const result = await agent.run(
"List all .txt files in /tmp and summarize their contents"
);
console.log(result);
await agent.disconnect();
The key insight here: the agent code is identical regardless of what MCP server you connect to. You could swap in a GitHub MCP server, a Postgres MCP server, or a Slack MCP server, and the agent loop does not change. The MCP protocol handles tool discovery, schema validation, and result formatting.
This is why MCP is disrupting the way SaaS companies think about integrations. Instead of building a bespoke API integration, companies are shipping MCP servers and letting any AI agent connect natively.
The agent implementations above are stateless — every run starts fresh. For most production use cases, that is not good enough. Users expect their agent to remember previous conversations, preferences, and work. Here is the memory architecture we recommend:
block-beta
columns 3
block:WORKING["Working Memory\n(Context Window)"]:1
CW["Current messages\n+ tool results\n+ retrieved context\nMax: 200K tokens"]
end
block:SHORT["Short-Term Memory\n(Session State)"]:1
SS["Conversation history\nfor current session\nStored in Redis\nTTL: 24h"]
end
block:LONG["Long-Term Memory\n(Persistent Store)"]:1
LS["User preferences\nPast task summaries\nLearned facts\nStored in vector DB"]
end
space
RETRIEVAL["Memory Retrieval\nSemanticSearch + Recency\nWeighted score"]
space
CW --> RETRIEVAL
SS --> RETRIEVAL
LS --> RETRIEVAL
RETRIEVAL --> INJECT["Inject into\nnext prompt"]
This is everything currently in the model's context: the conversation, tool results, and any retrieved context. It is ephemeral — gone when the API call ends. The practical limit is your model's context window (200K tokens for Claude Opus 4.5, 128K for GPT-4o).
You do not need to manage working memory explicitly for simple agents. For complex, long-running agents, you need a context management strategy — see our agent memory systems guide.
Conversation history that persists within a user session. Store in Redis with a TTL. This gives your agent continuity across multiple API calls without hitting a database for every turn.
# memory.py
import json
import time
from typing import Optional
import redis
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
CONVERSATION_TTL = 86400 # 24 hours
def save_conversation(session_id: str, messages: list) -> None:
key = f"conversation:{session_id}"
r.setex(key, CONVERSATION_TTL, json.dumps(messages))
def load_conversation(session_id: str) -> list:
key = f"conversation:{session_id}"
data = r.get(key)
return json.loads(data) if data else []
def append_message(session_id: str, role: str, content) -> None:
    messages = load_conversation(session_id)
    messages.append({"role": role, "content": content, "timestamp": time.time()})
    save_conversation(session_id, messages)
    # Note: strip the timestamp field before passing messages to the LLM API —
    # providers reject unknown keys in message objects.
Semantic memories that persist across sessions — user preferences, past task summaries, facts the agent learned. Stored in a vector database for semantic retrieval.
# long_term_memory.py
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import uuid
import time
openai = OpenAI()
qdrant = QdrantClient(host="localhost", port=6333)
COLLECTION = "agent_memory"
EMBEDDING_DIM = 1536 # text-embedding-3-small
# Initialize collection (run once)
def setup_collection():
qdrant.recreate_collection(
collection_name=COLLECTION,
vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
)
def embed(text: str) -> list[float]:
response = openai.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def store_memory(user_id: str, content: str, memory_type: str = "fact") -> str:
"""Store a piece of information in long-term memory."""
vector = embed(content)
memory_id = str(uuid.uuid4())
qdrant.upsert(
collection_name=COLLECTION,
points=[
PointStruct(
id=memory_id,
vector=vector,
payload={
"user_id": user_id,
"content": content,
"memory_type": memory_type,
"created_at": time.time(),
},
)
],
)
return memory_id
def retrieve_memories(user_id: str, query: str, limit: int = 5) -> list[str]:
    """Retrieve relevant memories for a query."""
    # Use the client's typed filter models rather than a raw dict
    from qdrant_client.models import Filter, FieldCondition, MatchValue

    vector = embed(query)
    results = qdrant.search(
        collection_name=COLLECTION,
        query_vector=vector,
        query_filter=Filter(
            must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))]
        ),
        limit=limit,
    )
return [hit.payload["content"] for hit in results]
# Integrate into the agent (imports from the earlier files)
from agent import Agent
from memory import load_conversation, save_conversation

class MemoryAgent(Agent):
def run(self, task: str, user_id: str, session_id: str) -> str:
# Load session history
session_messages = load_conversation(session_id)
# Retrieve relevant long-term memories
memories = retrieve_memories(user_id, task)
memory_context = ""
if memories:
memory_context = (
"\n\nRelevant context from previous sessions:\n"
+ "\n".join(f"- {m}" for m in memories)
)
# Build messages with context
messages = session_messages or []
task_with_context = task + memory_context
if not messages:
messages = [{"role": "user", "content": task_with_context}]
else:
messages.append({"role": "user", "content": task})
        # Run the agent loop. Assumes Agent.run has been refactored into a
        # run_with_messages(messages) variant that accepts prebuilt history.
        result = self.run_with_messages(messages)
# Save new memories from this session
# (In production: use the model to extract key facts to remember)
save_conversation(session_id, messages)
return result
Most agent tutorials skip error handling entirely. That is why most agent prototypes die in production. Here are the failure modes you need to handle:
Tool execution failures — the most common failure mode. The API returns a 429, the database is down, the file does not exist. Always catch these and return a structured error that the model can reason about.
Model hallucinating tool names — the model sometimes outputs a tool call for a tool that does not exist. Your dispatcher needs to handle this gracefully.
Infinite loops — the model gets stuck in a reasoning loop, calling the same tool repeatedly with the same arguments. Detect repeated tool calls and break the loop.
Context overflow — on very long tasks, the accumulated tool results can overflow the context window. You need a truncation strategy.
Partial completion — the agent completes 8 of 10 steps and fails on step 9. You need to decide: retry from the beginning, retry from step 9, or return partial results with a clear status.
Here is a production-grade error handler:
# error_handler.py
import time
import hashlib
import json
from dataclasses import dataclass, field
from typing import Callable, Any
@dataclass
class RetryConfig:
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 30.0
exponential_base: float = 2.0
@dataclass
class AgentState:
steps: int = 0
tool_call_hashes: list = field(default_factory=list)
errors: list = field(default_factory=list)
def hash_tool_call(name: str, args: dict) -> str:
serialized = json.dumps({"name": name, "args": args}, sort_keys=True)
return hashlib.md5(serialized.encode()).hexdigest()
def execute_with_retry(
tool_fn: Callable,
tool_name: str,
tool_args: dict,
config: RetryConfig = RetryConfig(),
) -> tuple[str, bool]:
"""Execute a tool with exponential backoff retry. Returns (result, success)."""
last_error = None
for attempt in range(config.max_retries + 1):
try:
result = tool_fn(**tool_args)
return str(result), True
except Exception as e:
last_error = e
if attempt < config.max_retries:
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay,
)
print(f"[Retry] {tool_name} failed (attempt {attempt + 1}): {e}. Retrying in {delay:.1f}s")
time.sleep(delay)
return f"Tool '{tool_name}' failed after {config.max_retries + 1} attempts: {last_error}", False
def detect_loop(state: AgentState, tool_name: str, tool_args: dict, window: int = 3) -> bool:
"""Detect if the agent is calling the same tool with the same args repeatedly."""
call_hash = hash_tool_call(tool_name, tool_args)
recent = state.tool_call_hashes[-window:]
if recent.count(call_hash) >= 2:
return True
state.tool_call_hashes.append(call_hash)
return False
def truncate_messages(messages: list, max_tokens: int = 150000) -> list:
"""Truncate old messages to fit context window. Keeps first message (task) and last N messages."""
if len(messages) <= 4:
return messages
# Rough token estimate: 4 chars per token
total_chars = sum(len(str(m)) for m in messages)
if total_chars / 4 < max_tokens:
return messages
# Keep the first message (original task) + last 10 messages
return [messages[0]] + messages[-10:]
The gap between a working prototype and a production agent is larger than most teams expect. Here are the layers you need to add, in order of priority:
1. Structured logging — every tool call, every reasoning step, every error needs to be logged with a unique trace ID. You cannot debug production agents without this. Use OpenTelemetry or a dedicated platform like LangSmith or Helicone.
2. Cost tracking — agent tasks can consume wildly different amounts of tokens depending on the task complexity. Track cost per run, cost per tool call, and cost per user. Set hard limits before they hit.
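To make the hard-limit idea concrete, here is a minimal per-run cost tracker. The per-token prices and the limit are illustrative placeholders (check your provider's current pricing); Anthropic responses expose `usage.input_tokens` and `usage.output_tokens`, which is what you would feed into `record`:

```python
# cost_tracker.py — a minimal per-run cost tracker (sketch).
# Prices below are placeholder assumptions, not real pricing.
from dataclasses import dataclass

PRICE_PER_MTOK = {  # USD per million tokens (illustrative)
    "input": 5.00,
    "output": 25.00,
}

@dataclass
class CostTracker:
    hard_limit_usd: float = 2.00  # abort the run past this
    input_tokens: int = 0
    output_tokens: int = 0

    def record(self, input_tokens: int, output_tokens: int) -> None:
        """Accumulate token usage from one API response."""
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens

    @property
    def cost_usd(self) -> float:
        return (
            self.input_tokens / 1_000_000 * PRICE_PER_MTOK["input"]
            + self.output_tokens / 1_000_000 * PRICE_PER_MTOK["output"]
        )

    def check(self) -> None:
        """Raise if the run has blown past its budget."""
        if self.cost_usd > self.hard_limit_usd:
            raise RuntimeError(f"Cost limit exceeded: ${self.cost_usd:.2f}")

# Inside the agent loop, after each client.messages.create(...) call:
#   tracker.record(response.usage.input_tokens, response.usage.output_tokens)
#   tracker.check()
```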
3. Evaluation — before you ship, you need a test suite. Not unit tests for your Python code — behavioral tests that verify the agent completes real tasks correctly. Build a golden dataset of 20-50 representative tasks with expected outcomes, and run it against every model or prompt change.
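A golden-dataset harness can start as small as this sketch; the tasks and check functions here are illustrative stand-ins for your own, and `run_fn` is whatever entry point your agent exposes:

```python
# evals.py — a minimal behavioral eval harness (sketch).
# The golden tasks and checks are illustrative placeholders.
from typing import Callable

GOLDEN_TASKS = [
    {
        "task": "What is 17 * 23? Reply with just the number.",
        "check": lambda out: "391" in out,
    },
    {
        "task": "Name a US state capital.",
        "check": lambda out: any(c in out for c in ("Sacramento", "Austin", "Albany")),
    },
]

def run_evals(run_fn: Callable[[str], str]) -> float:
    """Run every golden task through the agent and return the pass rate."""
    passed = 0
    for case in GOLDEN_TASKS:
        try:
            output = run_fn(case["task"])
            if case["check"](output):
                passed += 1
        except Exception:
            pass  # a crash counts as a failure
    return passed / len(GOLDEN_TASKS)

# Usage: rate = run_evals(agent.run); fail CI if rate drops below your bar
```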
4. Human-in-the-loop checkpoints — for high-stakes actions (sending emails, deleting files, making purchases), pause and ask for confirmation. This is not a UX nicety — it is the difference between a useful agent and a dangerous one. This is how agentic coding tools handle their most impactful operations.
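One way to implement the checkpoint is a wrapper around your tool dispatcher; the tool names and the shape of `confirm_fn` here are illustrative assumptions:

```python
# hitl.py — gate high-stakes tools behind explicit confirmation (sketch).
from typing import Callable

# Illustrative set of tools that require approval before executing
HIGH_STAKES_TOOLS = {"send_email", "delete_file", "make_purchase"}

def with_confirmation(
    execute: Callable[[str, dict], str],
    confirm_fn: Callable[[str, dict], bool],
) -> Callable[[str, dict], str]:
    """Wrap a tool dispatcher so risky calls require human approval."""
    def gated(name: str, args: dict) -> str:
        if name in HIGH_STAKES_TOOLS and not confirm_fn(name, args):
            # Return a result the model can reason about, not an exception
            return f"Action '{name}' was declined by the user."
        return execute(name, args)
    return gated

# In a CLI agent, confirm_fn could simply be:
#   lambda name, args: input(f"Allow {name}({args})? [y/N] ").lower() == "y"
```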
5. Idempotency — if your agent crashes mid-task and restarts, it must not duplicate actions. Give every tool call a unique ID, store which calls completed, and skip them on retry.
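A minimal sketch of idempotent tool execution, using a deterministic call ID derived from the run, step, and arguments. The store is in-memory for illustration; in production it would live in Redis or Postgres so it survives a crash:

```python
# idempotency.py — skip tool calls that already completed (sketch).
# The in-memory dict is a stand-in for a durable store.
import hashlib
import json
from typing import Callable

class IdempotentExecutor:
    def __init__(self):
        self._completed: dict[str, str] = {}  # call_id -> cached result

    def call_id(self, run_id: str, step: int, name: str, args: dict) -> str:
        """Deterministic ID: the same run + step + call always hashes the same."""
        raw = json.dumps([run_id, step, name, args], sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()

    def execute(self, run_id: str, step: int, name: str, args: dict,
                fn: Callable[..., str]) -> str:
        cid = self.call_id(run_id, step, name, args)
        if cid in self._completed:
            # Replay the cached result instead of repeating the side effect
            return self._completed[cid]
        result = fn(**args)
        self._completed[cid] = result
        return result
```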
# production_agent.py
import uuid
import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# Setup tracing
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent")
logger = logging.getLogger("agent")
class ProductionAgent(Agent):
    def run(self, task: str, run_id: str | None = None) -> dict:
run_id = run_id or str(uuid.uuid4())
with tracer.start_as_current_span("agent.run") as span:
span.set_attribute("run_id", run_id)
span.set_attribute("task", task[:200])
try:
result = super().run(task)
span.set_attribute("status", "success")
return {
"run_id": run_id,
"status": "success",
"result": result,
}
except Exception as e:
span.set_attribute("status", "error")
span.set_attribute("error", str(e))
logger.error(f"Agent run {run_id} failed: {e}")
return {
"run_id": run_id,
"status": "error",
"error": str(e),
}
The vibe coding era has made it dangerously easy to ship agents that work in demos but fail in production. The patterns above — tracing, cost tracking, evals, HITL, idempotency — are the difference between a demo and a product.
block-beta
columns 4
block:SYNC["Synchronous\n(REST API)"]:1
REST["FastAPI / Express\nHTTP POST /run\nSync response\nBest for: <30s tasks"]
end
block:ASYNC["Asynchronous\n(Queue)"]:1
QUEUE["Redis / SQS queue\nWebhook callback\nPoll /status/:id\nBest for: >30s tasks"]
end
block:STREAM["Streaming\n(SSE/WebSocket)"]:1
STREAM2["Server-Sent Events\nReal-time step updates\nProgress to client\nBest for: interactive"]
end
block:SCHED["Scheduled\n(Cron)"]:1
CRON["Cloud Scheduler\nRun at intervals\nNo user interaction\nBest for: automation"]
end
space
block:INFRA["Deployment Infrastructure"]:2
CON["Containers\n(Docker + K8s)\nPredictable resources\nComplex ops"]
SERVER["Serverless\n(Lambda / Cloud Run)\nAuto-scale to zero\nCold starts"]
end
space
The simplest deployment. Wrap your agent in a FastAPI endpoint and call it synchronously. Works for tasks that complete in under 30 seconds.
# server.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
app = FastAPI()
class RunRequest(BaseModel):
task: str
user_id: str
session_id: str
class RunResponse(BaseModel):
result: str
run_id: str
steps_used: int
agent = ProductionAgent()
@app.post("/run", response_model=RunResponse)
async def run_agent(req: RunRequest):
try:
result = await asyncio.to_thread(
agent.run,
task=req.task,
run_id=None,
)
return RunResponse(
result=result["result"],
run_id=result["run_id"],
steps_used=0, # track this in your agent
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
For tasks that take more than 30 seconds, use a queue. The client submits the task, gets a job ID, and polls for completion.
# async_server.py
from fastapi import FastAPI
from pydantic import BaseModel
import redis
import json
import uuid
import threading
app = FastAPI()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
class SubmitRequest(BaseModel):
task: str
user_id: str
@app.post("/submit")
def submit_task(req: SubmitRequest):
job_id = str(uuid.uuid4())
r.hset(f"job:{job_id}", mapping={"status": "queued", "task": req.task})
r.rpush("job_queue", job_id)
return {"job_id": job_id}
@app.get("/status/{job_id}")
def get_status(job_id: str):
data = r.hgetall(f"job:{job_id}")
if not data:
return {"error": "Job not found"}
return data
def worker():
agent = ProductionAgent()
while True:
_, job_id = r.blpop("job_queue")
data = r.hgetall(f"job:{job_id}")
if not data:
continue
r.hset(f"job:{job_id}", "status", "running")
try:
result = agent.run(data["task"])
r.hset(f"job:{job_id}", mapping={
"status": "done",
"result": result["result"],
})
except Exception as e:
r.hset(f"job:{job_id}", mapping={
"status": "failed",
"error": str(e),
})
# Start worker thread
threading.Thread(target=worker, daemon=True).start()
Package your agent in Docker for consistent, reproducible deployments:
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy source
COPY . .
# Run server
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8080"]
# docker-compose.yml
services:
  agent:
    build: .
    ports:
      - "8080:8080"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
      - qdrant
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

volumes:
  redis_data:
  qdrant_data:
This stack — FastAPI + Redis + Qdrant — handles thousands of concurrent agent runs and is cheap to operate. For larger scale, replace Redis with SQS and Qdrant with Pinecone.
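The Redis-to-SQS swap is mostly mechanical: `send_message` replaces RPUSH and long-polling `receive_message` replaces BLPOP. A sketch of both halves — the boto3 calls (`send_message`, `receive_message`, `delete_message`) are the real API, but the queue URL and message shape are assumptions chosen to mirror the Redis version above:

```python
import json

def submit_job(sqs, queue_url, job_id, task):
    # Replaces r.rpush("job_queue", job_id): the task rides in the message body.
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"job_id": job_id, "task": task}),
    )

def worker_step(sqs, queue_url, handle_job):
    # Replaces r.blpop: long-poll for one message, process it, then delete it.
    resp = sqs.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=20
    )
    for msg in resp.get("Messages", []):
        body = json.loads(msg["Body"])
        handle_job(body["job_id"], body["task"])
        # Delete only after success: unprocessed messages reappear after the
        # visibility timeout, which is what gives you at-least-once delivery.
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
```

One behavioral difference worth noting: SQS is at-least-once, so `handle_job` should be idempotent (e.g. skip jobs already marked done).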
Serverless (AWS Lambda, Google Cloud Run) works well for agents if you handle the cold start problem. Cold starts on Python Lambda can be 2-5 seconds — acceptable for async patterns, painful for sync. Use provisioned concurrency for latency-sensitive deployments. This connects to how modern SaaS teams are building agentic applications — container-based deployments are winning because agents often need more control over their execution environment than serverless provides.
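As a concrete knob, provisioned concurrency is configured per function alias. This is a deployment config fragment using the real AWS CLI command; the function and alias names are placeholders:

```shell
aws lambda put-provisioned-concurrency-config \
  --function-name agent-runner \
  --qualifier live \
  --provisioned-concurrent-executions 5
```

Five warm instances keeps p99 latency flat for a modest steady load; size it to your concurrent-request peak, since provisioned instances bill by the hour whether used or not.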
This guide got you from zero to a deployed production agent. There are three major topics we intentionally deferred because they each deserve their own deep-dives:
Multi-agent orchestration — coordinating multiple specialized agents on a single complex task. When one agent's output becomes another agent's input. Covered in multi-agent orchestration patterns.
Evaluation and testing — building systematic confidence that your agent works before shipping to users. Golden datasets, regression suites, CI/CD for agents.
Observability — actually knowing what your agent did in production. Trace IDs, step logs, cost tracking, quality degradation alerts.
These are not nice-to-haves. They are the difference between an agent that works in demos and one that runs reliably in production with real users.
Q: Which LLM should I use for building agents?
For most production agents in 2026, Claude Opus 4.5 or GPT-4o are the best choices. They have the strongest tool calling reliability and longest context windows. For cost-sensitive deployments, Claude Haiku 3.5 or GPT-4o-mini are surprisingly capable for structured tool calling tasks. Always benchmark on your specific task distribution — aggregate benchmarks do not predict per-task performance.
Q: How do I prevent my agent from running up a huge API bill?
Three controls: (1) max_steps hard limit on every agent run, (2) per-run and per-user budget caps enforced before calling the LLM API, (3) cost alerting when daily spend exceeds a threshold. Never deploy an agent without all three.
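Control (2) is the one teams most often skip, so here is a minimal sketch of a pre-call budget guard. The dollar limits and in-memory storage are assumptions (production would back this with Redis and reset user spend daily):

```python
class BudgetExceeded(Exception):
    pass

class BudgetGuard:
    def __init__(self, per_run_usd=0.50, per_user_daily_usd=5.00):
        self.per_run_usd = per_run_usd
        self.per_user_daily_usd = per_user_daily_usd
        self.run_spend = {}   # run_id -> USD spent so far
        self.user_spend = {}  # user_id -> USD spent today

    def check(self, run_id, user_id):
        # Call BEFORE every LLM API request -- fail closed, not after the bill.
        if self.run_spend.get(run_id, 0.0) >= self.per_run_usd:
            raise BudgetExceeded(f"run {run_id} exceeded ${self.per_run_usd} cap")
        if self.user_spend.get(user_id, 0.0) >= self.per_user_daily_usd:
            raise BudgetExceeded(f"user {user_id} exceeded daily cap")

    def record(self, run_id, user_id, cost_usd):
        # Call after each response, using token counts from the API usage field.
        self.run_spend[run_id] = self.run_spend.get(run_id, 0.0) + cost_usd
        self.user_spend[user_id] = self.user_spend.get(user_id, 0.0) + cost_usd
```

The agent loop calls `guard.check(...)` at the top of every iteration, so a runaway run stops on the next step rather than at the end of the month.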
Q: What is the difference between an agent and a chain?
A chain is a fixed sequence of LLM calls — A → B → C, no branching, no feedback loops. An agent is a dynamic loop where the LLM decides at each step what to do next based on what it observed. Chains are deterministic and predictable; agents are flexible and powerful but harder to debug.
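The distinction is easiest to see in code. An illustrative sketch — `llm` here is a stand-in callable and the `FINAL:` convention is an assumption, not a real model client or protocol:

```python
def chain(llm, text):
    # Chain: fixed sequence A -> B -> C. Same path every time, no feedback.
    summary = llm(f"Summarize: {text}")
    keywords = llm(f"Extract keywords: {summary}")
    return llm(f"Write a title using: {keywords}")

def agent(llm, tools, task, max_steps=5):
    # Agent: dynamic loop. The model picks the next action from what it observed.
    observation = task
    for _ in range(max_steps):
        decision = llm(f"Task: {task}\nLast observation: {observation}")
        if decision.startswith("FINAL:"):
            return decision[len("FINAL:"):].strip()
        tool_name, _, arg = decision.partition(" ")
        observation = tools[tool_name](arg)  # feed the result back in
    return observation  # step budget exhausted
```

Note that the chain's control flow lives in your code, while the agent's lives in the model's decisions — which is exactly why agents need the max_steps guard and chains do not.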
Q: Should I build my own agent framework or use LangChain/CrewAI?
For simple agents (single model, <10 tools), build your own — the loop is 50 lines of code and you eliminate a dependency. For complex multi-agent systems, use an established framework like LangGraph or the OpenAI Agents SDK. The framework gives you checkpointing, state management, human-in-the-loop, and observability out of the box.
Q: How do I handle the case where the agent takes a wrong action?
Two approaches: (1) prevention — use human-in-the-loop checkpoints for irreversible actions, validate tool arguments before execution, (2) recovery — implement undo capabilities for reversible actions, log every action to enable audit replay. For truly irreversible actions (sending emails, making payments), always require explicit confirmation.
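The confirmation requirement for irreversible actions is simple to enforce as a wrapper around tool execution. A sketch — which tools count as irreversible, and the `confirm` callback (a human-in-the-loop prompt in production), are policy choices you define:

```python
# Tools whose effects cannot be undone; everything else runs without a gate.
IRREVERSIBLE = {"send_email", "make_payment", "delete_record"}

def guarded_execute(tool_name, tool_fn, args, confirm):
    """Run tool_fn(**args), but require confirm(tool_name, args) -> bool
    first for any tool on the irreversible list."""
    if tool_name in IRREVERSIBLE and not confirm(tool_name, args):
        # Blocked actions return a structured refusal the agent can observe
        # and route around, instead of raising mid-run.
        return {"status": "blocked", "reason": f"{tool_name} not confirmed"}
    return {"status": "ok", "result": tool_fn(**args)}
```

Returning the refusal as an observation (rather than raising) matters: the agent sees "blocked" in its next loop iteration and can ask the user or pick a different action.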
Q: Is MCP worth adopting or is it too new?
MCP is stable and adoption is accelerating fast. Anthropic, OpenAI, Google, and Microsoft all support it. The ecosystem of pre-built servers is already extensive. If you are building a new agent system today, building on MCP gives you access to hundreds of integrations for free and future-proofs your tool layer. The switching cost is low — MCP is a protocol, not a framework.
Q: How do I evaluate whether my agent is working correctly?
Build a golden dataset: 20-50 real tasks with known correct outcomes. Run your agent against all of them before every deployment. Measure: task completion rate, accuracy of final answer, number of steps taken, and cost per task. A 10% drop in completion rate on your golden dataset is a signal to investigate before shipping.
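A golden-set harness computing those four metrics fits in one function. A sketch — the `run_agent` return shape (`completed`, `result`, `steps`, `cost_usd`) is an assumed interface; adapt it to whatever your agent actually returns:

```python
def evaluate(run_agent, golden_set):
    """Run the agent over every golden case and aggregate the four metrics:
    completion rate, accuracy, avg steps, avg cost."""
    completed = correct = 0
    total_steps = total_cost = 0.0
    for case in golden_set:
        out = run_agent(case["task"])
        if out.get("completed"):
            completed += 1
            if out.get("result") == case["expected"]:
                correct += 1
        total_steps += out.get("steps", 0)
        total_cost += out.get("cost_usd", 0.0)
    n = len(golden_set)
    return {
        "completion_rate": completed / n,
        "accuracy": correct / n,
        "avg_steps": total_steps / n,
        "avg_cost_usd": total_cost / n,
    }
```

Wire this into CI so the numbers are compared against the last release — the 10% completion-rate drop mentioned above is only a signal if you have a baseline to compare against.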
Q: What is the hardest part of building agents in production?
Reliability. Not the loop — the loop is easy. The hard parts are: (1) tools that fail in unexpected ways, (2) the model making subtly wrong decisions that only manifest after many steps, (3) graceful degradation when things go wrong mid-task. The teams that build reliable agents invest heavily in error handling, retry logic, observability, and evaluation. The ones that skip those steps have demos, not products.
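The retry-logic investment mostly amounts to a wrapper like this around every tool call. A minimal sketch with exponential backoff — the retry count, delays, and the set of retryable exceptions are policy assumptions, not fixed rules:

```python
import time

def call_with_retry(tool_fn, *args, retries=3, base_delay=0.5,
                    retryable=(TimeoutError, ConnectionError)):
    """Call tool_fn(*args); on a retryable failure, back off exponentially
    (0.5s, 1s, 2s, ...) and try again. Non-retryable errors propagate
    immediately so the agent can observe them and change course."""
    for attempt in range(retries + 1):
        try:
            return tool_fn(*args)
        except retryable:
            if attempt == retries:
                raise  # out of retries: surface the failure to the agent loop
            time.sleep(base_delay * (2 ** attempt))
```

Keeping the retryable set narrow is deliberate: retrying a validation error or a 4xx just burns steps, while retrying timeouts and dropped connections quietly absorbs most transient tool failures.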
Want to go deeper? Read our guide on building AI agent startups for the product and business angles, or MCP integration for SaaS for the ecosystem layer.