# Agent Implementation Patterns

This document provides complete implementation patterns for the three-agent pipeline.
## Overview

Our system uses three specialized agents in sequence:

- **Scout Agent** (Haiku) - fast filter, ~90% rejection rate
- **Analyst Agent** (Sonnet) - deep analysis, generates the trade thesis
- **Risk Manager Agent** (Sonnet) - final approval and position sizing
## Scout Agent (Haiku)

### Purpose

Rapid triage of signals to filter obvious noise before expensive analysis.

### Model

`claude-3-5-haiku-20241022`

### Tools

None - pure reasoning, no external tool calls.
### System Prompt

```
You are a financial market scout specializing in social sentiment analysis.

Your job is RAPID TRIAGE. You have 3 seconds to decide if a signal deserves deeper analysis.

EVALUATION CRITERIA:

1. Quality of Discourse
   - Is this genuine analysis or spam?
   - Are posts from established accounts or bots?
   - Is there actual DD (due diligence) or just hype?

2. Signal Strength
   - How many quality mentions (not just volume)?
   - Is sentiment coherent or contradictory?
   - Is there cross-platform correlation?

3. Timing & Catalyst
   - Is there a news catalyst or just a random pump?
   - Does market data align with social sentiment?
   - Is this early momentum or late FOMO?

4. Red Flags
   - Pump & dump language ("moon", "to the moon", "buy now")
   - Coordinated posting (same time, similar language)
   - New accounts with high activity
   - Contradictory price action (sentiment up, price down)

OUTPUT FORMAT (JSON only, no other text):
{
  "score": <0-10>,
  "reasoning": "<one sentence explaining score>",
  "decision": "PASS" | "ESCALATE",
  "red_flags": ["flag1", "flag2"] or []
}

Thresholds:
- Score 0-5: PASS (noise)
- Score 6-10: ESCALATE (worth deeper analysis)

BE DECISIVE. Err on the side of filtering. False positives are expensive.
```
### Input Format

```python
input_context = {
    "ticker": "GME",
    "signal_type": "momentum_spike",
    "metrics": {
        "mentions_15m": 47,
        "quality_score_avg": 7.2,
        "sentiment": 0.65,
        "momentum": 2.5,
        "cross_platform": True
    },
    "market_data": {
        "price": 45.67,
        "change_pct": 2.3,
        "volume_vs_avg": 1.8
    },
    "top_post_summary": "User DeepFuckingValue (500K karma, 4yr account): 'GME - Updated DD on short interest...' (15K upvotes, 50 awards)"
}
```
### Expected Output

```json
{
  "score": 8,
  "reasoning": "High quality author with detailed DD, strong engagement, price action confirms social sentiment",
  "decision": "ESCALATE",
  "red_flags": []
}
```
### Implementation

```python
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
import json

class ScoutAgent:
    def __init__(self):
        self.options = ClaudeAgentOptions(
            model="claude-3-5-haiku-20241022",
            allowed_tools=[],
            system_prompt="""[System prompt from above]""",
            max_turns=1  # Single turn only
        )
        self.client = ClaudeSDKClient(self.options)

    async def triage(self, signal: dict) -> dict:
        """Rapid triage of signal"""
        prompt = f"""
Analyze this trading signal:

Ticker: {signal['ticker']}
Signal Type: {signal['signal_type']}

Metrics:
- 15m mentions: {signal['metrics']['mentions_15m']}
- Quality score: {signal['metrics']['quality_score_avg']}/10
- Sentiment: {signal['metrics']['sentiment']}
- Momentum: {signal['metrics']['momentum']}x

Market Data:
- Price: ${signal['market_data']['price']} ({signal['market_data']['change_pct']:+.1f}%)
- Volume: {signal['market_data']['volume_vs_avg']}x normal

Top Evidence:
{signal['top_post_summary']}

Provide JSON triage assessment.
"""
        await self.client.connect()
        await self.client.query(prompt)

        # Accumulate all text blocks, not just the last one
        response_text = ""
        async for message in self.client.receive_response():
            if hasattr(message, 'content'):
                for block in message.content:
                    if hasattr(block, 'text'):
                        response_text += block.text

        await self.client.disconnect()

        # Parse JSON response
        return json.loads(response_text)
```
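The bare `json.loads` above assumes the model returned nothing but JSON. In practice a model can occasionally wrap the object in a markdown fence or prepend prose despite the "JSON only" instruction, so a small defensive parser is worth having. A sketch (the helper name and fallback strategy are our own, not part of the SDK):

```python
import json
import re

def parse_agent_json(text: str) -> dict:
    """Extract a JSON object from agent output, tolerating a markdown
    code fence or stray prose around it. Raises ValueError if no
    JSON object is found at all."""
    # Prefer the contents of a fenced block if the model added one
    fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    # Fall back to the first {...} span in the text
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end == -1:
        raise ValueError(f"No JSON object in agent output: {text!r}")
    return json.loads(text[start:end + 1])
```

Swapping `json.loads(response_text)` for `parse_agent_json(response_text)` in each agent makes the pipeline tolerant of minor formatting drift without retrying the call.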
### Token Usage

- Input: ~300-500 tokens (condensed context)
- Output: ~50-100 tokens (JSON only)
- Cost: ~$0.0001 per signal
## Analyst Agent (Sonnet)

### Purpose

Deep analysis of high-quality signals to generate actionable trade recommendations.

### Model

`claude-3-5-sonnet-20241022`

### Tools

- Read - access historical analysis data
- Bash - run data analysis scripts (optional)
### System Prompt

```
You are a quantitative trading analyst specializing in momentum and sentiment-driven trades.

Your expertise:
- Social sentiment analysis (Reddit, Twitter)
- Technical analysis (price action, volume, options flow)
- Risk/reward assessment
- Trade structure optimization

ANALYSIS FRAMEWORK:

1. Social Sentiment Quality Assessment
   - WHO is talking? (check author reputation, track record)
   - WHAT is the thesis? (genuine analysis vs hype)
   - HOW strong is conviction? (position sizes, time horizon)
   - WHEN did this start? (early wave vs late FOMO)

2. Market Context Analysis
   - Price Action: Breakout, consolidation, reversal pattern?
   - Volume: Is this real volume or algorithmic wash?
   - Options Flow: Smart money positioning or retail gambling?
   - Historical Patterns: Similar setups in the past?

3. Catalyst Identification
   - News catalyst (earnings, FDA approval, product launch)?
   - Technical catalyst (resistance break, squeeze potential)?
   - Sentiment catalyst (viral post, influencer mention)?

4. Risk Assessment
   - Is this early or late in the move?
   - What are the downside catalysts? (earnings miss, dilution, regulation)
   - Liquidity risk (can we exit easily)?
   - Correlation risk (other positions in the same sector)?

5. Trade Structure Recommendation
   - Best vehicle: shares, calls, puts, spreads?
   - Entry strategy: market, limit, scale-in?
   - Risk parameters: stop loss, position size
   - Profit targets: take-profit levels, time-based exit

OUTPUT FORMAT (JSON only, no other text):
{
  "recommend": true/false,
  "confidence": <0-100>,
  "thesis": "<2-3 sentence trade thesis>",
  "catalyst": "<primary catalyst>",
  "risks": ["risk1", "risk2", "risk3"],
  "trade_structure": {
    "action": "BUY_CALLS" | "BUY_SHARES" | "SELL_PUTS" | "BUY_CALL_SPREAD",
    "strike": <number or null>,
    "expiry": "<YYYY-MM-DD or null>",
    "entry_price_target": <price>,
    "stop_loss": <price>,
    "take_profit": <price>,
    "time_horizon": "intraday" | "swing" | "position"
  },
  "reasoning": "<detailed multi-paragraph analysis>",
  "similar_historical_signals": "<comparison to past similar setups>"
}

If recommend = false, provide rejection reasoning.
```
### Input Format

```python
input_context = {
    "ticker": "GME",
    "scout_score": 8,

    # Aggregated metrics
    "metrics": {
        "mentions_15m": 47,
        "quality_score_avg": 7.2,
        "sentiment": 0.65,
        "momentum": 2.5
    },

    # Top evidence (not all posts, just the best)
    "top_posts": [
        {
            "author": "DeepFuckingValue",
            "author_karma": 500000,
            "text": "Full DD text...",
            "engagement": {"upvotes": 15000, "awards": 50},
            "quality": 9.5
        },
        # ... 4 more top posts
    ],

    # Market data
    "market": {
        "price": 45.67,
        "change_pct": 2.3,
        "volume": "15M",
        "volume_vs_avg": 1.8,
        "options_flow": {
            "unusual_calls": True,
            "call_put_ratio": 3.2,
            "top_strikes": [50, 55, 60]
        },
        "technical": {
            "rsi": 65,
            "pattern": "ascending triangle"
        }
    },

    # Historical context
    "history": {
        "last_analyzed": "2024-01-15",
        "past_signals_count": 3,
        "past_performance": {
            "avg_return": 0.15,
            "win_rate": 0.67
        },
        "current_position": None
    }
}
```
### Implementation

```python
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
import json

class AnalystAgent:
    def __init__(self):
        self.options = ClaudeAgentOptions(
            model="claude-3-5-sonnet-20241022",
            allowed_tools=["Read"],
            system_prompt="""[System prompt from above]""",
            max_turns=3
        )
        self.client = ClaudeSDKClient(self.options)

    async def analyze(self, signal: dict, context: dict) -> dict:
        """Deep analysis of signal"""
        # Format top posts for readability
        posts_text = "\n\n".join([
            f"Post {i+1}: {post['author']} ({post['author_karma']} karma)\n"
            f"Quality: {post['quality']}/10\n"
            f"Engagement: {post['engagement']['upvotes']} upvotes, {post['engagement']['awards']} awards\n"
            f"Content: {post['text'][:300]}..."
            for i, post in enumerate(context['top_posts'])
        ])

        prompt = f"""
Deep Analysis Request: {signal['ticker']}

SIGNAL STRENGTH:
Scout approved this signal with score {context['scout_score']}/10

SOCIAL SENTIMENT:
15m mentions: {context['metrics']['mentions_15m']}
Quality score: {context['metrics']['quality_score_avg']}/10
Sentiment: {context['metrics']['sentiment']}
Momentum: {context['metrics']['momentum']}x

TOP EVIDENCE:
{posts_text}

MARKET DATA:
Price: ${context['market']['price']} ({context['market']['change_pct']:+.1f}%)
Volume: {context['market']['volume']} ({context['market']['volume_vs_avg']}x avg)
Options Activity: {json.dumps(context['market']['options_flow'], indent=2)}
Technical: RSI {context['market']['technical']['rsi']}, Pattern: {context['market']['technical']['pattern']}

HISTORICAL CONTEXT:
{json.dumps(context['history'], indent=2)}

Provide comprehensive trade analysis in JSON format.
"""
        await self.client.connect()
        await self.client.query(prompt)

        response_text = ""
        async for message in self.client.receive_response():
            if hasattr(message, 'content'):
                for block in message.content:
                    if hasattr(block, 'text'):
                        response_text += block.text

        await self.client.disconnect()

        return json.loads(response_text)
```
### Token Usage

- Input: ~2000-3000 tokens (full context)
- Output: ~500-1000 tokens (detailed analysis)
- Cost: ~$0.01-0.02 per analysis
## Risk Manager Agent (Sonnet)

### Purpose

Final approval gate with portfolio-aware position sizing and risk validation.

### Model

`claude-3-5-sonnet-20241022`

### Tools

- Read - access portfolio state and position data
### System Prompt

```
You are a risk management specialist. You are the final gate before trade execution.

RISK RULES (STRICTLY ENFORCED):
1. Maximum position size: 2% of account value per trade
2. Stop loss: Minimum 5% below entry (tighter stops allowed)
3. Risk/Reward ratio: Minimum 1:2 (reward must be 2x risk)
4. Maximum daily loss: 5% of account value
5. Maximum concurrent positions: 5 open trades
6. Sector concentration: Maximum 20% of portfolio in any sector
7. Correlation limits: Avoid highly correlated positions (>0.7)

YOUR RESPONSIBILITIES:
1. Calculate appropriate position size based on the 2% rule
2. Verify stop loss and R:R ratio meet minimums
3. Check portfolio for:
   - Existing positions in the same ticker
   - Correlated positions (same sector/theme)
   - Total exposure by sector
4. Verify sufficient buying power
5. Assess systemic risks (market volatility, geopolitical)
6. Final approve or reject

POSITION SIZING FORMULA:
account_value = 50000                            # example
risk_per_trade = account_value * 0.02            # $1000
entry = 100
stop = 95
risk_per_share = entry - stop                    # $5
position_size = risk_per_trade / risk_per_share  # 200 shares = $20,000

OUTPUT FORMAT (JSON only):
{
  "approved": true/false,
  "position_size_usd": <dollar amount>,
  "position_size_shares": <number of shares>,
  "risk_amount_usd": <dollar risk>,
  "risk_pct": <percent of account>,
  "reward_risk_ratio": <number>,
  "portfolio_impact": {
    "total_exposure": <percentage of account>,
    "sector_exposure": <percentage in this sector>,
    "correlation_concern": true/false
  },
  "warnings": ["warning1", "warning2"],
  "risk_assessment": "<brief summary>",
  "rejection_reason": "<if rejected, why>"
}

NEVER approve trades that violate risk rules. Safety over profits.
```
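The sizing formula and rules 1-3 are deterministic arithmetic, so they can also be enforced in plain code as a double-check on the agent's numbers. A minimal sketch (`size_position` is a hypothetical helper, not part of the system above):

```python
def size_position(account_value: float, entry: float, stop: float,
                  take_profit: float, risk_pct: float = 0.02) -> dict:
    """Apply the 2% rule and the minimum 1:2 reward/risk check.
    Returns sizing details, or a rejection naming the violated rule."""
    risk_per_share = entry - stop
    if risk_per_share <= 0:
        return {"approved": False, "rejection_reason": "stop must be below entry"}
    reward_risk = (take_profit - entry) / risk_per_share
    if reward_risk < 2.0:
        return {"approved": False,
                "rejection_reason": f"reward/risk {reward_risk:.2f} below 1:2 minimum"}
    risk_budget = account_value * risk_pct      # max dollars at risk per trade
    shares = int(risk_budget / risk_per_share)  # whole shares only
    return {
        "approved": True,
        "position_size_shares": shares,
        "position_size_usd": shares * entry,
        "risk_amount_usd": shares * risk_per_share,
        "reward_risk_ratio": reward_risk,
    }
```

Running it on the worked example from the prompt ($50k account, entry 100, stop 95) reproduces the 200-share, $20,000 position, which is a useful sanity check against the model's own JSON before execution.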
### Input Format

```python
input_context = {
    "ticker": "GME",
    "analysis": {
        "recommend": True,
        "confidence": 75,
        "trade_structure": {
            "action": "BUY_SHARES",
            "entry_price_target": 46.00,
            "stop_loss": 43.50,
            "take_profit": 52.00
        }
    },
    "portfolio": {
        "total_value": 50000,
        "cash_available": 25000,
        "positions": [
            {"ticker": "AAPL", "value": 10000, "sector": "Tech"},
            {"ticker": "MSFT", "value": 8000, "sector": "Tech"}
        ],
        "open_position_count": 2,
        "daily_pnl": -500,  # down $500 today
        "sector_exposure": {
            "Tech": 0.36,
            "Retail": 0.0,
            "Finance": 0.0
        }
    }
}
```
### Implementation

```python
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
import json

class RiskManagerAgent:
    def __init__(self):
        self.options = ClaudeAgentOptions(
            model="claude-3-5-sonnet-20241022",
            allowed_tools=["Read"],
            system_prompt="""[System prompt from above]""",
            max_turns=2
        )
        self.client = ClaudeSDKClient(self.options)

    async def evaluate(self, analysis: dict, portfolio: dict) -> dict:
        """Final risk check and position sizing"""
        prompt = f"""
Risk Management Review

PROPOSED TRADE:
Ticker: {analysis['ticker']}
Action: {analysis['analysis']['trade_structure']['action']}
Entry: ${analysis['analysis']['trade_structure']['entry_price_target']}
Stop Loss: ${analysis['analysis']['trade_structure']['stop_loss']}
Take Profit: ${analysis['analysis']['trade_structure']['take_profit']}
Confidence: {analysis['analysis']['confidence']}%

PORTFOLIO STATE:
Total Value: ${portfolio['total_value']:,}
Cash Available: ${portfolio['cash_available']:,}
Open Positions: {portfolio['open_position_count']}/5
Today's P&L: ${portfolio['daily_pnl']:+,}

EXISTING POSITIONS:
{json.dumps(portfolio['positions'], indent=2)}

SECTOR EXPOSURE:
{json.dumps(portfolio['sector_exposure'], indent=2)}

Calculate position size and perform final risk assessment.
Provide JSON response.
"""
        await self.client.connect()
        await self.client.query(prompt)

        response_text = ""
        async for message in self.client.receive_response():
            if hasattr(message, 'content'):
                for block in message.content:
                    if hasattr(block, 'text'):
                        response_text += block.text

        await self.client.disconnect()

        return json.loads(response_text)
```
### Token Usage

- Input: ~800-1200 tokens
- Output: ~200-400 tokens
- Cost: ~$0.003-0.006 per check
## Complete Pipeline Flow

```python
import asyncio
from datetime import datetime
from typing import Optional

class AgentPipeline:
    def __init__(self):
        self.scout = ScoutAgent()
        self.analyst = AnalystAgent()
        self.risk_manager = RiskManagerAgent()

    async def process_signal(self, signal: dict, context: dict, portfolio: dict) -> Optional[dict]:
        """Run complete three-stage pipeline"""
        # Stage 1: Scout (fast filter)
        print(f"[Scout] Triaging {signal['ticker']}...")
        scout_result = await self.scout.triage(signal)

        if scout_result['decision'] == 'PASS':
            print(f"[Scout] REJECTED: {scout_result['reasoning']}")
            await log_rejection('scout', signal, scout_result)
            return None

        print(f"[Scout] ESCALATE (score: {scout_result['score']}/10)")

        # Stage 2: Analyst (deep dive)
        print(f"[Analyst] Deep analysis of {signal['ticker']}...")
        context['scout_score'] = scout_result['score']
        analyst_result = await self.analyst.analyze(signal, context)

        if not analyst_result['recommend']:
            print(f"[Analyst] REJECTED: {analyst_result['reasoning']}")
            await log_rejection('analyst', signal, analyst_result)
            return None

        print(f"[Analyst] RECOMMEND (confidence: {analyst_result['confidence']}%)")
        print(f"[Analyst] Thesis: {analyst_result['thesis']}")

        # Stage 3: Risk Manager (final gate)
        print(f"[Risk] Final approval check...")
        risk_result = await self.risk_manager.evaluate(
            {'ticker': signal['ticker'], 'analysis': analyst_result},
            portfolio
        )

        if not risk_result['approved']:
            print(f"[Risk] REJECTED: {risk_result['rejection_reason']}")
            await log_rejection('risk', signal, risk_result)
            return None

        print(f"[Risk] APPROVED")
        print(f"[Risk] Position: {risk_result['position_size_shares']} shares (${risk_result['position_size_usd']:,})")
        print(f"[Risk] Risk: ${risk_result['risk_amount_usd']:.2f} ({risk_result['risk_pct']:.2f}%)")

        # All stages passed - create alert
        alert = {
            "signal": signal,
            "scout": scout_result,
            "analysis": analyst_result,
            "risk": risk_result,
            "approved_at": datetime.now().isoformat()
        }
        await emit_discord_alert(alert)
        return alert

async def log_rejection(stage: str, signal: dict, result: dict):
    """Log rejection to Postgres for analysis"""
    # Implementation...
    pass

async def emit_discord_alert(alert: dict):
    """Publish to Discord via Redis"""
    # Implementation...
    pass
```
## Cost Analysis

Per 100 signals:

```
Scout (100 signals):    $0.01  (filters 90)
Analyst (10 signals):   $0.15  (rejects 5)
Risk Manager (5):       $0.02  (approves 2-3)
-----------------------------------------
Total:                  $0.18 per 100 signals
```

Compare to a single-agent approach:

```
Sonnet only (100):      $1.50 per 100 signals
```

Savings: ~88% cost reduction, with better filtering.
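The funnel arithmetic can be reproduced with a small cost model. The per-call costs below are midpoints of the ranges quoted in the Token Usage sections, and the escalation and recommendation rates are the ~10% and ~50% figures assumed throughout; all are estimates, not measured values:

```python
# Per-call cost estimates (midpoints of the Token Usage ranges above)
SCOUT_COST = 0.0001    # Haiku triage
ANALYST_COST = 0.015   # Sonnet deep analysis
RISK_COST = 0.0045     # Sonnet risk check

def pipeline_cost(signals: int, escalate_rate: float = 0.10,
                  recommend_rate: float = 0.50) -> float:
    """Expected dollar cost of pushing `signals` through the funnel."""
    scouted = signals
    analyzed = signals * escalate_rate        # ~10% escalated past Scout
    risk_checked = analyzed * recommend_rate  # ~50% recommended by Analyst
    return (scouted * SCOUT_COST
            + analyzed * ANALYST_COST
            + risk_checked * RISK_COST)
```

For 100 signals this comes to about $0.18, versus $1.50 if every signal went straight to Sonnet, which is where the roughly 88% saving comes from.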
## Testing Patterns

```python
import pytest

@pytest.mark.asyncio
async def test_scout_filters_spam():
    """Scout should reject obvious pump & dump"""
    # Signal fixture includes every key that triage() reads
    signal = {
        "ticker": "PUMP",
        "signal_type": "momentum_spike",
        "metrics": {
            "mentions_15m": 100,       # High volume
            "quality_score_avg": 2.0,  # Low quality
            "sentiment": 0.9,          # Suspiciously positive
            "momentum": 3.0
        },
        "market_data": {
            "price": 1.23,
            "change_pct": 0.5,
            "volume_vs_avg": 1.1
        },
        "top_post_summary": "New account: 'BUY NOW!!! MOON INCOMING!!!'"
    }

    scout = ScoutAgent()
    result = await scout.triage(signal)

    assert result['decision'] == 'PASS'
    assert 'pump' in result['reasoning'].lower() or 'spam' in result['reasoning'].lower()

@pytest.mark.asyncio
async def test_analyst_generates_valid_structure():
    """Analyst output should match expected schema"""
    # Test implementation...
    pass

@pytest.mark.asyncio
async def test_risk_manager_enforces_2pct_rule():
    """Risk manager should cap position at 2% of account"""
    # Test implementation...
    pass
```
## Monitoring & Improvement

### Metrics to Track

1. Scout Performance
   - Escalation rate (should be ~10%)
   - False negative rate (good signals filtered out)
   - Average score of escalated signals

2. Analyst Performance
   - Recommendation rate (should be ~50% of escalated signals)
   - Win rate of recommended trades
   - Average confidence vs actual outcome

3. Risk Manager Performance
   - Rejection rate (should be minimal if the analyst is good)
   - Compliance with risk rules (should be 100%)
   - Position sizing accuracy
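Most of these rates fall straight out of the decision log. A sketch, assuming each logged decision is a record like `{"stage": ..., "passed": ...}` (a hypothetical schema; adapt it to whatever `log_rejection` actually writes):

```python
from collections import Counter

def funnel_metrics(decisions: list) -> dict:
    """Per-stage pass rates from logged decisions.
    Each record: {"stage": "scout"|"analyst"|"risk", "passed": bool}."""
    seen = Counter(d["stage"] for d in decisions)
    passed = Counter(d["stage"] for d in decisions if d["passed"])
    return {
        stage: passed[stage] / seen[stage] if seen[stage] else 0.0
        for stage in ("scout", "analyst", "risk")
    }
```

The `scout` rate here is the escalation rate (target ~10%) and the `analyst` rate is the recommendation rate (target ~50%); win rates additionally require joining against realized trade outcomes.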
### Continuous Improvement

- Review rejected signals weekly
- Track which signal types convert to profitable trades
- Adjust Scout thresholds based on data
- Refine Analyst prompts based on missed opportunities
- Update risk rules based on portfolio performance
## Deployment Notes

- Each agent runs in the same Docker container (main app)
- Agents are initialized once on startup
- Connections are persistent for session reuse
- Implement circuit breakers for API failures
- Cache common analyses to reduce redundant calls
- Log all agent interactions for debugging and auditing
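The circuit-breaker note deserves a concrete shape: if the API starts failing, the pipeline should stop hammering it and drop signals until a cooldown passes. A minimal consecutive-failure breaker (a sketch, not production code; thresholds and cooldowns are placeholder values):

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; allow a probe
    call again once `cooldown` seconds have elapsed."""
    def __init__(self, threshold: int = 3, cooldown: float = 60.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None  # monotonic timestamp when opened

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Half-open: reset and allow one probe call through
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()
```

Each agent call would be guarded by `if breaker.allow():` and followed by `record_success()` or `record_failure()`; signals arriving while the breaker is open can simply be logged and skipped, since the Scout stage is cheap to re-run later.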