Ai logo

JetBrains AI

Supercharge your tools with AI-powered features inside many JetBrains products

News Releases

Koog × A2A:使用 Kotlin 构建互联的 AI 智能体

Read this post in other languages:

如果您曾尝试过构建由多个 AI 智能体组成的系统,很可能会遇到难题。 一开始很简单:您有一个智能体负责撰写博文,另一个智能体负责校对博文,或许还有第三个智能体负责建议或生成图像。 单独来看,它们都很高效。 但要让它们协同工作呢? 往往这个时候就会开始出现问题。

每个智能体都有自己的一套“语言”:一个使用不同的 API 接口,另一个有自己的消息格式,而且它们可能都有特定的身份验证要求。 要让它们彼此通信,就意味着需要为每一个连接编写自定义的集成代码。 结果,您无法专注于让各个智能体变得更智能、更快速或更有用,而是被困在为它们搭建沟通桥梁上。

A2A 的作用:跨智能体通信层

这就是 Agent2Agent (A2A) 协议发挥作用的地方

借助 A2A,您的智能体可以通过标准化协议直接通信,相当于为您的 AI 生态系统提供了一个通用的翻译器。 您的博客撰写智能体可以无缝地将内容传递给校对智能体,校对智能体再触发图像生成智能体,而校对智能体可能会循环返回修正意见,图像生成智能体则可能会请求风格澄清。 所有这些都通过一个统一的通信层进行编排。

A2A 无需管理数十个点对点连接,它提供了以下功能:

  • 即插即用连接:智能体可以自动发现彼此并建立连接。
  • 标准化消息发送:统一的格式、清晰的协议,没有翻译难题。
  • 内置编排:只需定义一次工作流,然后让 A2A 来处理协调工作。
  • 不增加复杂度的可扩缩性:添加或重用智能体,无需重写现有连接。

结果是什么? 您可以将时间花在提升智能体的功能上,而不是调试它们之间的通信。 最棒的是,您可以使用任何喜欢的语言或框架来实现智能体。 对于 JVM 用户来说,Koog 是一个绝佳选择,从 0.5.0 版本开始,它与 A2A 生态系统无缝集成。

Koog 的作用:内部编排引擎

Koog 是一个基于 Kotlin 的框架,用于构建针对 JVM、Android、iOS、WebAssembly 和浏览器内应用程序的 AI 智能体。 它擅长以下方面:

  • 复杂工作流管理:设计基于图的策略,支持循环、分支、回退和并行分支执行。
  • 随时可用的组件:内置节点支持调用 LLM 和外部工具、总结消息历史记录以及执行完整策略。
  • 工具编排:将代码中的任何函数转换为 AI 智能体可以使用的工具,支持顺序执行甚至并行执行。
  • 原生 MCP 集成:使用 Kotlin MCP SDK 无缝连接到任何 MCP 服务器。
  • 内存和存储支持:对智能体内存和 RAG(检索增强生成)工作流的内置支持,具备高效的上下文管理。
  • 容错能力:内置重试、检查点设置、恢复机制和状态持久性,以确保可靠执行。
  • 可观测性:完整的智能体事件处理、日志记录,以及对 OpenTelemetry 的支持,并具有与 Langfuse 和 W&B Weave 的内置集成。

简而言之,Koog 非常适合构建可靠的 AI 智能体。

为何将 Koog 与 A2A 结合使用

Koog 和 A2A 覆盖了 AI 智能体堆栈的不同层。 将两者结合使用时,它们可以相互补充,填补彼此的空白。

Koog 已经解决了现实企业应用中 AI 编排最复杂的部分

A2A 则补上了缺失的一环:它使您的 Koog 智能体能够与生态系统中任何其他兼容 A2A 的智能体进行通信。 无需为每个外部服务构建自定义集成,Koog AI 工作流可以自动发现并使用其他智能体。

结果是完美匹配:Koog 的高级工作流成为任何智能体都可以请求的 A2A 任务,同时您的 Koog 智能体可以充分利用 A2A 生态系统的全部能力。 而且由于 Koog 可以在后端、设备端和浏览器内环境中运行,您能够以前所未有的广度和效率交付互联互通的 AI。

这是如何实现的? 我们来看看吧!

A2A 协议

A2A 协议定义了智能体间通信的基本要素:

  • 通过标准化智能体卡片(描述其功能的 JSON 文档)实现智能体发现
  • 使用一致的架构进行请求和响应的消息格式
  • 具有清晰状态的任务生命周期管理:已提交 → 进行中 → 已完成/失败。
  • 支持 JSON-RPC、gRPC 和 REST 等的传输层
  • 使用标准 OAuth2、API 密钥和 JWT 令牌的安全方案
  • 使用标准化错误代码的错误处理

智能体卡片:数字名片

A2A 生态系统中的每个智能体都会通过一张“智能体卡片”来发布其功能。这种卡片是一个标准化 JSON 文件,托管在智能体域上的特定 URL 下,例如:/.well-known/agent-card.json。 智能体卡片相当于数字名片,能让其他智能体发现其提供的服务。

智能体卡片通常包含以下内容:

  • 基本信息:比如智能体名称、描述和版本。
  • 技能:智能体能做什么(例如,起草文档、校对文本、分析数据和生成图像)。
  • 端点:如何联系该智能体。 
  • 其他可选信息:已启用的功能、身份验证等。

这种发现机制省去了手动集成工作的需要。 当一个智能体需要特定技能时,它只需检查相关的智能体卡片,就能了解如何与该服务进行交互。

在 Koog 中,智能体卡片使用 Kotlin 数据类进行定义:

val agentCard = AgentCard(
    name = "Blog Writer",
    description = "创建高质量博文和文章的 AI 智能体",
    url = "https://api.blog-writer.com/a2a/v1",
    version = "1.0.0",
    capabilities = AgentCapabilities(streaming = true),
    defaultInputModes = listOf("text/plain"),
    defaultOutputModes = listOf("text/markdown"),
    skills = listOf(
        AgentSkill(
            id = "write-post",
            name = "Blog Post Writing",
            description = "生成任何主题的精彩博文",
            tags = listOf("writing", "content", "blog"),
            examples = listOf("Write a post about AI trends")
        )
    )
)

通用消息传递:一个简单的模式

A2A 为所有智能体间通信使用单一的标准化消息格式。 这种简洁性非常强大,智能体无需学习数十种不同的 API,只需理解一种通信模式即可。

每次交互都遵循相同的流程:

  1. 发送带有任务请求和参数的消息。
  2. 接收即时结果或用于跟踪的任务。
  3. 对于较长时间的操作,通过实时通道获取动态

这种通用方式意味着添加新的智能体功能时无需更改通信协议。 无论您是让智能体总结文本还是生成复杂报告,消息结构始终保持一致。

在 Koog 中,使用已实现的对象和协议创建和发送消息非常简单:

val message = Message(
    role = Role.User,
    parts = listOf(
        TextPart("撰写一篇关于 AI 智能体未来的博文")
    ),
    contextId = "blog-project-456"
)

val request = Request(
    data = MessageSendParams(
        message = message,
        configuration = MessageConfiguration(
            blocking = false, // 获得第一次回答
            historyLength = 5 // 包括上下文
        )
    )
)

val response = client.sendMessage(request)

消息格式通过不同的 Part 类型支持丰富内容,包括用于纯文本内容的 TextPart、用于文件附件的 FilePart,以及用于结构化 JSON 数据的 DataPart。

这种统一结构意味着您的 Koog 智能体能够与任何兼容 A2A 的智能体无缝通信,无论目标智能体用于文本处理、文件分析还是复杂的数据转换。

任务生命周期:智能工作流

A2A 会根据任务的复杂性和持续时间智能管理不同类型的工作:

即时消息:像文本格式设置或快速计算这样的简单操作会直接在 AI 的回答中返回结果。 无需等待或跟踪。

长时间运行的任务:文档分析或多步骤工作流之类的复杂操作会被调度并返回一个任务。 发出请求的智能体随后可以监控进度并在任务完成后检索结果。

实时动态:对于耗时较长的操作,服务器发送事件 (SSE) 会提供实时进度更新。 这使智能体能够随时了解最新情况,而无需持续进行轮询。

class BlogWriterExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext,
        eventProcessor: SessionEventProcessor
    ) {
        val task = Task(
            contextId = context.contextId,
            status = TaskStatus(
                state = TaskState.Submitted,
                message = Message(
                    role = Role.Agent,
                    parts = listOf(TextPart("已收到博文撰写请求")),
                    contextId = context.contextId,
    			taskId = context.taskId,
                )
            )
        )

        eventProcessor.sendTaskEvent(task)
	 ...
    }
}

内置安全性:仅采用行业标准

A2A 并未重新发明安全机制。 相反,它依赖于经过验证且被广泛采用的标准,如 OAuth2、API 密钥和标准 HTTPS。

这种方式意味着开发者无需学习新的身份验证方案。 如果您了解现代 Web API 的安全性,您就已经了解了 A2A 的安全性。 该系统继承了这些既定标准所带来的所有工具、最佳做法与安全审核。

"securitySchemes": {
   "google": {
       "openIdConnectUrl": "https://accounts.google.com/.well-known/openid-configuration",
       "type": "openIdConnect"
   }
}
class AuthorizedA2AServer(
    agentExecutor: AgentExecutor,
    agentCard: AgentCard,
    agentCardExtended: AgentCard? = null,
    taskStorage: TaskStorage = InMemoryTaskStorage(),
    messageStorage: MessageStorage = InMemoryMessageStorage(),
    private val authService: AuthService, // 负责身份验证的服务
) : A2AServer(
    agentExecutor = agentExecutor,
    agentCard = agentCard,
    agentCardExtended = agentCardExtended,
    taskStorage = taskStorage,
    messageStorage = messageStorage,
) {

    private suspend fun authenticateAndAuthorize(
        ctx: ServerCallContext,
        requiredPermission: String
    ): AuthenticatedUser {
        val token = ctx.headers["Authorization"]?.firstOrNull()
            ?: throw A2AInvalidParamsException("缺少授权令牌")

        val user = authService.authenticate(token)
            ?: throw A2AInvalidParamsException("授权令牌无效")

        if (requiredPermission !in user.permissions) {
            throw A2AUnsupportedOperationException("权限不足")
        }

        return user
    }

   override suspend fun onSendMessage(
        request: Request,
        ctx: ServerCallContext
    ): Response {
        val user = authenticateAndAuthorize(ctx, requiredPermission = "send_message")

        // 通过上下文状态将用户数据传递给智能体执行器
        val enrichedCtx = ctx.copy(
            state = ctx.state + (AuthStateKeys.USER to user)
        )

        // 委托给具有丰富上下文的父实现
        return super.onSendMessage(request, enrichedCtx)
    }

   // 其余的封装 A2A 方法
   // ...
}

如何将 Koog 智能体与 A2A 集成

Koog 框架内置了 A2A 客户端和服务器。 这意味着您的 Koog 智能体可以与其他支持 A2A 的智能体无缝通信,同时也能让自己被外界发现。 下面是一个简单的示例,展示了如何实现这一点。

如何将 Koog 智能体封装到 A2A 服务器中

第一步,为智能体定义一个策略。 Koog 提供了方便的转换器(toKoogMessagetoA2AMessage),可以在 Koog 消息格式与 A2A 消息格式之间进行无缝转换,而无需手动序列化。 专用节点(如 nodeA2ASendMessage)会处理消息交换过程,这就使通信工作流的实现变得简单直接:

fun blogpostWritingStrategy() = strategy("blogpost-writer-strategy") {
    val blogpostRequest by node { input ->
        val userMessage = input.toKoogMessage().content

        llm.writeSession {
            user {
                +"根据用户的要求撰写一篇博文"
                +xml {
                    tag("user_request") {
                        +userMessage
                    }
                }
            }

            requestLLM().toA2AMessage()
        }
    }

    val sendMessage by nodeA2ARespondMessage()

    nodeStart then blogpostRequest then sendMessage the nodeFinish
}

第二步,定义智能体本身。 安装 A2AServer 功能后,您的智能体就变得可被发现并且可被生态系统中的其他智能体访问,从而能够创建复杂的网络,让各种专用智能体进行无缝协作。

fun createBlogpostWritingAgent(
    requestContext: RequestContext,
    eventProcessor: SessionEventProcessor
): AIAgent {
     // 获取当前对话上下文中的现有消息
     val messageHistory = requestContext.messageStorage.getAll().map { it.toKoogMessage() }

     val agentConfig = AIAgentConfig(
        prompt = prompt("blogpost") {
            system("You are a blogpost writing agent")

            messages(messageHistory)
        },
        model = GoogleModels.Gemini2_5Flash,
        maxAgentIterations = 5
    )

    return agent = AIAgent(
        promptExecutor = MultiLLMPromptExecutor(
            LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")),
        ),
        strategy = blogpostWritingStrategy(),
        agentConfig = agentConfig
    ) {
        install(A2AAgentServer) {
            this.context = requestContext
            this.eventProcessor = eventProcessor
        }

        handleEvents {
            onAgentFinished { ctx ->
                // 使用来自智能体的回答更新当前对话上下文
                val resultMessge = ctx.result as A2AMessage
                requestContext.messageStorage.save(resultMessge)
            }
        }
    }
}

第三步,我们需要将智能体封装到执行器中,然后定义一个服务器。

class BlogpostAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext,
        eventProcessor: SessionEventProcessor
    ) {
        createBlogpostWritingAgent(context, eventProcessor)
            .run(context.params.message)
    }
}

val a2aServer = A2AServer(
    agentExecutor = BlogpostAgentExecutor(),
    agentCard = agentCard,
)

最后一步是定义服务器传输并运行该服务器。

val transport = HttpJSONRPCServerTransport(
    requestHandler = a2aServer
)

transport.start(
    engineFactory = Netty,
    port = 8080,
    path = "/a2a",
    wait = true,
    agentCard = agentCard,
    agentCardPath = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH
)

现在,您的智能体已准备好处理请求了! 

如何从 Koog 智能体调用其他支持 A2A 的智能体

首先,您需要配置一个 A2A 客户端,并连接它以获取智能体卡片。

val agentUrl = "https://example.com"

val cardResolver = UrlAgentCardResolver(
    baseUrl = agentUrl,
    path = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH,
)

val transport = HttpJSONRPCClientTransport(
    url = agentUrl,
)

val a2aClient = A2AClient(
    transport = transport,
    agentCardResolver = cardResolver
)

// 初始化客户端并获取卡片
a2aClient.connect()

然后,您可以在您的策略中使用 nodeA2ASendMessagenodeA2ASendMessageStreaming 来调用这些客户端,并接收消息或任务响应。

val agentId = "agent_id"
val agent = AIAgent(
    promptExecutor = MultiLLMPromptExecutor(
        LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")),
    ),
    strategy = strategy("a2a") {
        val nodePrepareRequest by node<String, A2AClientRequest> { input ->
            A2AClientRequest(
                agentId = agentId,
                callContext = ClientCallContext.Default,
                params = MessageSendParams(
                    message = A2AMessage(
                        messageId = Uuid.random().toString(),
                        role = Role.User,
                        parts = listOf(
                            TextPart(input)
                        )
                    )
                )
            )
        }
        val nodeA2A by nodeA2AClientSendMessage(agentId)
        
        val nodeProcessResponse by node {
            // Process event
            when (it) {
                is A2AMessage -> it.parts
                    .filterIsInstance()
                    .joinToString(separator = "n") { it.text }
                
                is Task -> it.artifacts
                    .orEmpty()
                    .flatMap { it.parts }
                    .filterIsInstance()
                    .joinToString(separator = "n") { it.text }
            }
        }

        nodeStart then nodePrepareRequest then nodeA2A then nodeProcessResponse then nodeFinish

    },
    agentConfig = agentConfig
) {
   install(A2AAgentClient) {
        this.a2aClients = mapOf(agentId to client)
    }
}

agent.run("撰写关于 A2A 和 Koog 集成的博文")

后续步骤

如需深入了解 Koog 和 A2A,请查看以下实用资料:

Koog 文档

A2A 规范

Koog A2A 示例

本博文英文原作者:

Andrey Bragin

Andrey Bragin

image description

Discover more