Agents API Reference
Complete API documentation for DIMOS agent classes.
Table of Contents
- Agent Base Classes
- OpenAIAgent
- PlanningAgent
- ClaudeAgent
- Memory Interfaces
- Common Patterns
- Error Handling
- Environment Variables
Agent Base Classes
Agent
Base class for all DIMOS agents, managing memory and subscriptions.
class Agent:
def __init__(self,
dev_name: str = "NA",
agent_type: str = "Base",
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
pool_scheduler: Optional[ThreadPoolScheduler] = None)
Parameters:
- dev_name (str): Agent identifier for logging and debugging
- agent_type (str): Type classification (e.g., "Base", "Vision", "Planning")
- agent_memory (AbstractAgentSemanticMemory): Optional semantic memory system
- pool_scheduler (ThreadPoolScheduler): Thread pool for async operations; if None, uses the global scheduler
Attributes:
- dev_name (str): Agent name
- agent_type (str): Agent type
- agent_memory (AbstractAgentSemanticMemory): Memory instance
- disposables (CompositeDisposable): Resource cleanup manager
- pool_scheduler (ThreadPoolScheduler): Execution scheduler
Methods:
dispose_all()
def dispose_all(self) -> None
Disposes of all active subscriptions managed by the agent.
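Typical usage is to call dispose_all() during shutdown so that every stream subscription the agent registered is released (a minimal sketch using the base class directly):
agent = Agent(dev_name="Demo")
try:
    ...  # application runs; the agent accumulates subscriptions
finally:
    agent.dispose_all()  # release every registered subscription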
LLMAgent
Abstract base class for LLM-based agents.
class LLMAgent(Agent):
def __init__(self,
dev_name: str = "NA",
input_query_stream: Optional[Observable] = None,
input_video_stream: Optional[Observable] = None,
system_query: str = "",
skills: Union[AbstractSkill, List[AbstractSkill], SkillLibrary] = None,
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
pool_scheduler: Optional[ThreadPoolScheduler] = None)
Additional Parameters:
- input_query_stream (Observable[str]): Stream of text queries
- input_video_stream (Observable[np.ndarray]): Stream of video frames
- system_query (str): System prompt defining agent behavior
- skills (Union[AbstractSkill, List, SkillLibrary]): Available skills/tools
Additional Attributes:
- skill_library (SkillLibrary): Registered skills
- response_subject (Subject): Observable for agent responses
Additional Methods:
get_response_observable()
def get_response_observable(self) -> Observable
Get observable stream of agent responses for chaining.
Returns: Observable[str] emitting agent responses
Example:
planner = PlanningAgent(...)
executor = OpenAIAgent(
input_query_stream=planner.get_response_observable()
)
OpenAIAgent
Agent powered by OpenAI's GPT models (GPT-4, GPT-4o, GPT-4o-mini, etc.).
Constructor
class OpenAIAgent(LLMAgent):
def __init__(self,
dev_name: str = "NA",
input_query_stream: Optional[Observable] = None,
input_video_stream: Optional[Observable] = None,
input_video_url_stream: Optional[Observable] = None,
system_query: str = "",
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
max_input_tokens_per_request: int = 2000,
max_output_tokens_per_request: int = 2000,
model_name: str = "gpt-4o-mini",
prompt_builder: Optional[PromptBuilder] = None,
tokenizer: Optional[AbstractTokenizer] = None,
rag_query_n: int = 4,
rag_similarity_threshold: Optional[float] = None,
skills: Union[AbstractSkill, List[AbstractSkill], SkillLibrary] = None,
response_model: Optional[BaseModel] = None,
frame_processor: Optional[FrameProcessor] = None,
image_detail: str = "auto",
pool_scheduler: Optional[ThreadPoolScheduler] = None,
process_all_inputs: Optional[bool] = None)
Parameters
Input/Output
- input_query_stream (Observable[str]): Text query input stream
- input_video_stream (Observable[np.ndarray]): Video frame stream for vision models
- input_video_url_stream (Observable[str]): Video URL stream (alternative to frame stream)
- system_query (str): System prompt defining agent persona and behavior
Model Configuration
- model_name (str): OpenAI model identifier
  - Text-only: "gpt-4o-mini", "gpt-4"
  - Vision: "gpt-4o", "gpt-4-turbo", "gpt-4o-mini"
  - Default: "gpt-4o-mini"
- max_input_tokens_per_request (int): Maximum tokens for input context (default: 2000)
- max_output_tokens_per_request (int): Maximum tokens for response (default: 2000)
Vision Configuration
- image_detail (str): Image processing quality
  - "low": Faster, cheaper, less detail (512x512)
  - "high": Slower, more expensive, better detail
  - "auto": Automatic based on image size (default)
- frame_processor (FrameProcessor): Custom video frame processing
Memory & RAG
- agent_memory (AbstractAgentSemanticMemory): Semantic memory instance
- rag_query_n (int): Number of memory items to retrieve (default: 4)
- rag_similarity_threshold (float): Minimum similarity for retrieval (0-1, optional)
Skills
- skills (Union[AbstractSkill, List, SkillLibrary]): Available skills for tool calling
  - Single skill class: skills=MySkill
  - List of skills: skills=[Skill1, Skill2]
  - Skill library: skills=MySkillLibrary()
Advanced
- prompt_builder (PromptBuilder): Custom prompt construction logic
- tokenizer (AbstractTokenizer): Custom token counter
- response_model (BaseModel): Pydantic model for structured outputs (see the sketch below)
- pool_scheduler (ThreadPoolScheduler): Custom thread pool
- process_all_inputs (bool): Whether to process all inputs or skip when busy
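As an illustration of response_model, the sketch below defines a hypothetical Pydantic schema (the field names are invented for this example) and passes it to the constructor:
from pydantic import BaseModel

class NavigationPlan(BaseModel):  # hypothetical schema for illustration
    action: str
    distance_meters: float

agent = OpenAIAgent(
    dev_name="StructuredAgent",
    model_name="gpt-4o-mini",
    response_model=NavigationPlan
)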
Methods
run_query()
def run_query(self, query: str,
base64_image: Optional[str] = None,
thinking_budget_tokens: int = 0) -> str
Execute a single query synchronously.
Parameters:
- query (str): Text query
- base64_image (str): Optional base64-encoded image
- thinking_budget_tokens (int): Extended thinking tokens (o1 models)
Returns: str - Agent response
Example:
agent = OpenAIAgent(
dev_name="QueryAgent",
skills=robot.get_skills(),
model_name="gpt-4o-mini"
)
response = agent.run_query("Move forward 2 meters")
print(response)
run_observable_query()
def run_observable_query(self, query: str,
base64_image: Optional[str] = None,
thinking_budget_tokens: int = 0) -> Observable
Execute query and return response as observable.
Parameters: Same as run_query()
Returns: Observable[str] - Response stream
Example:
response_stream = agent.run_observable_query("What do you see?")
response_stream.subscribe(
on_next=lambda r: print(f"Response: {r}")
)
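subscribe() returns a Disposable, so a caller that no longer needs the response can cancel it explicitly:
subscription = response_stream.subscribe(on_next=print)
# ... later, when the response is no longer needed:
subscription.dispose()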
Usage Examples
Basic Text Agent
from dimos.agents.agent import OpenAIAgent
from reactivex.subject import Subject
query_subject = Subject()
agent = OpenAIAgent(
dev_name="TextAgent",
input_query_stream=query_subject,  # a Subject is itself an Observable
system_query="You are a helpful assistant.",
model_name="gpt-4o-mini"
)
query_subject.on_next("Hello!")
Vision Agent
agent = OpenAIAgent(
dev_name="VisionAgent",
input_query_stream=query_stream,
input_video_stream=robot.get_ros_video_stream(),
system_query="Describe what you see and navigate safely.",
model_name="gpt-4o",
image_detail="high"
)
Agent with Skills
agent = OpenAIAgent(
dev_name="SkillAgent",
input_query_stream=query_stream,
skills=robot.get_skills(),
system_query="Execute user commands using your skills.",
model_name="gpt-4o-mini"
)
Agent with Memory
from dimos.agents.memory.chroma_impl import OpenAISemanticMemory
memory = OpenAISemanticMemory(collection_name="robot_memory")
memory.create()
agent = OpenAIAgent(
dev_name="MemoryAgent",
input_query_stream=query_stream,
agent_memory=memory,
rag_query_n=5,
rag_similarity_threshold=0.7,
model_name="gpt-4o"
)
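Retrieval is only useful once the store has content; observations can be seeded with add_vector() before or during a run (the IDs and text below are illustrative):
# Seed the store so RAG retrieval has context to draw on
memory.add_vector("obs_dock", "The charging dock is next to the kitchen door")
memory.add_vector("obs_ball", "A red ball is in the living room")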
PlanningAgent
Specialized agent for breaking down complex tasks into executable steps.
Constructor
class PlanningAgent(OpenAIAgent):
def __init__(self,
dev_name: str = "PlanningAgent",
input_query_stream: Optional[Observable] = None,
skills: Union[AbstractSkill, List[AbstractSkill], SkillLibrary] = None,
model_name: str = "gpt-4o",
system_query: str = None,
**kwargs)
Parameters
- dev_name (str): Agent identifier (default: "PlanningAgent")
- input_query_stream (Observable[str]): User queries
- skills (SkillLibrary): Available skills for planning
- model_name (str): LLM model (default: "gpt-4o")
- system_query (str): Custom system prompt (optional)
- **kwargs: Additional OpenAIAgent parameters
Default System Prompt
If no system_query is provided, the agent uses a specialized planning prompt (a custom override is sketched after this list) that instructs it to:
- Break down complex tasks into steps
- Consider available skills
- Create logical, executable sequences
- Output one step at a time
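A minimal sketch of overriding the default prompt (the prompt wording here is illustrative):
planner = PlanningAgent(
    dev_name="CustomPlanner",
    input_query_stream=query_stream,
    skills=robot.get_skills(),
    system_query="Plan tasks as numbered steps, one skill call per step."
)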
Usage Example
from dimos.agents.planning_agent import PlanningAgent
from dimos.agents.agent import OpenAIAgent
# Planning agent
planner = PlanningAgent(
dev_name="MissionPlanner",
input_query_stream=web_interface.query_stream,
skills=robot.get_skills(),
model_name="gpt-4o"
)
# Execution agent
executor = OpenAIAgent(
dev_name="StepExecutor",
input_query_stream=planner.get_response_observable(),
skills=robot.get_skills(),
model_name="gpt-4o-mini",
system_query="Execute the given step."
)
# User: "Patrol the perimeter"
# Planner outputs: "Step 1: Move forward 5 meters"
# Executor executes: Move(distance=5)
# Planner outputs: "Step 2: Turn left 90 degrees"
# Executor executes: SpinLeft(degrees=90)
# ...
ClaudeAgent
Agent powered by Anthropic's Claude models.
Constructor
class ClaudeAgent(LLMAgent):
def __init__(self,
dev_name: str = "NA",
input_query_stream: Optional[Observable] = None,
input_video_stream: Optional[Observable] = None,
system_query: str = "",
agent_memory: Optional[AbstractAgentSemanticMemory] = None,
max_input_tokens_per_request: int = 2000,
max_output_tokens_per_request: int = 2000,
model_name: str = "claude-3-5-sonnet-20241022",
rag_query_n: int = 4,
rag_similarity_threshold: Optional[float] = None,
skills: Union[AbstractSkill, List[AbstractSkill], SkillLibrary] = None,
thinking_budget_tokens: int = 0,
pool_scheduler: Optional[ThreadPoolScheduler] = None)
Parameters
Parameters are similar to OpenAIAgent's, with these Claude-specific differences:
- model_name (str): Claude model identifier
  - "claude-3-5-sonnet-20241022" (default)
  - "claude-3-opus-20240229"
  - "claude-3-sonnet-20240229"
  - "claude-3-haiku-20240307"
- thinking_budget_tokens (int): Extended thinking tokens for reasoning
Key Differences from OpenAI
- Tool calling format: Uses Claude's native tool format
- Vision handling: Different image format requirements
- API authentication: Requires the ANTHROPIC_API_KEY environment variable
Usage Example
from dimos.agents.claude_agent import ClaudeAgent
agent = ClaudeAgent(
dev_name="ClaudeNavigator",
input_query_stream=query_stream,
input_video_stream=robot.get_ros_video_stream(),
skills=robot.get_skills(),
system_query="You are a navigation assistant.",
model_name="claude-3-5-sonnet-20241022"
)
Memory Interfaces
AbstractAgentSemanticMemory
Base interface for memory implementations.
class AbstractAgentSemanticMemory(ABC):
def __init__(self, connection_type: str = 'local')
@abstractmethod
def connect(self)
@abstractmethod
def create(self)
@abstractmethod
def add_vector(self, vector_id: str, vector_data: str)
@abstractmethod
def get_vector(self, vector_id: str)
@abstractmethod
def query(self, query_texts: str,
n_results: int = 4,
similarity_threshold: Optional[float] = None)
@abstractmethod
def delete_vector(self, vector_id: str)
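To make the contract concrete, here is a deliberately naive in-memory implementation; it is a sketch for understanding the interface only, with a substring match standing in for real embedding similarity:
class DictSemanticMemory(AbstractAgentSemanticMemory):
    """Toy backend over a plain dict (illustrative only)."""

    def connect(self):
        pass  # nothing external to connect to

    def create(self):
        self.store = {}

    def add_vector(self, vector_id, vector_data):
        self.store[vector_id] = vector_data

    def get_vector(self, vector_id):
        return self.store.get(vector_id)

    def query(self, query_texts, n_results=4, similarity_threshold=None):
        # Substring match in place of embedding similarity
        hits = [text for text in self.store.values()
                if query_texts.lower() in text.lower()]
        return hits[:n_results]

    def delete_vector(self, vector_id):
        self.store.pop(vector_id, None)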
OpenAISemanticMemory
ChromaDB-backed memory using OpenAI embeddings.
class OpenAISemanticMemory(ChromaAgentSemanticMemory):
def __init__(self,
collection_name: str = "my_collection",
model: str = "text-embedding-3-large",
dimensions: int = 1024)
Parameters:
- collection_name (str): ChromaDB collection name
- model (str): OpenAI embedding model
- dimensions (int): Embedding vector dimensions
Example:
from dimos.agents.memory.chroma_impl import OpenAISemanticMemory
memory = OpenAISemanticMemory(
collection_name="robot_observations",
model="text-embedding-3-large",
dimensions=1024
)
memory.create()
# Store
memory.add_vector("obs1", "Saw a red ball in the living room")
# Query
results = memory.query("objects in living room", n_results=5)
LocalSemanticMemory
ChromaDB-backed memory using local SentenceTransformer models.
class LocalSemanticMemory(ChromaAgentSemanticMemory):
def __init__(self,
collection_name: str = "my_collection",
model_name: str = "sentence-transformers/all-MiniLM-L6-v2")
Parameters:
- collection_name (str): ChromaDB collection name
- model_name (str): HuggingFace model identifier
Example:
from dimos.agents.memory.chroma_impl import LocalSemanticMemory
memory = LocalSemanticMemory(
collection_name="local_memory",
model_name="sentence-transformers/all-mpnet-base-v2"
)
memory.create()
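Because the SentenceTransformer model runs locally, this backend needs no API key and works offline; the first use downloads the model weights from Hugging Face.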
Common Patterns
Agent Chaining
# Plan → Execute pattern
planner = PlanningAgent(
input_query_stream=user_input,
skills=robot.get_skills()
)
executor = OpenAIAgent(
input_query_stream=planner.get_response_observable(),
skills=robot.get_skills()
)
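Each agent in a chain owns its own subscriptions, so tear the chain down explicitly when finished, using dispose_all() from the Agent base class:
# Shut the chain down in reverse order of construction
executor.dispose_all()
planner.dispose_all()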
Multi-Input Agent
from dimos.stream.stream_merger import create_stream_merger
# Merge text and video
merged = create_stream_merger(
text_stream=query_stream,
video_stream=video_stream,
scheduler=pool_scheduler
)
agent = OpenAIAgent(
input_query_stream=merged,
skills=robot.get_skills(),
model_name="gpt-4o"
)
Response Monitoring
agent = OpenAIAgent(...)
# Subscribe to responses
agent.get_response_observable().subscribe(
on_next=lambda r: print(f"Agent said: {r}"),
on_error=lambda e: print(f"Error: {e}")
)
Error Handling
Errors raised inside an agent are delivered to subscribers through the response observable's on_error channel:
from openai import APIError  # adjust to your provider's error type

def handle_error(error):
    if isinstance(error, TimeoutError):
        print("Agent timed out")
    elif isinstance(error, APIError):
        print("API error occurred")
    else:
        print(f"Unexpected error: {error}")

agent.get_response_observable().subscribe(
    on_next=lambda response: handle_response(response),
    on_error=handle_error
)
Environment Variables
Required environment variables:
# OpenAI
export OPENAI_API_KEY="sk-..."
# Anthropic (for ClaudeAgent)
export ANTHROPIC_API_KEY="sk-ant-..."
# Optional: API base URL override
export OPENAI_API_BASE="https://custom.api.endpoint"
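A quick startup check (standard library only) that fails fast when a required key is missing; trim the tuple to the providers you actually use:
import os

for key in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
    if not os.environ.get(key):
        raise RuntimeError(f"Missing required environment variable: {key}")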