GenAI Agents with ResponsesAgent
Build and deploy LLM-powered agents using MLflow 3's ResponsesAgent interface.
ResponsesAgent Overview
ResponsesAgent is MLflow 3's recommended interface for building conversational agents. It provides:
- Standardized input/output format (OpenAI-compatible)
- Streaming support
- Integration with Databricks features (tracing, evaluation)
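Concretely, a request carries an input list of messages and the agent returns an output list of items. A rough sketch of the payload shapes (illustrative only, not the full schema):
# Request (OpenAI Responses-style input)
{"input": [{"role": "user", "content": "What is Databricks?"}]}
# Response (output items; text replies use the output_text format)
{
    "output": [
        {
            "type": "message",
            "id": "msg_1",
            "role": "assistant",
            "content": [{"type": "output_text", "text": "Databricks is ..."}],
        }
    ]
}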
Basic Agent Structure
# agent.py
import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
from typing import Generator
class MyAgent(ResponsesAgent):
    def __init__(self):
        from databricks_langchain import ChatDatabricks
        self.llm = ChatDatabricks(endpoint="databricks-meta-llama-3-3-70b-instruct")

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction."""
        messages = [{"role": m.role, "content": m.content} for m in request.input]
        response = self.llm.invoke(messages)
        # MUST use helper methods for output items
        return ResponsesAgentResponse(
            output=[self.create_text_output_item(text=response.content, id="msg_1")]
        )

    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming prediction."""
        # Collect from non-streaming for simplicity
        result = self.predict(request)
        for item in result.output:
            yield ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=item,
            )
# Export for MLflow
AGENT = MyAgent()
mlflow.models.set_model(AGENT)
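The predict_stream above simply replays the non-streaming result. For token-level streaming, ChatDatabricks can stream chunks and ResponsesAgent provides a create_text_delta helper for delta events. A sketch that could replace the method inside MyAgent (assumes the create_text_delta helper in recent MLflow 3 releases and plain-text chunks):
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Token-level streaming sketch."""
        messages = [{"role": m.role, "content": m.content} for m in request.input]
        text = ""
        for chunk in self.llm.stream(messages):  # ChatDatabricks yields message chunks
            text += chunk.content
            # create_text_delta builds a response.output_text.delta event payload
            yield ResponsesAgentStreamEvent(
                **self.create_text_delta(delta=chunk.content, item_id="msg_1")
            )
        # Emit the completed item so predict() and aggregating clients get the full text
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=text, id="msg_1"),
        )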
LangGraph Agent Pattern
For agents with tools and complex logic, use LangGraph:
# agent.py
import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
output_to_responses_items_stream,
to_chat_completions_input,
)
from databricks_langchain import ChatDatabricks, UCFunctionToolkit
from langchain_core.messages import AIMessage
from langchain_core.runnables import RunnableLambda
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt.tool_node import ToolNode
from typing import Annotated, Any, Generator, Sequence, TypedDict
# Configuration
LLM_ENDPOINT = "databricks-meta-llama-3-3-70b-instruct"
SYSTEM_PROMPT = "You are a helpful assistant."
# State definition
class AgentState(TypedDict):
    messages: Annotated[Sequence, add_messages]

class LangGraphAgent(ResponsesAgent):
    def __init__(self):
        self.llm = ChatDatabricks(endpoint=LLM_ENDPOINT)
        self.tools = []
        # Add UC Function tools
        # uc_toolkit = UCFunctionToolkit(function_names=["catalog.schema.function"])
        # self.tools.extend(uc_toolkit.tools)
        self.llm_with_tools = self.llm.bind_tools(self.tools) if self.tools else self.llm

    def _build_graph(self):
        def should_continue(state):
            last = state["messages"][-1]
            if isinstance(last, AIMessage) and last.tool_calls:
                return "tools"
            return "end"

        def call_model(state):
            messages = [{"role": "system", "content": SYSTEM_PROMPT}] + state["messages"]
            response = self.llm_with_tools.invoke(messages)
            return {"messages": [response]}

        graph = StateGraph(AgentState)
        graph.add_node("agent", RunnableLambda(call_model))
        if self.tools:
            graph.add_node("tools", ToolNode(self.tools))
            graph.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
            graph.add_edge("tools", "agent")
        else:
            graph.add_edge("agent", END)
        graph.set_entry_point("agent")
        return graph.compile()

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Collect output items from streaming
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(output=outputs)

    # Helper methods inherited from ResponsesAgent:
    # - self.create_text_output_item(text, id) - for text responses
    # - self.create_function_call_item(id, call_id, name, arguments) - for tool calls
    # - self.create_function_call_output_item(call_id, output) - for tool results
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        messages = to_chat_completions_input([m.model_dump() for m in request.input])
        graph = self._build_graph()
        for event in graph.stream({"messages": messages}, stream_mode=["updates"]):
            if event[0] == "updates":
                for node_data in event[1].values():
                    if node_data.get("messages"):
                        yield from output_to_responses_items_stream(node_data["messages"])
# Export
mlflow.langchain.autolog()
AGENT = LangGraphAgent()
mlflow.models.set_model(AGENT)
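The helper methods listed in the comments above build the raw output-item payloads, so you can also translate LangChain messages by hand instead of relying on output_to_responses_items_stream. A rough sketch of such a translation method for LangGraphAgent (the method name and id handling are illustrative, not part of MLflow):
import json
from langchain_core.messages import AIMessage, ToolMessage

    def _message_to_items(self, message) -> list:
        """Illustrative: map one LangChain message to Responses output items."""
        if isinstance(message, ToolMessage):
            # Tool result -> function_call_output item
            return [self.create_function_call_output_item(
                call_id=message.tool_call_id, output=str(message.content)
            )]
        if isinstance(message, AIMessage) and message.tool_calls:
            # Assistant requested tool calls -> one function_call item per call
            return [
                self.create_function_call_item(
                    id=tc["id"],
                    call_id=tc["id"],
                    name=tc["name"],
                    arguments=json.dumps(tc["args"]),
                )
                for tc in message.tool_calls
            ]
        # Plain assistant text -> message item
        return [self.create_text_output_item(text=message.content, id=message.id or "msg_1")]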
Using Databricks-Hosted Models
Use exact endpoint names from the reference table in SKILL.md.
from databricks_langchain import ChatDatabricks
# Foundation Model APIs (pay-per-token) - use exact endpoint names
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-3-70b-instruct")
llm = ChatDatabricks(endpoint="databricks-claude-sonnet-4-6")
llm = ChatDatabricks(endpoint="databricks-gpt-5-1")
llm = ChatDatabricks(endpoint="databricks-gemini-3-flash")
# Custom fine-tuned model endpoint
llm = ChatDatabricks(endpoint="my-finetuned-model-endpoint")
# With parameters
llm = ChatDatabricks(
    endpoint="databricks-meta-llama-3-3-70b-instruct",
    temperature=0.1,
    max_tokens=1000,
)
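A quick way to sanity-check an endpoint before wiring it into an agent (requires Databricks auth in the environment):
response = llm.invoke("Reply with one short sentence.")
print(response.content)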
ChatContext for User/Conversation Info
from mlflow.types.responses import ResponsesAgentRequest, ChatContext
# Request with context
request = ResponsesAgentRequest(
    input=[{"role": "user", "content": "Hello!"}],
    context=ChatContext(
        user_id="user@company.com",
        conversation_id="conv-123",
    ),
)
# Access in agent
class MyAgent(ResponsesAgent):
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        user_id = request.context.user_id if request.context else None
        conv_id = request.context.conversation_id if request.context else None
        # Use for personalization, memory, etc.
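For example, conversation_id can key a short-term memory store so follow-up turns see earlier context. A hypothetical sketch building on the basic MyAgent above (the in-process dict is illustrative only; a real agent would persist history externally):
_HISTORY: dict = {}  # hypothetical in-process store, keyed by conversation_id

class MemoryAgent(MyAgent):
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        conv_id = request.context.conversation_id if request.context else "default"
        turns = _HISTORY.setdefault(conv_id, [])
        turns.extend({"role": m.role, "content": m.content} for m in request.input)
        response = self.llm.invoke(turns)
        turns.append({"role": "assistant", "content": response.content})
        return ResponsesAgentResponse(
            output=[self.create_text_output_item(text=response.content, id="msg_1")]
        )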
Testing the Agent Locally
# test_agent.py
from agent import AGENT
from mlflow.types.responses import ResponsesAgentRequest, ChatContext
# Test request
request = ResponsesAgentRequest(
    input=[{"role": "user", "content": "What is Databricks?"}],
    context=ChatContext(user_id="test@example.com"),
)
# Non-streaming
result = AGENT.predict(request)
print(result.model_dump(exclude_none=True))
# Streaming
for event in AGENT.predict_stream(request):
    print(event)
Run via MCP:
run_python_file_on_databricks(file_path="./my_agent/test_agent.py")
Logging the Agent
See 6-logging-registration.md for full details.
import mlflow
from agent import AGENT, LLM_ENDPOINT
from mlflow.models.resources import DatabricksServingEndpoint
mlflow.set_registry_uri("databricks-uc")
resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT)]
with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        name="agent",
        python_model="agent.py",
        resources=resources,
        pip_requirements=[
            "mlflow==3.6.0",
            "databricks-langchain",
            "langgraph==0.3.4",
        ],
        input_example={
            "input": [{"role": "user", "content": "Hello!"}]
        },
        registered_model_name="main.agents.my_agent",
    )
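Before deploying, you can load the logged model back as a pyfunc and hit it with the same serving-style payload as a quick packaging check:
loaded = mlflow.pyfunc.load_model(model_info.model_uri)
print(loaded.predict({"input": [{"role": "user", "content": "Hello!"}]}))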
Deployment
See 7-deployment.md for async job-based deployment.
from databricks import agents
agents.deploy(
    "main.agents.my_agent",
    1,  # UC model version (second positional argument)
    tags={"source": "mcp"},
)
# Takes ~15 minutes
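The call also returns a deployment handle with the endpoint details; the attribute names below are assumptions to verify against your databricks-agents version:
deployment = agents.deploy("main.agents.my_agent", 1, tags={"source": "mcp"})
# Attribute names assumed; inspect the returned object in your version
print(deployment.endpoint_name)
print(deployment.query_endpoint)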
Query Deployed Agent
query_serving_endpoint(
    name="my-agent-endpoint",
    messages=[{"role": "user", "content": "What is Databricks?"}],
    max_tokens=500
)