JATAYU — Agents 4 & 5: Decision Agent + Remediation Agent
Files: agents/decision_agent.py · agents/remediation_agent.py
Agent 4: Decision Agent
Purpose
The Decision Agent maps RCA results to concrete, safe remediation actions. It is the policy engine of the pipeline:
- Consumes RCA results from `jatayu.agent.rca.results`
- Looks up service metadata from the service registry (criticality, allowed policies, namespace)
- Applies 4 safety guards before authorising any action
- Publishes decision intents to `jatayu.agent.decision.intents`
Safety Guards (evaluated in order; the first matching guard wins)
| Guard | Condition | Outcome |
|---|---|---|
| Guard 1 | Critical service (paymentservice, checkoutservice) or auto_remediate=false | manual_approval required |
| Guard 2 | RCA confidence < 0.50 | investigate only |
| Guard 3 | ≥ 3 auto-executed remediations already issued for this service in the current agent session | escalate to on-call |
| Guard 4 | Action not in service's allowed policy list (and that list is non-empty) | Fall back to first allowed policy |
Failure → Action Mapping (any failure type not listed maps to investigate)
pod_killed → rollout_restart
pod_instability → rollout_restart
readiness_failure → restart_pod
cpu_saturation → scale_out
service_degradation → restart_pod
network_degradation → mark_degraded
service_error → rollout_restart
critical_failure → rollout_restart
unknown_failure → investigate
Decision Agent Full Source Code
"""Decision Agent: map RCA results to remediation action intents."""
from __future__ import annotations
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from kafka.errors import KafkaError, KafkaTimeoutError, NoBrokersAvailable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from messaging.consumer import get_consumer
from messaging.healthcheck import check_kafka_connection
from messaging.producer import get_producer
from messaging.topics import TOPICS
from registry.registry_loader import (
get_criticality,
get_k8s_resource,
get_namespace,
get_remediation_policies,
is_safe_to_auto_remediate,
)
# --- Safety thresholds --------------------------------------------------------
# Services that are never auto-remediated; their intents always require a
# human sign-off (Guard 1).
CRITICAL_SERVICES_REQUIRE_APPROVAL = {"checkoutservice", "paymentservice"}
# Number of auto-executed intents a single service may receive before further
# RCAs for it are escalated instead (Guard 3).
MAX_AUTO_REMEDIATIONS_PER_SERVICE = 3

# --- Failure type -> preferred remediation action -----------------------------
_FAILURE_ACTION_MAP = dict(
    pod_killed="rollout_restart",
    pod_instability="rollout_restart",
    readiness_failure="restart_pod",
    cpu_saturation="scale_out",
    service_degradation="restart_pod",
    network_degradation="mark_degraded",
    service_error="rollout_restart",
    critical_failure="rollout_restart",
    unknown_failure="investigate",
)

# In-memory tally of auto-executed intents per service. It is a plain
# module-level dict, so it only lives as long as this process.
_remediation_counts: Dict[str, int] = dict()
def _parse_confidence(raw: Any) -> float:
    """Return *raw* as a finite float confidence, or 0.0 if it is unusable.

    Missing, empty, non-numeric, NaN and infinite values all map to 0.0 so
    they land in the low-confidence guard. Without this, a NaN score (e.g. a
    JSON ``NaN`` literal, which Python's ``json`` module accepts) would slip
    past ``confidence < 0.50`` because every comparison with NaN is False.
    A non-numeric string would raise ``ValueError`` out of the consumer loop.
    """
    import math  # stdlib; imported locally so this block is self-contained

    try:
        value = float(raw or 0)
    except (TypeError, ValueError):
        return 0.0
    return value if math.isfinite(value) else 0.0


def _decide_action(rca: Dict[str, Any]) -> Dict[str, Any]:
    """Map an RCA result to a concrete remediation action intent.

    Guards are evaluated in order and the first one that matches wins:

    1. The service is in CRITICAL_SERVICES_REQUIRE_APPROVAL, or the registry
       says auto-remediation is unsafe -> ``manual_approval`` (not
       auto-executed).
    2. Confidence is below 0.50, or missing/invalid/non-finite ->
       ``investigate`` (not auto-executed).
    3. The service already received MAX_AUTO_REMEDIATIONS_PER_SERVICE
       auto-executed intents (tracked in ``_remediation_counts``) ->
       ``escalate`` (not auto-executed).
    4. The preferred action is not in the service's non-empty allowed-policy
       list -> the first allowed policy. A read-only ``investigate`` is never
       replaced this way.

    Args:
        rca: RCA result payload consumed from the RCA results topic.

    Returns:
        A ``decision_intent`` event dict ready to publish.
    """
    service = rca.get("root_cause_service", "unknown")
    failure_type = rca.get("failure_type", "unknown_failure")
    confidence = _parse_confidence(rca.get("confidence_score", 0))
    preferred_action = _FAILURE_ACTION_MAP.get(failure_type, "investigate")

    policies = get_remediation_policies(service)
    criticality = get_criticality(service)
    safe_auto = is_safe_to_auto_remediate(service)
    k8s_resource = get_k8s_resource(service)
    namespace = get_namespace(service)
    prior_attempts = _remediation_counts.get(service, 0)

    action = preferred_action
    reason = "auto_remediation"
    auto_execute = True

    if service in CRITICAL_SERVICES_REQUIRE_APPROVAL:
        # Guard 1a: hard-coded critical services always need a human.
        action = "manual_approval"
        reason = f"critical_service_requires_approval ({criticality})"
        auto_execute = False
    elif not safe_auto:
        # Guard 1b: the registry disables auto-remediation for this service.
        # This gets its own reason so non-critical services are not labelled
        # as critical in the intent.
        action = "manual_approval"
        reason = f"auto_remediation_disabled ({criticality})"
        auto_execute = False
    elif confidence < 0.50:
        # Guard 2: low (or unusable) confidence -> investigate only.
        action = "investigate"
        reason = "low_confidence_rca"
        auto_execute = False
    elif prior_attempts >= MAX_AUTO_REMEDIATIONS_PER_SERVICE:
        # Guard 3: too many auto-remediations already issued for this service.
        action = "escalate"
        reason = f"remediation_limit_exceeded ({prior_attempts} attempts)"
        auto_execute = False
    elif policies and action not in policies and action != "investigate":
        # Guard 4: substitute the first allowed policy. ``investigate`` is
        # read-only, so it is kept instead of being swapped for a potentially
        # disruptive policy that would then be auto-executed.
        action = policies[0]
        reason = "policy_fallback"

    action_params = {
        "k8s_resource": k8s_resource or f"deployment/{service}",
        "namespace": namespace,
        "target_service": service,
    }
    if action == "scale_out":
        action_params["scale_to"] = 2

    return {
        "event_type": "decision_intent",
        "service": service,
        "action": action,
        "reason": reason,
        "auto_execute": auto_execute,
        "action_params": action_params,
        "criticality": criticality,
        "confidence_score": confidence,
        "failure_type": failure_type,
        "remediation_recommendation": rca.get("remediation_recommendation"),
        "impacted_services": rca.get("impacted_services", []),
        "evidence_summary": (rca.get("evidence") or [])[:3],
        "snapshot_time": rca.get("snapshot_time"),
        "scenario": rca.get("scenario"),
        "run_id": rca.get("run_id"),
        "decided_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
def run() -> None:
    """Run the Decision Agent consumer loop.

    Consumes RCA results and skips non-dict payloads. Each remaining payload
    becomes a decision intent via ``_decide_action``, which is published
    keyed by service, waiting up to 10s for each send to be acknowledged.

    The loop stops on Ctrl+C, or when publishing hits a Kafka timeout or
    finds no brokers. Other ``KafkaError``s are logged and the next message
    is processed. A service's auto-remediation counter is bumped only after
    its auto-executed intent has been acknowledged. The producer and consumer
    are always closed on exit.
    """
    if not check_kafka_connection():
        return

    consume_topic = TOPICS["agent_rca_results"]
    publish_topic = TOPICS["agent_decision_intents"]
    print(f"[Decision] Starting agent | consume={consume_topic} | publish={publish_topic}")

    consumer = get_consumer(
        consume_topic,
        group_id="jatayu-decision-agent",
        auto_offset_reset="latest",
    )
    try:
        producer = get_producer()
    except BaseException:
        # Close the consumer if the producer cannot be created; otherwise it
        # would never be closed, because the cleanup below is not reached.
        consumer.close()
        raise

    try:
        for msg in consumer:
            rca = msg.value
            if not isinstance(rca, dict):
                continue
            service = rca.get("root_cause_service", "unknown")
            print(f"[Decision] Processing RCA for {service} (failure={rca.get('failure_type')})")
            intent = _decide_action(rca)
            key = intent.get("service")
            try:
                future = producer.send(publish_topic, value=intent, key=key)
                future.get(timeout=10)
                print(f"[Decision] Intent published: service={key}, action={intent['action']}, auto={intent['auto_execute']}")
                if intent["auto_execute"]:
                    _remediation_counts[service] = _remediation_counts.get(service, 0) + 1
            except (KafkaTimeoutError, NoBrokersAvailable) as exc:
                print(f"[Decision][ERROR] Kafka unavailable: {exc}")
                break
            except KafkaError as exc:
                print(f"[Decision][ERROR] Failed to publish: {exc}")
    except KeyboardInterrupt:
        print("[Decision] Stopping agent (keyboard interrupt)")
    finally:
        try:
            producer.flush()
        finally:
            producer.close()
            consumer.close()
if __name__ == "__main__":
    # Script entry point: start the Decision Agent consumer loop.
    run()
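Decision Intent Schema (Output — abridged example; the full intent also carries remediation_recommendation, impacted_services, evidence_summary, snapshot_time, scenario, run_id and decided_at)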
{
"event_type": "decision_intent",
"service": "cartservice",
"action": "rollout_restart",
"reason": "auto_remediation",
"auto_execute": true,
"action_params": {
"k8s_resource": "deployment/cartservice",
"namespace": "default",
"target_service": "cartservice"
},
"criticality": "medium",
"confidence_score": 0.90,
"failure_type": "pod_killed"
}
Agent 5: Remediation Agent
Purpose
The Remediation Agent simulates the Kubernetes remediation actions decided by the Decision Agent; no commands are actually run against a cluster:
- Consumes decision intents from `jatayu.agent.decision.intents`
- Derives a simulated pass/fail outcome from a hash of `service + action` (demo only)
- Builds kubectl-style command strings for display
- Publishes execution results to `jatayu.agent.remediation.results`
Supported Actions
| Action | Method | Est. Duration | Success Rate |
|---|---|---|---|
| restart_pod | kubectl rollout restart | 30s | 92% |
| rollout_restart | kubectl rollout restart deployment | 45s | 95% |
| scale_out | kubectl scale deployment --replicas | 20s | 98% |
| mark_degraded | annotate deployment | 5s | 99% |
| verify_endpoints | kubectl get endpoints | 10s | 99% |
| manual_approval | PENDING HUMAN APPROVAL | — | — |
| escalate | page_oncall_engineer | — | — |
| investigate | gather_diagnostics | 60s | 99% |
Remediation Agent Full Source Code
"""Remediation/Execution Agent: simulate Kubernetes remediation actions."""
from __future__ import annotations

import time
import zlib
from typing import Any, Dict, Optional

from kafka.errors import KafkaError, KafkaTimeoutError, NoBrokersAvailable

from messaging.consumer import get_consumer
from messaging.healthcheck import check_kafka_connection
from messaging.producer import get_producer
from messaging.topics import TOPICS
# Simulated execution results for each action type
_ACTION_SIMULATORS: Dict[str, Dict[str, Any]] = {
"restart_pod": {
"method": "kubectl rollout restart",
"estimated_duration_s": 30,
"side_effects": "Brief pod restart, ~30s downtime",
"success_rate": 0.92,
},
"rollout_restart": {
"method": "kubectl rollout restart deployment",
"estimated_duration_s": 45,
"side_effects": "Rolling restart, minimal downtime",
"success_rate": 0.95,
},
"scale_out": {
"method": "kubectl scale deployment --replicas",
"estimated_duration_s": 20,
"side_effects": "New pod scheduling, increased resource usage",
"success_rate": 0.98,
},
"mark_degraded": {
"method": "annotate deployment jatayu/status=degraded",
"estimated_duration_s": 5,
"side_effects": "Service marked as degraded, alerts generated",
"success_rate": 0.99,
},
"verify_endpoints": {
"method": "kubectl get endpoints | validate connectivity",
"estimated_duration_s": 10,
"side_effects": "Read-only verification, no service impact",
"success_rate": 0.99,
},
"manual_approval": {
"method": "PENDING_HUMAN_APPROVAL",
"estimated_duration_s": 0,
"side_effects": "Waiting for operator acknowledgment",
"success_rate": None,
},
"escalate": {
"method": "page_oncall_engineer",
"estimated_duration_s": 0,
"side_effects": "Incident escalated to on-call team",
"success_rate": None,
},
"investigate": {
"method": "gather_diagnostics",
"estimated_duration_s": 60,
"side_effects": "Read-only diagnostics, no changes",
"success_rate": 0.99,
},
}
def _simulate_execution(intent: Dict[str, Any]) -> Dict[str, Any]:
    """Simulate executing the remediation action described by *intent*.

    Intents with a falsy ``auto_execute`` are reported as ``deferred``.
    Auto-executed intents succeed or fail by comparing a stable CRC-32
    bucket of ``service + action`` against the action's ``success_rate``
    (0.90 when the action has none). A stable hash is used because the
    built-in ``hash()`` of a ``str`` is salted per interpreter process
    (unless PYTHONHASHSEED is fixed), so it would give a different demo
    outcome on every restart. Unknown actions use the ``investigate``
    simulator metadata.

    Args:
        intent: ``decision_intent`` payload from the Decision Agent.

    Returns:
        A ``remediation_result`` event dict ready to publish.
    """
    action = intent.get("action", "investigate")
    service = intent.get("service", "unknown")
    # Treat a null ``action_params`` the same as a missing one.
    params = intent.get("action_params") or {}
    sim = _ACTION_SIMULATORS.get(action, _ACTION_SIMULATORS["investigate"])

    if not intent.get("auto_execute", False):
        status = "deferred"
        message = f"Action deferred: {intent.get('reason', 'manual approval required')}"
        success = False
    else:
        success_rate = sim.get("success_rate") or 0.90
        # Deterministic across processes: bucket in [0.00, 0.99].
        bucket = (zlib.crc32(f"{service}{action}".encode("utf-8")) % 100) / 100.0
        success = bucket < success_rate
        if success:
            status = "success"
            resource = params.get("k8s_resource") or service
            namespace = params.get("namespace") or "default"
            message = f"Executed {action} on {resource} in namespace {namespace}"
        else:
            status = "failed"
            message = f"Execution failed: {action} on {service} - resource temporarily unavailable"

    kubectl_cmd = _build_kubectl_command(action, service, params)

    return {
        "event_type": "remediation_result",
        "service": service,
        "action": action,
        "status": status,
        "success": success,
        "message": message,
        "kubectl_command": kubectl_cmd,
        "execution_method": sim["method"],
        "estimated_duration_s": sim["estimated_duration_s"],
        "side_effects": sim["side_effects"],
        "action_params": params,
        "failure_type": intent.get("failure_type"),
        "impacted_services": intent.get("impacted_services", []),
        "confidence_score": intent.get("confidence_score"),
        "snapshot_time": intent.get("snapshot_time"),
        "scenario": intent.get("scenario"),
        "run_id": intent.get("run_id"),
        "evidence_summary": intent.get("evidence_summary", []),
        "executed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
def _build_kubectl_command(action: str, service: str, params: Dict[str, Any]) -> str:
ns = params.get("namespace", "default")
resource = params.get("k8s_resource", f"deployment/{service}")
commands = {
"restart_pod": f"kubectl rollout restart {resource} -n {ns}",
"rollout_restart": f"kubectl rollout restart {resource} -n {ns}",
"scale_out": f"kubectl scale {resource} --replicas={params.get('scale_to', 2)} -n {ns}",
"mark_degraded": f"kubectl annotate {resource} jatayu/status=degraded -n {ns}",
"verify_endpoints": f"kubectl get endpoints {service} -n {ns}",
"manual_approval": f"# MANUAL APPROVAL REQUIRED: {resource} -n {ns}",
"escalate": f"# ESCALATE: {resource} -n {ns} - contact on-call team",
"investigate": f"kubectl describe {resource} -n {ns}",
}
return commands.get(action, f"kubectl describe {resource} -n {ns}")
def run() -> None:
    """Run the Remediation Agent consumer loop.

    Consumes decision intents and skips non-dict payloads. Each remaining
    intent is simulated via ``_simulate_execution``, and the result is
    published keyed by service, waiting up to 10s for each send to be
    acknowledged.

    The loop stops on Ctrl+C, or when publishing hits a Kafka timeout or
    finds no brokers. Other ``KafkaError``s are logged and the next message
    is processed. The producer and consumer are always closed on exit.
    """
    if not check_kafka_connection():
        return

    consume_topic = TOPICS["agent_decision_intents"]
    publish_topic = TOPICS["agent_remediation_results"]
    print(f"[Remediation] Starting agent | consume={consume_topic} | publish={publish_topic}")

    consumer = get_consumer(
        consume_topic,
        group_id="jatayu-remediation-agent",
        auto_offset_reset="latest",
    )
    try:
        producer = get_producer()
    except BaseException:
        # Close the consumer if the producer cannot be created; otherwise it
        # would never be closed, because the cleanup below is not reached.
        consumer.close()
        raise

    try:
        for msg in consumer:
            intent = msg.value
            if not isinstance(intent, dict):
                continue
            service = intent.get("service", "unknown")
            action = intent.get("action", "investigate")
            print(f"[Remediation] Executing: service={service}, action={action}, auto={intent.get('auto_execute')}")
            result = _simulate_execution(intent)
            key = result.get("service")
            try:
                future = producer.send(publish_topic, value=result, key=key)
                future.get(timeout=10)
                print(f"[Remediation] Result published: service={key}, status={result['status']}, cmd={result['kubectl_command'][:60]}...")
            except (KafkaTimeoutError, NoBrokersAvailable) as exc:
                print(f"[Remediation][ERROR] Kafka unavailable: {exc}")
                break
            except KafkaError as exc:
                print(f"[Remediation][ERROR] Failed to publish: {exc}")
    except KeyboardInterrupt:
        print("[Remediation] Stopping agent (keyboard interrupt)")
    finally:
        try:
            producer.flush()
        finally:
            producer.close()
            consumer.close()
if __name__ == "__main__":
    # Script entry point: start the Remediation Agent consumer loop.
    run()
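Remediation Result Schema (Output — abridged example; the full result also carries action_params, failure_type, impacted_services, confidence_score, snapshot_time, scenario, run_id and evidence_summary)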
{
"event_type": "remediation_result",
"service": "cartservice",
"action": "rollout_restart",
"status": "success",
"success": true,
"message": "Executed rollout_restart on deployment/cartservice in namespace default",
"kubectl_command": "kubectl rollout restart deployment/cartservice -n default",
"execution_method": "kubectl rollout restart deployment",
"estimated_duration_s": 45,
"side_effects": "Rolling restart, minimal downtime",
"executed_at": "2024-01-15T10:30:45Z"
}