Skip to main content
Agent2Agent (A2A) 是谷歌提出的用于实现对话式 AI 智能体间通信的协议。LangSmith 实现了对 A2A 的支持,允许您的智能体通过标准化协议与其他兼容 A2A 的智能体进行通信。 A2A 端点可在 Agent Server/a2a/{assistant_id} 路径下使用。

支持的方法

Agent Server 支持以下 A2A RPC 方法:
  • message/send:向助手发送消息并接收完整响应
  • message/stream:发送消息并使用服务器发送事件 (SSE) 实时流式传输响应
  • tasks/get:检索先前创建任务的状态和结果

智能体卡片发现

每个助手会自动公开一个 A2A 智能体卡片,该卡片描述了其功能,并提供了其他智能体连接所需的信息。您可以使用以下方式检索任何助手的智能体卡片:
GET /.well-known/agent-card.json?assistant_id={assistant_id}
智能体卡片包括助手的名称、描述、可用技能、支持的输入/输出模式以及用于通信的 A2A 端点 URL。

要求

要使用 A2A,请确保已安装以下依赖项:
  • langgraph-api >= 0.4.21
使用以下命令安装:
pip install "langgraph-api>=0.4.21"

使用概述

要启用 A2A:
  • 升级到使用 langgraph-api>=0.4.21。
  • 部署具有基于消息的状态结构的智能体。
  • 使用端点与其他兼容 A2A 的智能体连接。

创建兼容 A2A 的智能体

此示例创建一个兼容 A2A 的智能体,该智能体使用 OpenAI 的 API 处理传入消息并维护对话状态。该智能体定义了基于消息的状态结构,并处理 A2A 协议的消息格式。 为了与 A2A “text” 部分 兼容,智能体的状态中必须有一个 messages 键。 A2A 协议使用两个标识符来维护对话连续性:
  • contextId:将消息分组到一个对话线程中(类似于会话 ID)
  • taskId:标识该对话中的每个独立请求
在第一条消息中,省略 contextIdtaskId - 智能体会生成并返回它们。对于对话中的所有后续消息,包含先前响应中的 contextIdtaskId 以保持线程连续性。 LangSmith 追踪: Langsmith 部署的 A2A 端点会自动将 A2A 的 contextId 转换为 LangSmith 追踪的 thread_id,将对话中的所有消息分组到单个线程下。 例如:
"""LangGraph A2A 对话式智能体。

支持带有消息输入的 A2A 协议,用于对话式交互。
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any, Dict, List, TypedDict

from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
from openai import AsyncOpenAI


class Context(TypedDict):
    """智能体的上下文参数。"""
    my_configurable_param: str


@dataclass
class State:
    """智能体的输入状态。

    为 A2A 对话消息定义初始结构。
    """
    messages: List[Dict[str, Any]]


async def call_model(state: State, runtime: Runtime[Context]) -> Dict[str, Any]:
    """处理对话消息并使用 OpenAI 返回输出。"""
    # 初始化 OpenAI 客户端
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    # 处理传入消息
    latest_message = state.messages[-1] if state.messages else {}
    user_content = latest_message.get("content", "No message content")

    # 为 OpenAI API 创建消息
    openai_messages = [
        {
            "role": "system",
            "content": "你是一个有用的对话智能体。保持回答简洁且引人入胜。"
        },
        {
            "role": "user",
            "content": user_content
        }
    ]

    try:
        # 调用 OpenAI API
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            max_tokens=100,
            temperature=0.7
        )

        ai_response = response.choices[0].message.content

    except Exception as e:
        ai_response = f"我收到了你的消息,但在处理时遇到了问题。错误:{str(e)[:50]}..."

    # 创建响应消息
    response_message = {
        "role": "assistant",
        "content": ai_response
    }

    return {
        "messages": state.messages + [response_message]
    }


# 定义图
graph = (
    StateGraph(State, context_schema=Context)
    .add_node(call_model)
    .add_edge("__start__", "call_model")
    .compile()
)

智能体间通信

一旦您的智能体通过 langgraph dev 在本地运行或部署到生产环境,您就可以使用 A2A 协议促进它们之间的通信。 此示例演示了两个智能体如何通过向彼此的 A2A 端点发送 JSON-RPC 消息进行通信。该脚本模拟了一个多轮对话,其中每个智能体处理对方的响应并继续对话。
#!/usr/bin/env python3
"""使用 LangGraph A2A 端点模拟智能体间对话。"""

import asyncio
import aiohttp
import os
import uuid


def extract_text(result: dict) -> str:
    """从 A2A 结果中尽力提取响应文本。"""
    for art in result.get("result", {}).get("artifacts", []) or []:
        for part in art.get("parts", []) or []:
            if part.get("kind") == "text" and part.get("text"):
                return part["text"]

    msg = (result.get("result", {}).get("status", {}) or {}).get("message", {}) or {}
    for part in msg.get("parts", []) or []:
        if part.get("kind") == "text" and part.get("text"):
            return part["text"]

    return "(未找到文本)"


async def send_message(session, port, assistant_id, text, context_id=None, task_id=None):
    """发送 A2A 消息。返回 (response_text, returned_context_id, returned_task_id)。"""
    url = f"http://127.0.0.1:{port}/a2a/{assistant_id}"

    message = {
        "role": "user",
        "parts": [{"kind": "text", "text": text}],
        "messageId": str(uuid.uuid4()),
    }

    # A2A 多轮连续性:跨轮次/智能体重用 contextId 和 taskId
    if context_id:
        message["contextId"] = context_id
    if task_id:
        message["taskId"] = task_id

    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
    }

    headers = {"Accept": "application/json"}
    async with session.post(url, json=payload, headers=headers) as response:
        result = await response.json()

    returned_context_id = result.get("result", {}).get("contextId") or context_id
    returned_task_id = result.get("result", {}).get("id")
    return extract_text(result), returned_context_id, returned_task_id


async def simulate_conversation():
    """模拟两个智能体之间的对话。"""

    # 助手 ID
    agent_a_id = os.getenv("AGENT_A_ID")
    agent_b_id = os.getenv("AGENT_B_ID")

    if not agent_a_id or not agent_b_id:
        print("请设置 AGENT_A_ID 和 AGENT_B_ID 环境变量")
        return

    message = "你好!让我们开始对话吧。"
    context_id = None
    task_id = None

    async with aiohttp.ClientSession() as session:
        for i in range(3):
            print(f"--- 第 {i + 1} 轮 ---")

            message, context_id, task_id = await send_message(
                session, 2024, agent_a_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔵 智能体 A: {message}")

            message, context_id, task_id = await send_message(
                session, 2025, agent_b_id, message,
                context_id=context_id,
                task_id=task_id,
            )
            print(f"🔴 智能体 B: {message}\n")


if __name__ == "__main__":
    asyncio.run(simulate_conversation())
完整的可运行示例,请参见:

禁用 A2A

要禁用 A2A 端点,请在您的 langgraph.json 配置文件中将 disable_a2a 设置为 true
{
  "$schema": "https://langgra.ph/schema.json",
  "http": {
    "disable_a2a": true
  }
}