Agent Hand-off Protocols & Communication
Memory File: Agents
Version: v1.0
Last Updated: 2026-03-31
Agent Communication Rules
Redis Stream Architecture
# Stream naming convention
stream_name = "{source}_{target}" # e.g., "signals_reasoning"
consumer_group = "{target}_agents" # e.g., "reasoning_agents"
# Standard event envelope
event = {
"type": "signal_data|decision|execution|grade",
"data": {...},
"trace_id": "uuid-string",
"timestamp": "ISO-8601",
"source_agent": "SignalGenerator|ReasoningAgent|..."
}
Trace ID Propagation (MANDATORY)
from api.constants import FieldName
# ALL agents must follow this pattern
async def process_event(self, event_data: dict) -> None:
# 1. Extract incoming trace_id — use FieldName, never string literals
incoming_trace_id = event_data.get(FieldName.TRACE_ID)
# 2. Generate new trace_id for this processing
current_trace_id = str(uuid.uuid4())
# 3. Log the handoff
log_structured("info", "agent handoff",
agent=self.agent_id,
incoming_trace_id=incoming_trace_id,
current_trace_id=current_trace_id)
# 4. Process with current_trace_id
result = await self._do_work(event_data, current_trace_id)
# 5. Publish with current_trace_id — FieldName keys on writes too
await self.redis.publish("output_stream", {
FieldName.TYPE: "agent_result",
FieldName.DATA: result,
FieldName.TRACE_ID: current_trace_id,
FieldName.SOURCE: self.agent_id,
})
Dict Key Rule (NO RAW STRINGS IN PAYLOADS)
Every read/write of an agent payload or DB row dict must go through FieldName.
Raw string keys (data["side"], event.get("trace_id")) are an anti-pattern — they
silently break under field renames and hide producer/consumer drift.
# ❌ WRONG
side = data.get("side") or data.get("action")
symbol = pos["symbol"]
# ✅ RIGHT
side = data.get(FieldName.SIDE) or data.get(FieldName.ACTION)
symbol = pos[FieldName.SYMBOL]
If the field isn't in class FieldName yet, add it there first, then reference it.
Specific Hand-off Patterns
SignalGenerator → ReasoningAgent
# SignalGenerator output (signals stream)
signal_event = {
"type": "trading_signal",
"data": {
"symbol": "BTC/USD",
"signal_type": "momentum_buy",
"confidence": 0.85,
"indicators": {
"rsi": 35.2,
"macd_cross": True,
"volume_spike": 1.5
}
},
"trace_id": trace_id
}
ReasoningAgent → ExecutionEngine
# ReasoningAgent output (decisions stream)
decision_event = {
"type": "trading_decision",
"data": {
"symbol": "BTC/USD",
"action": "buy",
"quantity": 0.1,
"order_type": "market",
"reasoning": "Momentum signal with RSI oversold",
"confidence": 0.82,
"risk_metrics": {
"atr_stop": 42000,
"position_size_pct": 0.05,
"rr_ratio": 2.0
}
},
"trace_id": trace_id
}
ExecutionEngine → GradeAgent
# ExecutionEngine output (executions stream)
execution_event = {
"type": "trade_execution",
"data": {
"order_id": "uuid",
"symbol": "BTC/USD",
"side": "buy",
"quantity": 0.1,
"fill_price": 43250.75,
"execution_time_ms": 1200,
"broker_fees": 0.43,
"slippage_bps": 2.1
},
"trace_id": trace_id
}
Agent State Synchronization
AgentStateRegistry Integration
# Every agent must register and update state
from api.services.agent_state import AgentStateRegistry
class YourAgent:
def __init__(self, redis_client):
self.redis = redis_client
self.agent_id = "YourAgent"
self.state_registry = AgentStateRegistry(redis_client)
async def process_event(self, event_data: dict) -> None:
# Record event start
await self.state_registry.record_event(self.agent_id)
# ... process event ...
# Update heartbeat
await self.state_registry.update_heartbeat(self.agent_id)
Consumer Group Management
# Standard consumer group setup
async def setup_consumer_groups():
groups = {
"signals": ["reasoning_agents"],
"decisions": ["execution_agents"],
"executions": ["grading_agents"],
"grades": ["learning_agents"]
}
for stream, consumer_groups in groups.items():
for group in consumer_groups:
# Use positional arguments (CRITICAL for tests)
await redis.xgroup_create(stream, group, "$", mkstream=True)
Error Recovery Patterns
Dead Letter Queue Handling
# Failed events go to DLQ after retries
async def handle_processing_error(event_data: dict, error: Exception):
await redis.xadd("dlq_events", {
"failed_event": json.dumps(event_data),
"error": str(error),
"timestamp": datetime.now(timezone.utc).isoformat(),
"retry_count": event_data.get("retry_count", 0)
})
Circuit Breaker per Agent
class AgentCircuitBreaker:
def __init__(self, failure_threshold: int = 5):
self.failure_threshold = failure_threshold
self.failure_count = 0
self.last_failure_time = None
async def call(self, func, *args, **kwargs):
if self.is_open():
log_structured("warning", "circuit breaker open",
agent=self.agent_id)
return None
try:
result = await func(*args, **kwargs)
self.reset()
return result
except Exception as exc:
self.record_failure()
log_structured("error", "agent circuit breaker triggered",
agent=self.agent_id,
exc_info=True)
raise