JetBrains AI
Supercharge your tools with AI-powered features inside many JetBrains products
Koog × A2A:使用 Kotlin 构建互联的 AI 智能体
如果您曾尝试过构建由多个 AI 智能体组成的系统,很可能会遇到难题。 一开始很简单:您有一个智能体负责撰写博文,另一个智能体负责校对博文,或许还有第三个智能体负责建议或生成图像。 单独来看,它们都很高效。 但要让它们协同工作呢? 往往这个时候就会开始出现问题。
每个智能体都有自己的一套“语言”:一个使用不同的 API 接口,另一个有自己的消息格式,而且它们可能都有特定的身份验证要求。 要让它们彼此通信,就意味着需要为每一个连接编写自定义的集成代码。 结果,您无法专注于让各个智能体变得更智能、更快速或更有用,而是被困在为它们搭建沟通桥梁上。
A2A 的作用:跨智能体通信层
这就是 Agent2Agent (A2A) 协议发挥作用的地方。
借助 A2A,您的智能体可以通过标准化协议直接通信,相当于为您的 AI 生态系统提供了一个通用的翻译器。 您的博客撰写智能体可以无缝地将内容传递给校对智能体,校对智能体再触发图像生成智能体,而校对智能体可能会循环返回修正意见,图像生成智能体则可能会请求风格澄清。 所有这些都通过一个统一的通信层进行编排。
A2A 无需管理数十个点对点连接,它提供了以下功能:
- 即插即用连接:智能体可以自动发现彼此并建立连接。
- 标准化消息发送:统一的格式、清晰的协议,没有翻译难题。
- 内置编排:只需定义一次工作流,然后让 A2A 来处理协调工作。
- 不增加复杂度的可扩缩性:添加或重用智能体,无需重写现有连接。
结果是什么? 您可以将时间花在提升智能体的功能上,而不是调试它们之间的通信。 最棒的是,您可以使用任何喜欢的语言或框架来实现智能体。 对于 JVM 用户来说,Koog 是一个绝佳选择,从 0.5.0 版本开始,它与 A2A 生态系统无缝集成。
Koog 的作用:内部编排引擎
Koog 是一个基于 Kotlin 的框架,用于构建针对 JVM、Android、iOS、WebAssembly 和浏览器内应用程序的 AI 智能体。 它擅长以下方面:
- 复杂工作流管理:设计基于图的策略,支持循环、分支、回退和并行分支执行。
- 随时可用的组件:内置节点支持调用 LLM 和外部工具、总结消息历史记录以及执行完整策略。
- 工具编排:将代码中的任何函数转换为 AI 智能体可以使用的工具,支持顺序执行甚至并行执行。
- 原生 MCP 集成:使用 Kotlin MCP SDK 无缝连接到任何 MCP 服务器。
- 内存和存储支持:对智能体内存和 RAG(检索增强生成)工作流的内置支持,具备高效的上下文管理。
- 容错能力:内置重试、检查点设置、恢复机制和状态持久性,以确保可靠执行。
- 可观测性:完整的智能体事件处理、日志记录,以及对 OpenTelemetry 的支持,并具有与 Langfuse 和 W&B Weave 的内置集成。
简而言之,Koog 非常适合构建可靠的 AI 智能体。
为何将 Koog 与 A2A 结合使用
Koog 和 A2A 覆盖了 AI 智能体堆栈的不同层。 将两者结合使用时,它们可以相互补充,填补彼此的空白。
Koog 已经解决了现实企业应用中 AI 编排最复杂的部分。
A2A 则补上了缺失的一环:它使您的 Koog 智能体能够与生态系统中任何其他兼容 A2A 的智能体进行通信。 无需为每个外部服务构建自定义集成,Koog AI 工作流可以自动发现并使用其他智能体。
结果是完美匹配:Koog 的高级工作流成为任何智能体都可以请求的 A2A 任务,同时您的 Koog 智能体可以充分利用 A2A 生态系统的全部能力。 而且由于 Koog 可以在后端、设备端和浏览器内环境中运行,您能够以前所未有的广度和效率交付互联互通的 AI。
这是如何实现的? 我们来看看吧!
A2A 协议
A2A 协议定义了智能体间通信的基本要素:
- 通过标准化智能体卡片(描述其功能的 JSON 文档)实现智能体发现。
- 使用一致的架构进行请求和响应的消息格式。
- 具有清晰状态的任务生命周期管理:已提交 → 进行中 → 已完成/失败。
- 支持 JSON-RPC、gRPC 和 REST 等的传输层。
- 使用标准 OAuth2、API 密钥和 JWT 令牌的安全方案。
- 使用标准化错误代码的错误处理。

智能体卡片:数字名片
A2A 生态系统中的每个智能体都会通过一张“智能体卡片”来发布其功能。这种卡片是一个标准化 JSON 文件,托管在智能体域上的特定 URL 下,例如:/.well-known/agent-card.json。 智能体卡片相当于数字名片,能让其他智能体发现其提供的服务。
智能体卡片通常包含以下内容:
- 基本信息:比如智能体名称、描述和版本。
- 技能:智能体能做什么(例如,起草文档、校对文本、分析数据和生成图像)。
- 端点:如何联系该智能体。
- 其他可选信息:已启用的功能、身份验证等。
这种发现机制省去了手动集成工作的需要。 当一个智能体需要特定技能时,它只需检查相关的智能体卡片,就能了解如何与该服务进行交互。
在 Koog 中,智能体卡片使用 Kotlin 数据类进行定义:
val agentCard = AgentCard( name = "Blog Writer", description = "创建高质量博文和文章的 AI 智能体", url = "https://api.blog-writer.com/a2a/v1", version = "1.0.0", capabilities = AgentCapabilities(streaming = true), defaultInputModes = listOf("text/plain"), defaultOutputModes = listOf("text/markdown"), skills = listOf( AgentSkill( id = "write-post", name = "Blog Post Writing", description = "生成任何主题的精彩博文", tags = listOf("writing", "content", "blog"), examples = listOf("Write a post about AI trends") ) ) )
通用消息传递:一个简单的模式
A2A 为所有智能体间通信使用单一的标准化消息格式。 这种简洁性非常强大,智能体无需学习数十种不同的 API,只需理解一种通信模式即可。
每次交互都遵循相同的流程:
- 发送带有任务请求和参数的消息。
- 接收即时结果或用于跟踪的任务。
- 对于较长时间的操作,通过实时通道获取动态。
这种通用方式意味着添加新的智能体功能时无需更改通信协议。 无论您是让智能体总结文本还是生成复杂报告,消息结构始终保持一致。
在 Koog 中,使用已实现的对象和协议创建和发送消息非常简单:
val message = Message( role = Role.User, parts = listOf( TextPart("撰写一篇关于 AI 智能体未来的博文") ), contextId = "blog-project-456" ) val request = Request( data = MessageSendParams( message = message, configuration = MessageConfiguration( blocking = false, // 获得第一次回答 historyLength = 5 // 包括上下文 ) ) ) val response = client.sendMessage(request)
消息格式通过不同的 Part 类型支持丰富内容,包括用于纯文本内容的 TextPart、用于文件附件的 FilePart,以及用于结构化 JSON 数据的 DataPart。
这种统一结构意味着您的 Koog 智能体能够与任何兼容 A2A 的智能体无缝通信,无论目标智能体用于文本处理、文件分析还是复杂的数据转换。
任务生命周期:智能工作流
A2A 会根据任务的复杂性和持续时间智能管理不同类型的工作:
即时消息:像文本格式设置或快速计算这样的简单操作会直接在 AI 的回答中返回结果。 无需等待或跟踪。
长时间运行的任务:文档分析或多步骤工作流之类的复杂操作会被调度并返回一个任务。 发出请求的智能体随后可以监控进度并在任务完成后检索结果。
实时动态:对于耗时较长的操作,服务器发送事件 (SSE) 会提供实时进度更新。 这使智能体能够随时了解最新情况,而无需持续进行轮询。
class BlogWriterExecutor : AgentExecutor { override suspend fun execute( context: RequestContext, eventProcessor: SessionEventProcessor ) { val task = Task( contextId = context.contextId, status = TaskStatus( state = TaskState.Submitted, message = Message( role = Role.Agent, parts = listOf(TextPart("已收到博文撰写请求")), contextId = context.contextId, taskId = context.taskId, ) ) ) eventProcessor.sendTaskEvent(task) ... } }
内置安全性:仅采用行业标准
A2A 并未重新发明安全机制。 相反,它依赖于经过验证且被广泛采用的标准,如 OAuth2、API 密钥和标准 HTTPS。
这种方式意味着开发者无需学习新的身份验证方案。 如果您了解现代 Web API 的安全性,您就已经了解了 A2A 的安全性。 该系统继承了这些既定标准所带来的所有工具、最佳做法与安全审核。
"securitySchemes": { "google": { "openIdConnectUrl": "https://accounts.google.com/.well-known/openid-configuration", "type": "openIdConnect" } }
class AuthorizedA2AServer( agentExecutor: AgentExecutor, agentCard: AgentCard, agentCardExtended: AgentCard? = null, taskStorage: TaskStorage = InMemoryTaskStorage(), messageStorage: MessageStorage = InMemoryMessageStorage(), private val authService: AuthService, // 负责身份验证的服务 ) : A2AServer( agentExecutor = agentExecutor, agentCard = agentCard, agentCardExtended = agentCardExtended, taskStorage = taskStorage, messageStorage = messageStorage, ) { private suspend fun authenticateAndAuthorize( ctx: ServerCallContext, requiredPermission: String ): AuthenticatedUser { val token = ctx.headers["Authorization"]?.firstOrNull() ?: throw A2AInvalidParamsException("缺少授权令牌") val user = authService.authenticate(token) ?: throw A2AInvalidParamsException("授权令牌无效") if (requiredPermission !in user.permissions) { throw A2AUnsupportedOperationException("权限不足") } return user } override suspend fun onSendMessage( request: Request, ctx: ServerCallContext ): Response { val user = authenticateAndAuthorize(ctx, requiredPermission = "send_message") // 通过上下文状态将用户数据传递给智能体执行器 val enrichedCtx = ctx.copy( state = ctx.state + (AuthStateKeys.USER to user) ) // 委托给具有丰富上下文的父实现 return super.onSendMessage(request, enrichedCtx) } // 其余的封装 A2A 方法 // ... }
如何将 Koog 智能体与 A2A 集成
Koog 框架内置了 A2A 客户端和服务器。 这意味着您的 Koog 智能体可以与其他支持 A2A 的智能体无缝通信,同时也能让自己被外界发现。 下面是一个简单的示例,展示了如何实现这一点。
如何将 Koog 智能体封装到 A2A 服务器中
第一步,为智能体定义一个策略。 Koog 提供了方便的转换器(toKoogMessage 和 toA2AMessage),可以在 Koog 消息格式与 A2A 消息格式之间进行无缝转换,而无需手动序列化。 专用节点(如 nodeA2ASendMessage)会处理消息交换过程,这就使通信工作流的实现变得简单直接:
fun blogpostWritingStrategy() = strategy("blogpost-writer-strategy") { val blogpostRequest by node { input -> val userMessage = input.toKoogMessage().content llm.writeSession { user { +"根据用户的要求撰写一篇博文" +xml { tag("user_request") { +userMessage } } } requestLLM().toA2AMessage() } } val sendMessage by nodeA2ARespondMessage() nodeStart then blogpostRequest then sendMessage the nodeFinish }
第二步,定义智能体本身。 安装 A2AServer 功能后,您的智能体就变得可被发现并且可被生态系统中的其他智能体访问,从而能够创建复杂的网络,让各种专用智能体进行无缝协作。
fun createBlogpostWritingAgent( requestContext: RequestContext, eventProcessor: SessionEventProcessor ): AIAgent { // 获取当前对话上下文中的现有消息 val messageHistory = requestContext.messageStorage.getAll().map { it.toKoogMessage() } val agentConfig = AIAgentConfig( prompt = prompt("blogpost") { system("You are a blogpost writing agent") messages(messageHistory) }, model = GoogleModels.Gemini2_5Flash, maxAgentIterations = 5 ) return agent = AIAgent( promptExecutor = MultiLLMPromptExecutor( LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")), ), strategy = blogpostWritingStrategy(), agentConfig = agentConfig ) { install(A2AAgentServer) { this.context = requestContext this.eventProcessor = eventProcessor } handleEvents { onAgentFinished { ctx -> // 使用来自智能体的回答更新当前对话上下文 val resultMessge = ctx.result as A2AMessage requestContext.messageStorage.save(resultMessge) } } } }
第三步,我们需要将智能体封装到执行器中,然后定义一个服务器。
class BlogpostAgentExecutor : AgentExecutor { override suspend fun execute( context: RequestContext, eventProcessor: SessionEventProcessor ) { createBlogpostWritingAgent(context, eventProcessor) .run(context.params.message) } } val a2aServer = A2AServer( agentExecutor = BlogpostAgentExecutor(), agentCard = agentCard, )
最后一步是定义服务器传输并运行该服务器。
val transport = HttpJSONRPCServerTransport( requestHandler = a2aServer ) transport.start( engineFactory = Netty, port = 8080, path = "/a2a", wait = true, agentCard = agentCard, agentCardPath = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH )
现在,您的智能体已准备好处理请求了!
如何从 Koog 智能体调用其他支持 A2A 的智能体
首先,您需要配置一个 A2A 客户端,并连接它以获取智能体卡片。
val agentUrl = "https://example.com" val cardResolver = UrlAgentCardResolver( baseUrl = agentUrl, path = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH, ) val transport = HttpJSONRPCClientTransport( url = agentUrl, ) val a2aClient = A2AClient( transport = transport, agentCardResolver = cardResolver ) // 初始化客户端并获取卡片 a2aClient.connect()
然后,您可以在您的策略中使用 nodeA2ASendMessage 或 nodeA2ASendMessageStreaming 来调用这些客户端,并接收消息或任务响应。
val agentId = "agent_id" val agent = AIAgent( promptExecutor = MultiLLMPromptExecutor( LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")), ), strategy = strategy("a2a") { val nodePrepareRequest by node<String, A2AClientRequest> { input -> A2AClientRequest( agentId = agentId, callContext = ClientCallContext.Default, params = MessageSendParams( message = A2AMessage( messageId = Uuid.random().toString(), role = Role.User, parts = listOf( TextPart(input) ) ) ) ) } val nodeA2A by nodeA2AClientSendMessage(agentId) val nodeProcessResponse by node { // Process event when (it) { is A2AMessage -> it.parts .filterIsInstance() .joinToString(separator = "n") { it.text } is Task -> it.artifacts .orEmpty() .flatMap { it.parts } .filterIsInstance() .joinToString(separator = "n") { it.text } } } nodeStart then nodePrepareRequest then nodeA2A then nodeProcessResponse then nodeFinish }, agentConfig = agentConfig ) { install(A2AAgentClient) { this.a2aClients = mapOf(agentId to client) } } agent.run("撰写关于 A2A 和 Koog 集成的博文")
后续步骤
如需深入了解 Koog 和 A2A,请查看以下实用资料:
本博文英文原作者: