# Video RAG Agent

A complete recipe that combines video processing, transcript retrieval, and a tool-calling agent into one pipeline. Insert a video and a question — the agent automatically searches keyframes, transcripts, and the web to answer it.
## Full Pipeline

```python
import pixeltable as pxt
from pixeltable.functions.video import frame_iterator, extract_audio
from pixeltable.functions.audio import audio_splitter
from pixeltable.functions.string import string_splitter
from pixeltable.functions.openai import chat_completions, transcriptions
from pixeltable.functions.huggingface import clip, sentence_transformer
from pixeltable.functions.anthropic import messages, invoke_tools
from pixeltable.functions import image as pxt_image
from datetime import datetime
pxt.create_dir('vrag', if_exists='ignore')
# ── 1. Video ingestion ──────────────────────────────────────────────
videos = pxt.create_table('vrag.videos', {
    'video': pxt.Video,
    'title': pxt.String,
}, if_exists='ignore')

# ── 2. Keyframe extraction + CLIP visual search ─────────────────────
frames = pxt.create_view('vrag.frames', videos,
    iterator=frame_iterator(videos.video, keyframes_only=True),
    if_exists='ignore')

frames.add_computed_column(
    thumbnail=pxt_image.b64_encode(
        pxt_image.thumbnail(frames.frame, size=(320, 320))),
    if_exists='ignore')

frames.add_embedding_index('frame',
    embedding=clip.using(model_id='openai/clip-vit-base-patch32'),
    if_exists='ignore')

# Describe each frame with a vision LLM
frames.add_computed_column(
    description=chat_completions(
        messages=[{
            'role': 'user',
            'content': [
                {'type': 'text', 'text': 'Describe this video frame in one sentence.'},
                {'type': 'image_url', 'image_url': {'url': frames.frame}},
            ],
        }],
        model='gpt-4o-mini',
    ).choices[0].message.content,
    if_exists='ignore')

# ── 3. Audio extraction → transcription → sentence embedding ────────
videos.add_computed_column(
    audio=extract_audio(videos.video, format='mp3'),
    if_exists='ignore')

audio_chunks = pxt.create_view('vrag.audio_chunks', videos,
    iterator=audio_splitter(audio=videos.audio, duration=30.0),
    if_exists='ignore')

audio_chunks.add_computed_column(
    transcription=transcriptions(
        audio=audio_chunks.audio_chunk, model='whisper-1'),
    if_exists='ignore')

# Note: '!= None' is required here; Pixeltable expressions can't overload 'is'
sentences = pxt.create_view('vrag.sentences',
    audio_chunks.where(audio_chunks.transcription != None),
    iterator=string_splitter(
        text=audio_chunks.transcription.text, separators='sentence'),
    if_exists='ignore')

embed_fn = sentence_transformer.using(model_id='all-MiniLM-L6-v2')
sentences.add_embedding_index('text', string_embed=embed_fn, if_exists='ignore')

# ── 4. Query functions (become agent tools) ──────────────────────────
@pxt.query
def search_video_frames(query_text: str):
    """Search video frames by visual similarity using CLIP."""
    sim = frames.frame.similarity(string=query_text)
    return frames.order_by(sim, asc=False).limit(10).select(
        frames.description, frames.thumbnail, sim=sim)

@pxt.query
def search_transcripts(query_text: str):
    """Search video transcripts by semantic similarity."""
    sim = sentences.text.similarity(string=query_text)
    return sentences.where(sim > 0.5).order_by(sim, asc=False).select(
        sentences.text, sim=sim).limit(20)

@pxt.udf
def web_search(keywords: str) -> str:
    """Search the web for additional context."""
    from duckduckgo_search import DDGS
    with DDGS() as ddgs:
        results = list(ddgs.news(keywords=keywords, max_results=5))
    return '\n'.join(
        f"{r['title']}: {r['body']}" for r in results
    ) if results else 'No results.'

# ── 5. Context assembly ─────────────────────────────────────────────
@pxt.udf
def assemble_context(
    question: str,
    tool_outputs: list | None,
    transcript_context: list | None,
    frame_context: list | None,
) -> str:
    """Merge tool results, transcript excerpts, and frame descriptions into one prompt."""
    parts = [f"QUESTION: {question}"]
    tool_str = str(tool_outputs) if tool_outputs else 'N/A'
    parts.append(f"\n[TOOL RESULTS]\n{tool_str}")
    if transcript_context:
        transcript_str = '\n'.join(
            f"- {item.get('text', '')}"
            for item in transcript_context if isinstance(item, dict)
        ) or 'N/A'
    else:
        transcript_str = 'N/A'
    parts.append(f"\n[TRANSCRIPT EXCERPTS]\n{transcript_str}")
    if frame_context:
        frame_str = '\n'.join(
            f"- {item.get('description', '')}"
            for item in frame_context if isinstance(item, dict)
        ) or 'N/A'
    else:
        frame_str = 'N/A'
    parts.append(f"\n[VISUAL DESCRIPTIONS]\n{frame_str}")
    return '\n'.join(parts)

# ── 6. Agent pipeline ───────────────────────────────────────────────
tools = pxt.tools(web_search, search_transcripts, search_video_frames)

agent = pxt.create_table('vrag.agent', {
    'prompt': pxt.String,
    'timestamp': pxt.Timestamp,
    'system_prompt': pxt.String,
    'max_tokens': pxt.Int,
    'temperature': pxt.Float,
}, if_exists='ignore')

# Step 1: Initial LLM call — tool selection
agent.add_computed_column(
    initial_response=messages(
        model='claude-sonnet-4-20250514',
        messages=[{'role': 'user', 'content': [{'type': 'text', 'text': agent.prompt}]}],
        tools=tools,
        tool_choice=tools.choice(required=True),
        max_tokens=agent.max_tokens,
        model_kwargs={'system': agent.system_prompt, 'temperature': agent.temperature},
    ),
    if_exists='ignore')

# Step 2: Execute the tools the LLM selected
agent.add_computed_column(
    tool_output=invoke_tools(tools, agent.initial_response),
    if_exists='ignore')

# Step 3: RAG context from transcripts and frames
agent.add_computed_column(
    transcript_context=search_transcripts(agent.prompt),
    if_exists='ignore')
agent.add_computed_column(
    frame_context=search_video_frames(agent.prompt),
    if_exists='ignore')

# Step 4: Assemble all context
agent.add_computed_column(
    context=assemble_context(
        agent.prompt, agent.tool_output,
        agent.transcript_context, agent.frame_context),
    if_exists='ignore')

# Step 5: Final LLM call with full context
agent.add_computed_column(
    final_response=messages(
        model='claude-sonnet-4-20250514',
        messages=[{'role': 'user', 'content': [{'type': 'text', 'text': agent.context}]}],
        max_tokens=agent.max_tokens,
        model_kwargs={
            'system': 'Answer based on the video transcripts, visual descriptions, and tool results. Cite timestamps when possible.',
            'temperature': agent.temperature,
        },
    ),
    if_exists='ignore')

# Step 6: Extract answer
agent.add_computed_column(
    answer=agent.final_response.content[0].text,
    if_exists='ignore')
```
## Usage

```python
# Insert videos
videos.insert([
    {'video': 'lecture.mp4', 'title': 'ML Lecture'},
    {'video': 'https://example.com/demo.mp4', 'title': 'Product Demo'},
])

# Ask a question — the full pipeline runs automatically
agent.insert([{
    'prompt': 'What visual examples does the lecturer use to explain gradient descent?',
    'timestamp': datetime.now(),
    'system_prompt': 'Use search_video_frames for visual content and search_transcripts for spoken content.',
    'max_tokens': 1024,
    'temperature': 0.7,
}])

# Fetch the most recent answer
result = agent.order_by(agent.timestamp, asc=False).limit(1).select(agent.answer).collect()
print(result['answer'][0])
```
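
New videos can be inserted at any time; the frame, audio-chunk, and sentence views (and their embedding indexes) update incrementally for the new rows. A small sketch, with a hypothetical file name:

```python
videos.insert([{'video': 'office_hours.mp4', 'title': 'Office Hours'}])  # placeholder path
# frames, audio_chunks, and sentences now cover the new video,
# so subsequent agent questions search it automatically.
```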
## How It Works

The pipeline is a chain of computed columns. Inserting a row into `agent` triggers these steps automatically:

- **Initial LLM call** — Claude selects which tools to call (transcript search, frame search, web search)
- **Tool execution** — `invoke_tools()` runs the selected `@pxt.query`/`@pxt.udf` functions
- **RAG retrieval** — Transcript and frame similarity searches run in parallel as computed columns
- **Context assembly** — A UDF merges tool outputs, transcript excerpts, and visual descriptions
- **Final LLM call** — Claude synthesizes everything into a grounded answer
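
Because each step lands in its own computed column, intermediate results are easy to inspect. A minimal sketch using the columns defined above:

```python
# Which tools did Claude select, and what did they return?
agent.select(agent.initial_response, agent.tool_output).collect()

# The retrieved transcript excerpts and frame descriptions
agent.select(agent.transcript_context, agent.frame_context).collect()
```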
### Key building blocks

| Concept | Function | Purpose |
|---|---|---|
| `frame_iterator` | `pxt.create_view(..., iterator=frame_iterator(...))` | Extract video keyframes |
| `audio_splitter` | `pxt.create_view(..., iterator=audio_splitter(...))` | Split audio into chunks |
| `transcriptions` | `t.add_computed_column(transcription=transcriptions(...))` | Transcribe audio chunks |
| `string_splitter` | `pxt.create_view(..., iterator=string_splitter(...))` | Split transcript into sentences |
| `add_embedding_index` | `t.add_embedding_index('col', embedding=fn)` | Enable similarity search |
| `@pxt.query` | `def search_transcripts(query_text: str): ...` | Reusable retrieval + agent tool |
| `pxt.tools()` | `tools = pxt.tools(fn1, fn2)` | Bundle functions as LLM tools |
| `invoke_tools()` | `invoke_tools(tools, response)` | Execute the tools the LLM chose |
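
The embedding indexes are also usable outside the agent. For example, an ad-hoc transcript search that follows the same pattern as `search_transcripts` above:

```python
sim = sentences.text.similarity(string='gradient descent')
sentences.order_by(sim, asc=False).limit(5).select(sentences.text, sim=sim).collect()
```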
## Adapting This Recipe

- Swap providers: Replace `messages` (Anthropic) with `chat_completions` (OpenAI/Together/etc.) — see providers.md for import and output shapes
- Add document RAG: Add a `document_splitter` view and a `search_documents` query function to the tools list (sketched below)
- Use local models: Replace OpenAI transcription with `whisper.transcribe()` and use `ollama.chat_completions` for the LLM — see workflows.md → Local LLM Pipeline
- Serve via API: Wrap the pipeline in a FastAPI endpoint — see workflows.md → FastAPI App Pattern
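
A rough sketch of the document-RAG extension, mirroring the transcript pipeline above. The `document_splitter` import path and signature are assumptions; check the Pixeltable docs for the exact API:

```python
from pixeltable.functions.document import document_splitter  # assumed import path

docs = pxt.create_table('vrag.docs', {'doc': pxt.Document}, if_exists='ignore')

# Chunk documents the same way transcripts are chunked above
doc_chunks = pxt.create_view('vrag.doc_chunks', docs,
    iterator=document_splitter(document=docs.doc, separators='sentence'),  # signature assumed
    if_exists='ignore')

# Reuse embed_fn (all-MiniLM-L6-v2) from the pipeline above
doc_chunks.add_embedding_index('text', string_embed=embed_fn, if_exists='ignore')

@pxt.query
def search_documents(query_text: str):
    """Search ingested documents by semantic similarity."""
    sim = doc_chunks.text.similarity(string=query_text)
    return doc_chunks.order_by(sim, asc=False).limit(10).select(doc_chunks.text, sim=sim)

# Add the new tool to the bundle passed to the agent
tools = pxt.tools(web_search, search_transcripts, search_video_frames, search_documents)
```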