In this tutorial, we implement the Agent Communication Protocol (ACP) by building a flexible, ACP-compliant messaging system in Python that uses Google's Gemini API for natural language processing. Starting with the installation and configuration of the google-generativeai library, the tutorial introduces the core abstractions: message types, performatives, and the ACPMessage data class, which standardizes inter-agent communication. By defining the ACPAgent and ACPMessageBroker classes, the guide shows how to create, send, route, and process structured messages among multiple autonomous agents. Through clear code examples, readers learn to implement querying, action requests, and information broadcasting, while maintaining conversation threads, acknowledgments, and error handling.
import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)
We import the essential Python modules, covering JSON handling, timing, unique identifier generation, and type annotations, to support a structured ACP implementation. The code then sets a placeholder for the user's Gemini API key and configures the google-generativeai client for subsequent calls to the Gemini language model.
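As a side note on the API key placeholder, a minimal sketch of reading the key from the environment instead of hard-coding it might look like the following. The helper name `load_api_key` and the environment variable name `GEMINI_API_KEY` are assumptions made for illustration; they are not part of the original tutorial code.

```python
import os


def load_api_key(env_var: str = "GEMINI_API_KEY") -> str:
    """Return the API key stored in `env_var` (a hypothetical variable name)."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        # Fail early with a readable message instead of a later API error
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return key
```

With such a helper in place, the configuration call could become `genai.configure(api_key=load_api_key())`, which keeps the key out of the notebook itself.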
class ACPMessageType(Enum):
    """Standard ACP message types"""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"
The ACPMessageType enumeration defines the basic message categories used in the Agent Communication Protocol, including requests, responses, informs, queries, and control actions such as subscription management, errors, and acknowledgments. By centralizing these message types, the protocol ensures consistent handling and routing of inter-agent communications throughout the system.
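To illustrate why centralizing the types helps, here is a small sketch that turns a raw string from the wire into an enum member and rejects anything unknown. The enum is repeated so the block runs on its own; the helper `parse_message_type` is a hypothetical addition, not something the tutorial defines.

```python
from enum import Enum
from typing import Optional


class ACPMessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"


def parse_message_type(raw: str) -> Optional[ACPMessageType]:
    """Look up a message type by its string value; return None if unknown."""
    try:
        # Enum lookup by value raises ValueError for unrecognized strings
        return ACPMessageType(raw.strip().lower())
    except ValueError:
        return None


print(parse_message_type("Inform"))   # ACPMessageType.INFORM
print(parse_message_type("shout"))    # None
```

Because every valid label lives in one enumeration, a typo in a message type surfaces as `None` at the boundary rather than as a silently ignored message.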
class ACPPerformative(Enum):
    """ACP speech acts (performatives)"""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"
The ACPPerformative enumeration captures the speech acts that agents can use when interacting under ACP, mapping high-level intentions, such as telling facts, asking questions, requesting actions, or negotiating agreements, onto standardized labels. This clear taxonomy lets agents interpret and respond to messages in a contextually appropriate manner, supporting robust and semantically rich communication.
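One way to make "contextually appropriate" concrete is to pair each incoming performative with the answers an agent might accept. The sketch below is only an illustration: the `EXPECTED_REPLIES` table and the `is_expected_reply` helper are hypothetical conventions chosen for this example, and the tutorial's agents do not enforce them.

```python
from enum import Enum
from typing import Dict, Set


class ACPPerformative(Enum):
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"


# Hypothetical pairing of an incoming performative with acceptable answers
EXPECTED_REPLIES: Dict[ACPPerformative, Set[ACPPerformative]] = {
    ACPPerformative.ASK: {ACPPerformative.REPLY},
    ACPPerformative.REQUEST_ACTION: {
        ACPPerformative.AGREE,
        ACPPerformative.REFUSE,
        ACPPerformative.REPLY,
    },
    ACPPerformative.PROPOSE: {ACPPerformative.ACCEPT, ACPPerformative.REJECT},
}


def is_expected_reply(incoming: str, reply: str) -> bool:
    """Return True when `reply` is listed as an answer to `incoming`."""
    try:
        allowed = EXPECTED_REPLIES.get(ACPPerformative(incoming), set())
        return ACPPerformative(reply) in allowed
    except ValueError:
        # Either string is not a known performative
        return False
```

For example, `is_expected_reply("propose", "accept")` holds under this table, while `is_expected_reply("ask", "accept")` does not.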
@dataclass
class ACPMessage:
    """Agent Communication Protocol Message Structure"""
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    protocol: str = "ACP-1.0"
    conversation_id: Optional[str] = None
    reply_to: Optional[str] = None
    language: str = "english"
    encoding: str = "json"
    timestamp: Optional[float] = None

    def __post_init__(self):
        # Fill in per-message defaults that cannot be static class defaults
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())

    def to_acp_format(self) -> str:
        """Convert to standard ACP message format"""
        # Wire format uses hyphenated keys instead of Python attribute names
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)

    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        """Parse ACP message from string format"""
        data = json.loads(acp_string)
        # Required fields use indexing (KeyError if absent); optional ones use .get()
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )
The ACPMessage data class encapsulates all the fields required for a structured ACP exchange, including identifiers, participants, the performative, the payload, and metadata such as the protocol version, language, and timestamp. Its __post_init__ method auto-populates the timestamp and conversation_id values when they are not supplied, so every message can be tracked. The utility methods to_acp_format and from_acp_format handle serialization to and from the standardized JSON representation for seamless transmission and parsing.
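The round trip described above can be sketched with a trimmed-down stand-in for the message class. `MiniACPMessage`, `to_json`, and `from_json` are hypothetical names used only so this block runs without the rest of the tutorial; the field subset and hyphenated keys follow the ACPMessage definition.

```python
import json
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class MiniACPMessage:
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    conversation_id: Optional[str] = None
    timestamp: Optional[float] = None

    def __post_init__(self) -> None:
        # Auto-populate tracking fields when the caller omits them
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())

    def to_json(self) -> str:
        return json.dumps({
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "conversation-id": self.conversation_id,
            "timestamp": self.timestamp,
        })

    @classmethod
    def from_json(cls, raw: str) -> "MiniACPMessage":
        data = json.loads(raw)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            conversation_id=data.get("conversation-id"),
            timestamp=data.get("timestamp"),
        )


original = MiniACPMessage("m-1", "agent-001", "agent-002", "tell", {"fact": "hello"})
restored = MiniACPMessage.from_json(original.to_json())
print(restored == original)
```

Because the serialized form carries the generated conversation ID and timestamp, the restored message compares equal to the original rather than receiving fresh values.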
class ACPAgent:
    """Agent implementing Agent Communication Protocol"""

    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}
        self.conversations: Dict[str, List[ACPMessage]] = {}

    def create_message(self, receiver: str, performative: str,
                       content: Dict[str, Any], conversation_id: Optional[str] = None,
                       reply_to: Optional[str] = None) -> ACPMessage:
        """Create a new ACP-compliant message"""
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )

    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        """Send an INFORM message (telling someone a fact)"""
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)

    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        """Send a QUERY message (asking for information)"""
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)

    def send_request(self, receiver: str, action: str,
                     parameters: Optional[Dict[str, Any]] = None) -> ACPMessage:
        """Send a REQUEST message (asking someone to perform an action)"""
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)

    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        """Send a REPLY message in response to another message"""
        content = {"response": response_data, "original-question": original_msg.content}
        # Reuse the conversation ID and point reply_to at the original message
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )

    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Process incoming ACP message and generate appropriate response"""
        self.message_queue.append(message)
        # Group messages by conversation so threads can be reconstructed later
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)
        # Dispatch on the performative; other performatives produce no response
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)
        return None

    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming query messages"""
        question = message.content.get("question", "")
        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except Exception:
            # Fall back to a fixed answer if the Gemini call fails for any reason
            answer = "Unable to process query at this time"
        return self.send_reply(message, {"answer": answer, "confidence": 0.8})

    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming action requests"""
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})
        # Substring test: a capability must appear verbatim inside the action name
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"
        return self.send_reply(message, {"status": status, "result": result})

    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Handle incoming information messages"""
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Received information: {fact}")
        ack_content = {"status": "received", "fact": fact}
        # Acknowledge receipt within the same conversation
        return self.create_message(message.sender, "acknowledge", ack_content,
                                   conversation_id=message.conversation_id)
The ACPAgent class encapsulates an autonomous entity capable of sending, receiving, and processing ACP-compliant messages using the Gemini language model. It manages its own message queue, conversation history, and subscriptions, and provides helper methods (send_inform, send_query, send_request, send_reply) to build correctly formatted ACPMessage instances. Incoming messages are routed through process_message, which delegates to specialized handlers for queries, action requests, and inform messages.
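The capability check inside _handle_request is a plain substring test, which has a consequence worth seeing in isolation. The sketch below mirrors that test in a standalone helper; `can_perform` is a hypothetical name introduced here, while the capability list and the "calculate" action are the ones used for MathBot in the demo.

```python
from typing import List


def can_perform(action: str, capabilities: List[str]) -> bool:
    """Mirror the substring test used by _handle_request."""
    lowered = action.lower()
    return any(capability in lowered for capability in capabilities)


math_caps = ["calculation", "mathematics", "computation"]

# "calculation" is not a substring of "calculate", so this request is refused
print(can_perform("calculate", math_caps))        # False
# An action name that contains a capability verbatim is accepted
print(can_perform("Run calculation", math_caps))  # True
```

In other words, an action is agreed to only when one of the agent's capability strings appears verbatim in the action name, so action names and capability lists need to be chosen with that in mind.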
class ACPMessageBroker:
    """Message broker implementing ACP routing and delivery"""

    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}

    def register_agent(self, agent: ACPAgent):
        """Register an agent with the message broker"""
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")

    def route_message(self, message: ACPMessage) -> bool:
        """Route ACP message to appropriate recipient"""
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False
        print(f"\n📨 ACP MESSAGE ROUTING:")
        print(f"From: {message.sender} → To: {message.receiver}")
        print(f"Performative: {message.performative}")
        print(f"Content: {json.dumps(message.content, indent=2)}")
        receiver_agent = self.agents[message.receiver]
        response = receiver_agent.process_message(message)
        self.message_log.append(message)
        if response:
            print(f"\n📤 GENERATED RESPONSE:")
            print(f"From: {response.sender} → To: {response.receiver}")
            print(f"Content: {json.dumps(response.content, indent=2)}")
            # Deliver the response to the original sender; its return value is
            # discarded, so a response never triggers a further exchange
            if response.receiver in self.agents:
                self.agents[response.receiver].process_message(response)
            self.message_log.append(response)
        return True

    def broadcast_message(self, message: ACPMessage, recipients: List[str]):
        """Broadcast message to multiple recipients"""
        for recipient in recipients:
            # Each recipient gets its own copy with a fresh message ID
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)
ACPMessageBroker serves as the central router for ACP messages, maintaining a registry of agents and a message log. It provides methods to register agents, to deliver individual messages via route_message, which handles lookup, logging, and response delivery, and to send the same message to several recipients with broadcast_message.
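The lookup, logging, and response-delivery steps can be tried without any API key by swapping the agents for plain functions. The following is a simplified sketch, not the tutorial's broker: `TinyBroker`, `echo_agent`, and the dictionary-based message shape are all stand-ins invented for this illustration.

```python
from typing import Callable, Dict, List, Optional

Message = Dict[str, str]
Handler = Callable[[Message], Optional[Message]]


class TinyBroker:
    """Simplified stand-in for the routing logic, with functions as agents."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.log: List[Message] = []

    def register(self, agent_id: str, handler: Handler) -> None:
        self.handlers[agent_id] = handler

    def route(self, message: Message) -> bool:
        handler = self.handlers.get(message["receiver"])
        if handler is None:
            return False  # unknown receiver: nothing delivered or logged
        response = handler(message)
        self.log.append(message)
        if response is not None:
            back = self.handlers.get(response["receiver"])
            if back is not None:
                back(response)  # result discarded, so no reply loops
            self.log.append(response)
        return True


def echo_agent(message: Message) -> Optional[Message]:
    # Answer questions only; replies and other performatives get no response
    if message["performative"] != "ask":
        return None
    return {
        "sender": message["receiver"],
        "receiver": message["sender"],
        "performative": "reply",
        "content": "echo: " + message["content"],
    }


broker = TinyBroker()
broker.register("a", echo_agent)
broker.register("b", echo_agent)
delivered = broker.route(
    {"sender": "a", "receiver": "b", "performative": "ask", "content": "ping"}
)
missing = broker.route(
    {"sender": "a", "receiver": "zzz", "performative": "ask", "content": "ping"}
)
print(delivered, missing, len(broker.log))
```

As in route_message, the request and its response are both logged, while a message addressed to an unregistered receiver is rejected before anything is recorded.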
def demonstrate_acp():
    """Comprehensive demonstration of Agent Communication Protocol"""
    print("🤖 AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
    print("=" * 60)

    # Create the broker and three agents with distinct capabilities
    broker = ACPMessageBroker()
    researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
    assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
    calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])
    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)

    print(f"\n📋 REGISTERED AGENTS:")
    for agent_id, agent in broker.agents.items():
        print(f" • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")

    print(f"\n🔬 SCENARIO 1: Information Query (ASK performative)")
    query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
    broker.route_message(query_msg)

    print(f"\n🔢 SCENARIO 2: Action Request (REQUEST-ACTION performative)")
    calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)

    print(f"\n📢 SCENARIO 3: Information Sharing (TELL performative)")
    info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
    broker.route_message(info_msg)

    # Summarize what passed through the broker
    print(f"\n📊 PROTOCOL STATISTICS:")
    print(f" • Total messages processed: {len(broker.message_log)}")
    print(f" • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f" • Message types used: {len(set(msg.performative for msg in broker.message_log))}")

    print(f"\n📋 SAMPLE ACP MESSAGE FORMAT:")
    sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
    print(sample_msg.to_acp_format())
The demonstrate_acp function orchestrates a hands-on walkthrough of the entire ACP framework: it initializes a broker and three distinct agents (Dr. Research, AI Assistant, and MathBot), registers them, and illustrates three key interaction scenarios: querying for information, requesting a calculation, and sharing an update. After sending each message and handling the responses, it prints summary statistics about the message flow. Finally, it prints a sample formatted ACP message, giving users a clear end-to-end example of how agents communicate under the protocol.
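The three statistics can be reproduced on a hand-written log to see what they count. This is a sketch under stated assumptions: `summarize_log` is a hypothetical helper, the conversation IDs are made up (the real ones are UUIDs), and the sample log assumes each of the three scenarios yields exactly one message and one response.

```python
from typing import Dict, List, Tuple


def summarize_log(log: List[Tuple[str, str]]) -> Dict[str, int]:
    """Summarize (conversation_id, performative) pairs like the demo does."""
    return {
        "total_messages": len(log),
        "conversations": len({conv_id for conv_id, _ in log}),
        "performatives": len({performative for _, performative in log}),
    }


# Illustrative log: one request and one response per scenario
sample_log = [
    ("conv-1", "ask"), ("conv-1", "reply"),
    ("conv-2", "request-action"), ("conv-2", "reply"),
    ("conv-3", "tell"), ("conv-3", "acknowledge"),
]
stats = summarize_log(sample_log)
print(stats)
# {'total_messages': 6, 'conversations': 3, 'performatives': 5}
```

Responses share the conversation ID of the message they answer, which is why six logged messages collapse into three conversations, and the two "reply" entries count as a single performative.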
def setup_guide():
    # Printed verbatim, so the text inside the string stays flush left
    print("""
🚀 GOOGLE COLAB SETUP GUIDE:
1. Get Gemini API Key: https://makersuite.google.com/app/apikey
2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
3. Run: demonstrate_acp()
🔧 ACP PROTOCOL FEATURES:
• Standardized message format with required fields
• Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
• Conversation tracking and message threading
• Error handling and acknowledgments
• Message routing and delivery confirmation
📝 EXTEND THE PROTOCOL:
```python
# Create custom agent
my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
broker.register_agent(my_agent)
# Send custom message
msg = my_agent.send_query("agent-001", "Your question here")
broker.route_message(msg)
```
""")


if __name__ == "__main__":
    setup_guide()
    demonstrate_acp()
Finally, the setup_guide function provides a quick reference for running the ACP demo in Google Colab, describing how to obtain and configure your Gemini API key and invoke the demonstrate_acp routine. It also summarizes the key protocol features, such as standardized messages, performatives, and message routing, and includes a concise code snippet showing how to register custom agents and send custom messages.
In conclusion, this tutorial implements an ACP-based multi-agent system capable of research, calculation, and collaboration tasks. The sample scenarios illustrate common use cases (information queries, calculation requests, and fact sharing), while the broker provides reliable message delivery and logging. Readers are encouraged to extend the framework by adding new agent capabilities, integrating domain-specific actions, or incorporating more sophisticated subscription and notification mechanisms.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of an artificial intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform has more than 2 million monthly views, illustrating its popularity with readers.
