Advanced RAG Techniques
Overview
This document covers sophisticated RAG patterns including agents, query engines, multi-agent systems, and advanced architectural patterns for production deployments.
Agents
Agent Definition
An agent is "a specific system that uses an LLM, memory, and tools, to handle inputs from outside users." This contrasts with "agentic" systems, which represent a broader class incorporating LLM decision-making.
Agent Loop Cycle
Standard agent execution follows this sequence (sketched below):
- Retrieve the latest message and chat history
- Send tool schemas and conversation context to the LLM API
- The LLM responds with either a direct answer or a list of tool invocations
- Each requested tool executes independently
- Results are appended to the chat history
- The process repeats until the LLM returns a direct answer
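A minimal sketch of this loop in plain Python; `call_llm` and `execute_tool` are hypothetical stand-ins for the provider API call and tool dispatch, not real library functions:

```python
# Hypothetical sketch of the agent loop; call_llm and execute_tool
# stand in for the provider API call and tool dispatch.
async def agent_loop(user_msg, chat_history, tools):
    chat_history.append({"role": "user", "content": user_msg})
    while True:
        # Send tool schemas plus the conversation so far to the LLM
        reply = await call_llm(messages=chat_history, tools=[t.schema for t in tools])
        if not reply.tool_calls:
            # Direct answer: the loop terminates
            return reply.content
        for tool_call in reply.tool_calls:
            # Each requested tool executes independently
            result = execute_tool(tools, tool_call)
            # Results are appended to the chat history for the next turn
            chat_history.append({"role": "tool", "content": str(result)})
```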
Agent Types
FunctionAgent
Primary implementation leveraging provider-native function/tool calling.
```python
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.llms.openai import OpenAI

def multiply(a: float, b: float) -> float:
    """Useful for multiplying two numbers."""
    return a * b

agent = FunctionAgent(
    tools=[multiply],
    llm=OpenAI(model="gpt-4o-mini"),
    system_prompt="You are a helpful assistant...",
)

# Run from within an async context (e.g. via asyncio.run)
response = await agent.run("What is 5 times 7?")
```
ReActAgent
Uses the reason-and-act (ReAct) prompting strategy:

```python
from llama_index.core.agent import ReActAgent

agent = ReActAgent.from_tools(
    tools=[query_tool, calculator_tool],
    llm=llm,
    verbose=True,
)
```
CodeActAgent
Generates and executes code for tool interactions:
```python
from llama_index.core.agent.workflow import CodeActAgent

# CodeActAgent writes Python that calls the tools, then runs it through
# a user-supplied executor rather than native function calling
agent = CodeActAgent(
    code_execute_fn=execute_code_fn,  # user-supplied (ideally sandboxed) executor
    tools=[python_tool],
    llm=llm,
)
```
Tool Integration
Function-based Tools
Simplest approach using Python functions with docstrings:
```python
def search_documents(query: str) -> str:
    """Search the document database for relevant information.

    Args:
        query: The search query string
    """
    response = vector_index.as_query_engine().query(query)
    return str(response)
```
QueryEngineTool
Wrap query engines as tools:
```python
from llama_index.core.tools import QueryEngineTool, ToolMetadata

query_tool = QueryEngineTool(
    query_engine=vector_query_engine,
    metadata=ToolMetadata(
        name="document_search",
        description="Searches technical documentation for answers",
    ),
)

agent = ReActAgent.from_tools([query_tool], llm=llm)
```
FunctionTool
Enhanced configuration for custom functions:
```python
from llama_index.core.tools import FunctionTool

def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"Weather in {city}: Sunny, 72°F"

weather_tool = FunctionTool.from_defaults(
    fn=get_weather,
    name="weather",
    description="Get current weather for a city",
)
```
Memory Management
Default Memory
All agents use ChatMemoryBuffer by default:
```python
from llama_index.core.memory import ChatMemoryBuffer

memory = ChatMemoryBuffer.from_defaults(token_limit=40000)

response = await agent.run(
    "What did we discuss earlier?",
    memory=memory,
)
```
Persistent Memory
```python
import json

from llama_index.core.llms import ChatMessage
from llama_index.core.memory import ChatMemoryBuffer

class PersistentMemory(ChatMemoryBuffer):
    # Declared as a field, since ChatMemoryBuffer is a pydantic model
    storage_path: str

    def save(self):
        with open(self.storage_path, "w") as f:
            json.dump([m.dict() for m in self.get_all()], f)

    def load(self):
        with open(self.storage_path, "r") as f:
            messages = [ChatMessage(**m) for m in json.load(f)]
        self.set(messages)
```
Multi-Modal Capabilities
Process images alongside text:
```python
from llama_index.core.llms import ChatMessage, ImageBlock, TextBlock

msg = ChatMessage(
    role="user",
    blocks=[
        TextBlock(text="Analyze this image:"),
        ImageBlock(path="./screenshot.png"),
    ],
)
response = await agent.run(msg)
```
Query Engines
Query Engine Basics
A query engine is a "generic interface that allows you to ask questions over your data."
Basic Usage
```python
query_engine = index.as_query_engine()
response = query_engine.query("Who is Paul Graham?")
```
Streaming Responses
```python
query_engine = index.as_query_engine(streaming=True)
streaming_response = query_engine.query("Who is Paul Graham?")
streaming_response.print_response_stream()
```
Advanced Query Engine Patterns
RouterQueryEngine
Route queries to specialized handlers:
```python
from llama_index.core.query_engine import RouterQueryEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.tools import QueryEngineTool

summary_tool = QueryEngineTool.from_defaults(
    query_engine=summary_query_engine,
    description="Useful for summarization questions",
)
vector_tool = QueryEngineTool.from_defaults(
    query_engine=vector_query_engine,
    description="Useful for factual questions",
)

query_engine = RouterQueryEngine(
    selector=LLMSingleSelector.from_defaults(),
    query_engine_tools=[summary_tool, vector_tool],
)
```
SubQuestionQueryEngine
Decompose complex queries into sub-questions:
```python
from llama_index.core.query_engine import SubQuestionQueryEngine
from llama_index.core.tools import QueryEngineTool, ToolMetadata

query_engine_tools = [
    QueryEngineTool(
        query_engine=essay_engine,
        metadata=ToolMetadata(
            name="essays",
            description="Paul Graham essays collection",
        ),
    ),
    QueryEngineTool(
        query_engine=articles_engine,
        metadata=ToolMetadata(
            name="articles",
            description="Technical articles",
        ),
    ),
]

query_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=query_engine_tools,
    use_async=True,
)
```
How it works:
- Breaks the complex query into sub-questions
- Routes each sub-question to the appropriate query engine
- Executes sub-queries in parallel (if use_async=True)
- Synthesizes a comprehensive answer
Use cases: multi-source analysis, comparative questions, comprehensive research
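For example, a comparative question spanning both tools defined above (the question text is illustrative):

```python
# Decomposed into per-source sub-questions (e.g. one against "essays",
# one against "articles"), answered in parallel, then synthesized
response = query_engine.query(
    "How do the essays and the technical articles differ in how they "
    "discuss startups?"
)
print(response)
```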
Joint QA-Summary Engine
Combine question answering with summarization:
```python
from llama_index.core.query_engine import RouterQueryEngine

# Route between QA, summarization, and comparison capabilities
hybrid_engine = RouterQueryEngine(
    selector=selector,
    query_engine_tools=[
        qa_tool,
        summary_tool,
        comparison_tool,
    ],
)
```
Custom Query Engine
```python
from llama_index.core.query_engine import CustomQueryEngine
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.response_synthesizers import BaseSynthesizer

class MyQueryEngine(CustomQueryEngine):
    retriever: BaseRetriever
    response_synthesizer: BaseSynthesizer

    def custom_query(self, query_str: str):
        nodes = self.retriever.retrieve(query_str)
        return self.response_synthesizer.synthesize(query_str, nodes)
```
Multi-Agent Systems
Coordination Pattern
```python
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent

# Define specialized agents; names let the workflow route between them
research_agent = FunctionAgent(
    name="research",
    description="Research specialist",
    tools=[search_tool, summarize_tool],
    llm=llm,
    system_prompt="You are a research specialist",
)
analysis_agent = FunctionAgent(
    name="analysis",
    description="Data analyst",
    tools=[analyze_tool, visualize_tool],
    llm=llm,
    system_prompt="You are a data analyst",
)

# Orchestrate agents; root_agent names the entry point
multi_agent = AgentWorkflow(
    agents=[research_agent, analysis_agent],
    root_agent="research",
)
response = await multi_agent.run("Research and analyze market trends")
```
Agent Handoff
Agents coordinate by handing off control:
```python
class CoordinatorAgent:
    def __init__(self, specialist_agents):
        self.specialists = specialist_agents

    async def delegate(self, task, task_type):
        """Route a task to the appropriate specialist."""
        if task_type == "research":
            return await self.specialists["research"].run(task)
        elif task_type == "analysis":
            return await self.specialists["analysis"].run(task)
```
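With AgentWorkflow the handoff is declarative rather than hand-coded: each agent lists the peers it may pass control to via can_handoff_to. A minimal sketch, reusing the search_tool, analyze_tool, and llm assumed in the earlier examples:

```python
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent

# Each agent declares who it can hand control to; the workflow exposes
# a handoff tool based on these declarations.
research_agent = FunctionAgent(
    name="research",
    description="Gathers background information",
    tools=[search_tool],
    llm=llm,
    can_handoff_to=["analysis"],
)
analysis_agent = FunctionAgent(
    name="analysis",
    description="Analyzes gathered information",
    tools=[analyze_tool],
    llm=llm,
    can_handoff_to=["research"],
)

workflow = AgentWorkflow(
    agents=[research_agent, analysis_agent],
    root_agent="research",  # entry point, referenced by agent name
)
response = await workflow.run("Research and analyze market trends")
```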
Hierarchical Multi-Agent
```python
# Manager agent
manager = FunctionAgent(
    tools=[delegate_to_research, delegate_to_analysis],
    system_prompt="You are a project manager. Delegate tasks to specialists.",
)

# Worker agents
research_agent = FunctionAgent(tools=[search_tool])
analysis_agent = FunctionAgent(tools=[analyze_tool])

# Hierarchical execution
response = await manager.run("Complete comprehensive market analysis")
```
Response Synthesis Patterns
Response Modes
- refine ("create and refine"): Builds the answer sequentially, refining it with each retrieved chunk
- tree_summarize: Builds a tree of summaries bottom-up
- simple_summarize: Concatenates all chunks into a single LLM call
- compact: Like simple_summarize, but packs chunks to handle context overflow

```python
from llama_index.core.response_synthesizers import ResponseMode

query_engine = index.as_query_engine(
    response_mode=ResponseMode.TREE_SUMMARIZE,
)
```
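A synthesizer can also be built standalone via get_response_synthesizer and paired with any retriever; a minimal sketch, assuming an existing index and the ResponseMode import above:

```python
from llama_index.core import get_response_synthesizer
from llama_index.core.query_engine import RetrieverQueryEngine

# Build the synthesizer directly, then pair it with an existing retriever
synthesizer = get_response_synthesizer(response_mode=ResponseMode.COMPACT)
query_engine = RetrieverQueryEngine(
    retriever=index.as_retriever(),
    response_synthesizer=synthesizer,
)
```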
Custom Response Synthesizer
```python
from llama_index.core.response_synthesizers import BaseSynthesizer

class CustomSynthesizer(BaseSynthesizer):
    # BaseSynthesizer's synthesize() calls get_response()/aget_response(),
    # which are the abstract hooks to override
    def get_response(self, query_str, text_chunks, **response_kwargs):
        # Custom synthesis logic: stuff every chunk into a single prompt
        context = "\n\n".join(text_chunks)
        prompt = f"Query: {query_str}\n\nContext:\n{context}\n\nAnswer:"
        return str(self._llm.complete(prompt))

    async def aget_response(self, query_str, text_chunks, **response_kwargs):
        return self.get_response(query_str, text_chunks, **response_kwargs)
```
Advanced Retrieval Patterns
Fusion Retriever
Combines multiple retrievers with query rewriting:
```python
from llama_index.core.retrievers import QueryFusionRetriever

retriever = QueryFusionRetriever(
    [vector_retriever, bm25_retriever],
    similarity_top_k=5,
    num_queries=4,  # Generate query variations
    mode="reciprocal_rerank",
    use_async=True,
)
```
How it works:
- Generates multiple query variations
- Retrieves with each variation across all retrievers
- Fuses results using reciprocal rank fusion
- Returns the top-k nodes from the fused results
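The fused retriever can be queried directly or wrapped in a query engine (the query text is illustrative):

```python
from llama_index.core.query_engine import RetrieverQueryEngine

# Fused, deduplicated nodes straight from the retriever...
nodes = retriever.retrieve("What did the author work on before college?")

# ...or a full query engine on top of the fusion retriever
query_engine = RetrieverQueryEngine.from_args(retriever)
response = query_engine.query("What did the author work on before college?")
```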
Auto Retriever
LLM automatically generates metadata filters:
```python
from llama_index.core.retrievers import VectorIndexAutoRetriever
from llama_index.core.vector_stores import MetadataInfo, VectorStoreInfo

vector_store_info = VectorStoreInfo(
    content_info="Technical documentation",
    metadata_info=[
        MetadataInfo(
            name="category",
            type="str",
            description="Document category (tutorial, reference, guide)",
        ),
        MetadataInfo(
            name="difficulty",
            type="str",
            description="Content difficulty (beginner, intermediate, advanced)",
        ),
    ],
)

retriever = VectorIndexAutoRetriever(
    index,
    vector_store_info=vector_store_info,
)

# LLM automatically generates appropriate filters
nodes = retriever.retrieve("Find beginner tutorials on Python")
```
Knowledge Graph Retriever
Leverage graph relationships:
```python
from llama_index.core import KnowledgeGraphIndex

kg_index = KnowledgeGraphIndex.from_documents(documents)

kg_retriever = kg_index.as_retriever(
    include_text=True,
    retriever_mode="keyword",
)
```
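Retrieved nodes carry graph triplets plus, with include_text=True, the source chunks they were extracted from; a brief usage sketch (the query text is illustrative):

```python
# Keyword-mode retrieval walks the graph from entities found in the query
nodes = kg_retriever.retrieve("How is the parser related to the indexer?")
for node in nodes:
    print(node.get_content()[:200])
```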
Prompt Engineering
Custom Prompts
```python
from llama_index.core import PromptTemplate

qa_prompt_tmpl = """\
Context information is below.
---------------------
{context_str}
---------------------
Given the context information and not prior knowledge, answer the query.
Query: {query_str}
Answer: """

qa_prompt = PromptTemplate(qa_prompt_tmpl)
query_engine = index.as_query_engine(
    text_qa_template=qa_prompt,
)
```
Advanced Prompt Functions
PromptTemplate supports function mappings, which compute template variables dynamically at format time:

```python
from llama_index.core.prompts import PromptTemplate

def few_shot_examples_fn(**kwargs):
    """Dynamically inject few-shot examples similar to the query."""
    query_str = kwargs["query_str"]
    examples = retrieve_similar_examples(query_str)  # user-defined lookup
    return "\n".join(examples)

# few_shot_examples is resolved by the mapped function on each format call
qa_tmpl = PromptTemplate(
    "Here are some examples:\n{few_shot_examples}\n\n"
    "Context:\n{context_str}\n\nNow answer: {query_str}\n",
    function_mappings={"few_shot_examples": few_shot_examples_fn},
)

query_engine = index.as_query_engine(text_qa_template=qa_tmpl)
```
Observability & Debugging
Callback Instrumentation
```python
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, LlamaDebugHandler

llama_debug = LlamaDebugHandler(print_trace_on_end=True)
callback_manager = CallbackManager([llama_debug])
Settings.callback_manager = callback_manager

# Detailed execution traces are printed after the query completes
response = query_engine.query(query_str)
```
Event Tracking
```python
from llama_index.core.callbacks import CBEventType, EventPayload

# Retrieve specific events recorded by LlamaDebugHandler
for i, (start_event, end_event) in enumerate(
    llama_debug.get_event_pairs(CBEventType.SUB_QUESTION)
):
    qa_pair = end_event.payload[EventPayload.SUB_QUESTION]
    print(f"Sub Question {i}: {qa_pair.sub_q.sub_question}")
    print(f"Answer: {qa_pair.answer}")
```
Custom Callbacks
```python
from llama_index.core.callbacks import CBEventType
from llama_index.core.callbacks.base import BaseCallbackHandler

class MetricsCallback(BaseCallbackHandler):
    def __init__(self):
        # Base class requires the event-ignore lists at construction
        super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])

    def on_event_start(self, event_type, payload=None, event_id="", **kwargs):
        if event_type == CBEventType.RETRIEVE:
            print(f"Retrieval started: {payload}")
        return event_id

    def on_event_end(self, event_type, payload=None, event_id="", **kwargs):
        if event_type == CBEventType.RETRIEVE:
            print(f"Retrieval completed: {payload}")

    # Abstract on the base class; no-op implementations suffice here
    def start_trace(self, trace_id=None): ...
    def end_trace(self, trace_id=None, trace_map=None): ...
```
Workflow Orchestration
Sequential Workflow
```python
async def rag_workflow(query: str):
    # Step 1: Retrieve
    nodes = retriever.retrieve(query)
    # Step 2: Rerank
    reranked_nodes = reranker.postprocess_nodes(nodes, query_str=query)
    # Step 3: Synthesize
    response = synthesizer.synthesize(query, reranked_nodes)
    # Step 4: Evaluate
    evaluation = evaluator.evaluate(query=query, response=str(response))
    return response, evaluation
```
Parallel Workflow
```python
import asyncio

async def parallel_rag_workflow(query: str):
    # Parallel retrieval from multiple sources
    results = await asyncio.gather(
        vector_retriever.aretrieve(query),
        bm25_retriever.aretrieve(query),
        kg_retriever.aretrieve(query),
    )
    # Flatten, then rerank the combined candidate set
    all_nodes = [node for result in results for node in result]
    reranked = reranker.postprocess_nodes(all_nodes, query_str=query)
    # Synthesize a single answer
    response = await synthesizer.asynthesize(query, reranked)
    return response
```
Production Deployment Patterns
API Server
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    top_k: int = 5

@app.post("/query")
async def query_endpoint(request: QueryRequest):
    query_engine = index.as_query_engine(similarity_top_k=request.top_k)
    response = await query_engine.aquery(request.query)
    return {"response": str(response)}
```
Streaming Server
```python
from fastapi.responses import StreamingResponse

@app.post("/query/stream")
async def query_stream(request: QueryRequest):
    query_engine = index.as_query_engine(streaming=True)
    response = query_engine.query(request.query)

    async def generate():
        for text in response.response_gen:
            yield text

    return StreamingResponse(generate(), media_type="text/plain")
```
Batch Processing
```python
import asyncio
from typing import List

async def batch_query_processing(queries: List[str]):
    """Process multiple queries efficiently."""
    # Batch retrieval
    all_nodes = await asyncio.gather(*[
        retriever.aretrieve(query) for query in queries
    ])
    # Batch synthesis
    responses = await asyncio.gather(*[
        synthesizer.asynthesize(query, nodes)
        for query, nodes in zip(queries, all_nodes)
    ])
    return responses
```
Relevance to Your Pipelines
Current Implementation
src/agentic_retriever/:
- ✅ Agentic retrieval with LLM routing
- Opportunity: Expand to multi-agent architecture
src/17_query_planning_agent.py:
- ✅ Query planning implementation
- Opportunity: Add sub-question decomposition
src-iLand/retrieval/router.py:
- ✅ Two-stage routing (index + strategy)
- ✅ LLM-based strategy selection
- Opportunity: Convert to full agent with memory
Integration Opportunities

- Convert Router to Agent:

```python
# Transform the router into a FunctionAgent
agent = FunctionAgent(
    tools=[vector_tool, bm25_tool, hybrid_tool, metadata_tool],
    llm=llm,
    memory=ChatMemoryBuffer.from_defaults(),
)
```

- Multi-Agent iLand System:

```python
# Specialized agents
deed_agent = FunctionAgent(tools=[deed_search_tool])
legal_agent = FunctionAgent(tools=[legal_interpretation_tool])
geo_agent = FunctionAgent(tools=[geographic_search_tool])

# Coordinator
coordinator = AgentWorkflow(agents=[deed_agent, legal_agent, geo_agent])
```

- Add Sub-Question Decomposition:

```python
# For complex queries
query_engine = SubQuestionQueryEngine.from_defaults(
    query_engine_tools=retriever_tools,
    use_async=True,
)
```

- Streaming CLI:

```python
# Add streaming to the retrieval CLI
query_engine = index.as_query_engine(streaming=True)
response = query_engine.query(user_query)
for text in response.response_gen:
    print(text, end="", flush=True)
```