Agent Codex: AG1_AetherBus Library
Version: (e.g., 0.6.0 - Increment after cleanup)
Agent Type: Core Infrastructure Library / Message Bus SDK
Primary Goal: To provide a reliable, asynchronous, Redis Streams-based message bus for inter-agent communication within the AG1 ecosystem. It defines core message structures (Envelope), publishing/subscribing mechanisms, and RPC-like patterns.
Key Components & Modules:
-
envelope.py- TheEnvelopeClass:- Purpose: Defines the standardized message wrapper for all AetherBus communications.
- Core Fields:
role,content,agent_name,user_id,session_code,reply_to,correlation_id,envelope_type,meta,timestamp,envelope_id. - Functionality: Serialization (
to_dict), deserialization (from_dict), unique ID generation.
-
keys.py-StreamKeyBuilderClass:- Purpose: Provides a centralized and consistent way to generate Redis stream names and keys for various AetherBus entities (agent inboxes, edge responses, temporary RPC replies, registration channels, etc.).
- Functionality: Methods like
agent_inbox(name),edge_response(edge_name, user_id),edge_register(edge_name),temp_response(unique_id_part).
-
bus.py- Core Bus Operations:build_redis_url(...): Constructs the Redis connection URL from configuration (environment variables or direct parameters).publish_envelope(redis_client, stream_name, envelope): Serializes anEnvelopeand publishes it to a specified Redis Stream usingXADD.subscribe_simple(redis_client, stream_name, callback, group_name, consumer_name, mkstream, block_ms, is_temp_stream): A simplified, robust function for an agent to subscribe to a single stream using a consumer group, process messages via a callback, and handle acknowledgments.subscribe(redis_client, stream_patterns_or_names, callback, group, consumer, mkstream, batch_size, block_ms): (If this is a more general or older version) A function for subscribing to one or more streams/patterns. Focus for cleanup might be to ensuresubscribe_simpleis the primary, well-tested method for single stream listening by agents, or to makesubscribeequally robust.
-
rpc.py- Request-Response Pattern Implementation:bus_rpc_call(redis_client, target_stream, request_envelope, timeout):- Sends
request_envelopetotarget_stream. - Crucially, it manages a temporary, unique reply stream name which it sets in
request_envelope.reply_tobefore sending. - Listens on this temporary reply stream for a single response envelope (as a JSON string).
- Handles timeouts.
- Returns the JSON string of the response envelope or
None.
- Sends
bus_rpc_envelope(redis_client, target_inbox, request_envelope, timeout):- A wrapper around
bus_rpc_call. - Ensures
request_envelope.reply_tois appropriately set (using a fallback if necessary) before callingbus_rpc_call. - Takes the JSON string response from
bus_rpc_calland deserializes it into anEnvelopeobject (or an error dictionary).
- A wrapper around
-
bus_adapterV2.py-BusAdapterV2Class:- Purpose: An abstraction layer for agents to easily manage subscriptions to their inboxes and other streams.
- Functionality:
- Initializes with an
agent_id,core_handler(likeagent.handle_bus_envelope),redis_client, andpatternsto listen on. - Uses
subscribe_simple(orsubscribe) internally to manage the Redis Stream subscriptions and consumer groups. - Provides
start()andstop()methods for managing the lifecycle of the listeners. - Handles dispatching received messages to the
core_handler. - May include methods like
add_subscription()for dynamic subscriptions.
- Initializes with an
Dependencies:
redis(asyncio version:redis.asyncio)- Python 3.x (specify version, e.g., 3.9+)
Key Design Principles for Robustness:
- Clear error handling and logging in all bus operations.
- Resilience to Redis connection issues (e.g., retry mechanisms where appropriate).
- Proper message acknowledgment (
XACK) in subscription loops to prevent reprocessing of faulty messages. - Consistent envelope serialization and deserialization.
- Well-defined and unique stream naming via
StreamKeyBuilder. - Non-blocking operations suitable for an
asyncioenvironment.
Interaction with Other Agents: This library is the fundamental communication layer. All AetherBus agents (Muse2, HeartBeat, Echo, EthAgent, Gatekeeper, uFetch_ASIOne_Client_Agent, ufetch_asi_edge_handler, Relay, etc.) will use these components to send and receive messages.