Ai logo

JetBrains AI

Supercharge your tools with AI-powered features inside many JetBrains products

News Releases

Koog × A2A: Kotlin によるコネクテッド AI エージェントの構築

Read this post in other languages:

複数の AI エージェントを含むシステムを構築したことがある方は、「あの問題」に遭遇したことがあるかと思います。 最初は単純なことから着手し、1 つのエージェントでブログ記事を書き、別のエージェントで校正し、3 つ目のエージェントでは画像の提案や生成を行うかもしれません。 これらのエージェントは個別には効果的ですが、 連携させた途端に 動作が破綻し始めるかもしれません。

エージェントが話す「言語」はそれぞれに異なります。あるエージェントは異なる API インターフェースを使用し、別のエージェントには独自のメッセージ形式があります。また、どちらも特定の認証要件が伴う可能性もあります。 これらのエージェントを相互に通信させるには、接続のたびにカスタムの統合コードを書かなければなりません。 エージェントをよりスマート、高速、かつ利便性の高いものにする作業に集中できず、エージェント間を橋渡しすることに時間を取られてしまいます。

A2A の役割: クロスエージェント通信レイヤー

そこで Agent2Agent(A2A)プロトコルの出番です。

A2A を使用すると、エージェント同士が標準的なプロトコルで直接通信できます。AI エコシステムの汎用翻訳機として機能するのです。 ブログ執筆エージェントがコンテンツをシームレスに校正エージェントに渡し、校正エージェントが画像生成エージェントを呼び出します。それと同時に校正エージェントが訂正内容を返し、画像生成エージェントがスタイルの明確化をリクエストします。 このプロセス全体が 1 つの統一された通信レイヤーを通じて調整されます。

多数のポイントツーポイント接続を管理しなくても、A2A は以下を実現します。

  • プラグアンドプレイ方式の接続: エージェント同士が相互に検出と接続を自動的に行います。
  • 標準化されたメッセージング: 統一された形式と明確なプロトコルを採用しているため、言語変換に悩まされることはありません。
  • 組み込みのオーケストレーション: ワークフローを一度定義すれば、A2A に調整を任せることができます。
  • 手間いらずの拡張: 既存の接続を書き直さなくてもエージェントの追加や再利用が可能です。

その結果、 開発者はエージェント間の対話をデバッグするのではなく、エージェントの機能を改善することに時間を活用できるようになります。 しかも、任意の言語やフレームワークを使用してエージェントを実装できるという最大のメリットがあります。 Koog は JVM ユーザーに最適な選択肢で、0.5.0 バージョンの時点で A2A エコシステムとのシームレスな統合を実現しています。

Koog の役割: 内部オーケストレーションエンジン

Koog は JVM、Android、iOS、WebAssembly、およびブラウザー内アプリケーションをターゲットとする AI エージェントを構築するための Kotlin ベースのフレームワークです。 以下のような特長があります。

  • 複雑なワークフローの管理: ループ、分岐、フォールバック、および分岐の並列実行をサポートするグラフベースの戦略を設計します。
  • 既製のコンポーネント: LLM と外部ツールの呼び出し、メッセージ履歴の要約、および戦略全体の実行を担う組み込みのノードを活用できます。
  • ツールのオーケストレーション: コード内のあらゆる関数を AI エージェントが逐次的または並行的に使用できるツールに変換します。
  • ネイティブの MCP 統合: Kotlin MCP SDK を使用して任意の MCP サーバーにシームレスに接続します。
  • メモリとストレージのサポート: 効率的なコンテキスト管理機能を備えたエージェントメモリと RAG(検索拡張生成)ワークフローを組み込みでサポートしています。
  • 耐障害性: リトライ機能、チェックポイント機能、復旧メカニズム、および状態の永続性が組み込まれているため、信頼性の高い実行が保証されています。
  • オブザーバビリティ: 組み込みの Langfuse と W&B Weave との統合により、包括的なエージェントイベントの処理、ログ記録、および OpenTelemetry のサポートを提供します。

つまり、Koog は信頼性の高い AI エージェントの構築に最適です。

Koog と A2A を組み合わせる理由

Koog と A2A は AI エージェントスタックのさまざまなレイヤーに対応しています。 これらは組み合わせて使用することにより、相いに補完しながらギャップを埋めることができます。

実務での使用に必要な AI オーケストレーションの最も困難な部分は、Koog がすでに対応しています。

A2A は不足部分を補い、Koog エージェントがエコシステム内の他の A2A 対応エージェントと通信できるようにします。 そのため、個々の外部サービス用のカスタム統合を構築しなくても、Koog の AI ワークフローでは他のエージェントを自動的に検出して使用できます。

完璧な相性が成立します。Koog の高度なワークフローは任意のエージェントがリクエストを送信可能な A2A タスクとなり、Koog エージェントは A2A エコシステムを最大限に活用できます。 また、Koog はバックエンド、デバイス上、ブラウザー内の環境で動作するため、相互に接続された AI をこれまで以上に幅広く効果的に提供することができます。

これらはどのように実現しているのでしょうか? それでは確認していきましょう!

A2A プロトコル

A2A プロトコルは、以下のようなエージェント間通信用の基本的な構成要素を定義します。

  • 標準化されたエージェントカード(機能を記述した JSON ドキュメント)を介したエージェントの検出
  • 整合性のあるスキーマを使用したリクエストおよび応答のメッセージ形式
  • 「送信済み」→「処理中」→「完了/失敗」という明確な状態によるタスクのライフサイクル管理。
  • JSON-RPC、gRPC、REST などのトランスポートレイヤー
  • 標準の OAuth2、API キー、JWT トークンを使用したセキュリティスキーム
  • 標準化されたエラーコードを使用したエラー処理

エージェントカード: デジタル名刺

A2A エコシステムのすべてのエージェントは、エージェントのドメイン上にある URL(/.well-known/agent-card.json など)でホストされた「エージェントカード」という標準化された JSON ファイルを介して自身の機能を公開します。 エージェントカードはデジタル名刺のような役割を果たし、あるエージェントが提供するサービスを他のエージェントが検出できるようにします。

エージェントカードには通常、以下の内容が含まれます。

  • 基本情報: エージェント名、説明、バージョンなど。
  • スキル: エージェントが実行できる内容(ドキュメントの下書き作成、テキスト校正、データ解析、画像生成など)。
  • エンドポイント: エージェントへのアクセス方法。
  • その他任意の情報: 使用可能な機能や認証など。

この検出メカニズムがあるため、手動での統合作業が不要となっています。 あるエージェントが特定のスキルを必要としている場合、そのエージェントは関連するエージェントカードを確認するだけで、そのサービスとの対話方法を理解します。

Koog の場合、エージェントカードは Kotlin のデータクラスを使用して定義されています。

val agentCard = AgentCard(
    name = "Blog Writer",
    description = "高品質なブログの投稿と記事を作成するAIエージェント",
    url = "https://api.blog-writer.com/a2a/v1",
    version = "1.0.0",
    capabilities = AgentCapabilities(streaming = true),
    defaultInputModes = listOf("text/plain"),
    defaultOutputModes = listOf("text/markdown"),
    skills = listOf(
        AgentSkill(
            id = "write-post",
            name = "Blog Post Writing",
            description = "任意のトピックに関する魅力的なブログの投稿を生成します",
            tags = listOf("writing", "content", "blog"),
            examples = listOf("AIの動向に関する投稿を作成します")
        )
    )
)

汎用的なメッセージング: 1 つの単純なパターン

A2A は単一の標準化されたメッセージ形式をすべてのエージェント間通信に使用します。 この単純さは非常に効果的です。エージェントは多種多様な API を学習する代わりに、1 つの通信パターンのみを理解すればよいためです。

すべての対話は 1 つのフローに従います。

  1. タスクのリクエストとパラメーターを組み合わせてメッセージを送信します。
  2. 即時の結果か、追跡用のタスクを受信します。
  3. 長期的な処理では、リアルタイムのチャネルを通じて最新情報を取得します。

このような汎用的な方法を取っているため、新しいエージェントの機能を追加する際に通信プロトコルを変更する必要はありません。 エージェントに依頼するのがテキストの要約であれ、複雑なレポートの生成であれ、メッセージの構造は同じです。

Koog では、実装済みのオブジェクトとプロトコルを使用してメッセージの作成と送信を簡単に行えます。

val message = Message(
    role = Role.User,
    parts = listOf(
        TextPart("AIエージェントの未来に関するブログ投稿を作成します")
    ),
    contextId = "blog-project-456"
)

val request = Request(
    data = MessageSendParams(
        message = message,
        configuration = MessageConfiguration(
            blocking = false, // 最初の応答を取得
            historyLength = 5 // コンテキストを含める
        )
    )
)

val response = client.sendMessage(request)

メッセージの形式では、さまざまな Part 型を介して豊富なコンテンツがサポートされています。プレーンテキストのコンテンツの場合は TextPart、ファイルの添付には FilePart、構造化された JSON データの場合は DataPart などを使用できます。

このように構造が統一されているため、Koog エージェントはテキストの処理、ファイルの解析、複雑なデータの変換を問わず、あらゆる A2A 対応エージェントとシームレスに通信することができます。

タスクのライフサイクル: スマートなワークフロー

A2A は、さまざまな種類の処理を複雑度と所要時間に応じてインテリジェントに管理します。

即時のメッセージ: テキストの整形や簡単な計算などの単純な処理は、AI の応答で直接結果を返します。 待機したり追跡したリする必要はありません。

実行時間の長いタスク: ドキュメント解析や多段階ワークフローなどの複雑な処理はスケジュールされ、タスクを返します。 リクエストを送信したエージェントは進捗を監視し、タスクの結果を準備が完了した時点で取得することができます。

リアルタイム更新: 時間を要する処理の場合、サーバー送信イベント(SSE)によって最新の進捗情報がリアルタイムに提供されます。 そのため、エージェントは定期的にポーリングしなくても情報を取得できます。

class BlogWriterExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext,
        eventProcessor: SessionEventProcessor
    ) {
        val task = Task(
            contextId = context.contextId,
            status = TaskStatus(
                state = TaskState.Submitted,
                message = Message(
                    role = Role.Agent,
                    parts = listOf(TextPart("ブログの作成リクエストを受信しました")),
                    contextId = context.contextId,
    			taskId = context.taskId,
                )
            )
        )

        eventProcessor.sendTaskEvent(task)
	 ...
    }
}

組み込みのセキュリティ機能: 業界標準のみ

A2A は新たなセキュリティ機能をゼロから開発してはいません。 代わりに、実績があり、広く採用されている OAuth2、API キー、標準 HTTPS などの標準を利用しています。

このような方法を採用することで、開発者が新しい認証スキームを学習する必要性をなくしています。 モダンなウェブ API のセキュリティを理解している方は、すでに A2A のセキュリティを理解していると言えます。 このシステムは、これらの確立された標準が提供するすべてのツール、ベストプラクティス、セキュリティ監査を継承しています。

"securitySchemes": {
   "google": {
       "openIdConnectUrl": "https://accounts.google.com/.well-known/openid-configuration",
       "type": "openIdConnect"
   }
}
class AuthorizedA2AServer(
    agentExecutor: AgentExecutor,
    agentCard: AgentCard,
    agentCardExtended: AgentCard? = null,
    taskStorage: TaskStorage = InMemoryTaskStorage(),
    messageStorage: MessageStorage = InMemoryMessageStorage(),
    private val authService: AuthService, // 認証を担うサービス
) : A2AServer(
    agentExecutor = agentExecutor,
    agentCard = agentCard,
    agentCardExtended = agentCardExtended,
    taskStorage = taskStorage,
    messageStorage = messageStorage,
) {

    private suspend fun authenticateAndAuthorize(
        ctx: ServerCallContext,
        requiredPermission: String
    ): AuthenticatedUser {
        val token = ctx.headers["Authorization"]?.firstOrNull()
            ?: throw A2AInvalidParamsException("認可トークンがありません")

        val user = authService.authenticate(token)
            ?: throw A2AInvalidParamsException("認可トークンが無効です")

        if (requiredPermission !in user.permissions) {
            throw A2AUnsupportedOperationException("権限がありません")
        }

        return user
    }

   override suspend fun onSendMessage(
        request: Request,
        ctx: ServerCallContext
    ): Response {
        val user = authenticateAndAuthorize(ctx, requiredPermission = "send_message")

        // ユーザーデータをコンテキストの状態を介してエージェント実行処理に渡す
        val enrichedCtx = ctx.copy(
            state = ctx.state + (AuthStateKeys.USER to user)
        )

        // 強化されたコンテキストを使用して親の実装に委譲する
        return super.onSendMessage(request, enrichedCtx)
    }

   // A2A メソッドの残り 
   // ...
}

Koog エージェントと A2A の統合方法

Koog フレームワークには、A2A クライアントおよびサーバーの両方が組み込まれています。 そのため、Koog エージェントは他の A2A 対応エージェントとシームレスに対話可能であると同時に、外部での検出が可能になっています。 これを実現する方法を簡単に示す例を以下に掲載しています。

Koog エージェントを A2A サーバーにラップする方法

まず、エージェントの戦略を定義します。 Koog には Koog と A2A のメッセージ形式をシームレスに相互変換できる便利な変換関数(toKoogMessagetoA2AMessage)が備わっているため、手動でシリアル化する必要はありません。 メッセージ交換プロセスの処理は nodeA2ASendMessage などの専用ノードが行うため、通信ワークフローを実装するのは簡単です。

fun blogpostWritingStrategy() = strategy<MessageSendParams, A2AMessage>("blogpost-writer-strategy") {
    val blogpostRequest by node<MessageSendParams, A2AMessage> { input ->
        val userMessage = input.toKoogMessage().content

        llm.writeSession {
            user {
                +"ユーザーのリクエストに応じてブログ投稿を作成します"
                +xml {
                    tag("user_request") {
                        +userMessage
                    }
                }
            }

            requestLLM().toA2AMessage()
        }
    }

    val sendMessage by nodeA2ARespondMessage()

    nodeStart then blogpostRequest then sendMessage the nodeFinish
}

次に、エージェント自体を定義します。 A2AServer 機能をインストールすると、エージェントが検出可能となり、エコシステムの他のエージェントからアクセスできるようになります。これにより、特定の用途に特化したエージェントがシームレスに連携する高度なネットワークの作成が可能になります。

fun createBlogpostWritingAgent(
    requestContext: RequestContext,
    eventProcessor: SessionEventProcessor
): AIAgent<MessageSendParams, A2AMessage> {
     // 現在の対話のコンテキスト用に既存メッセージを取得する
     val messageHistory = requestContext.messageStorage.getAll().map { it.toKoogMessage() }

     val agentConfig = AIAgentConfig(
        prompt = prompt("blogpost") {
            system("You are a blogpost writing agent")

            messages(messageHistory)
        },
        model = GoogleModels.Gemini2_5Flash,
        maxAgentIterations = 5
    )

    return agent = AIAgent<FullWeatherForecastRequest, FullWeatherForecast>(
        promptExecutor = MultiLLMPromptExecutor(
            LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")),
        ),
        strategy = blogpostWritingStrategy(),
        agentConfig = agentConfig
    ) {
        install(A2AAgentServer) {
            this.context = requestContext
            this.eventProcessor = eventProcessor
        }

        handleEvents {
            onAgentFinished { ctx ->
                // エージェントからの応答を使用して現在の対話のコンテキストを更新する
                val resultMessge = ctx.result as A2AMessage
                requestContext.messageStorage.save(resultMessge)
            }
        }
    }
}

その後、このエージェントを実行処理にラップしてからサーバーを定義する必要があります。

class BlogpostAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext,
        eventProcessor: SessionEventProcessor
    ) {
        createBlogpostWritingAgent(context, eventProcessor)
            .run(context.params.message)
    }
}

val a2aServer = A2AServer(
    agentExecutor = BlogpostAgentExecutor(),
    agentCard = agentCard,
)

最後に、サーバートランスポートを定義してサーバーを実行します。

val transport = HttpJSONRPCServerTransport(
    requestHandler = a2aServer
)

transport.start(
    engineFactory = Netty,
    port = 8080,
    path = "/a2a",
    wait = true,
    agentCard = agentCard,
    agentCardPath = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH
)

エージェントがリクエストを処理できるようになりました!

Koog エージェントから他の A2A 対応エージェントを呼び出す方法

まずは A2A クライアントを構成し、そのクライアントを接続してエージェントカードを取得する必要があります。

val agentUrl = "https://example.com"

val cardResolver = UrlAgentCardResolver(
    baseUrl = agentUrl,
    path = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH,
)

val transport = HttpJSONRPCClientTransport(
    url = agentUrl,
)

val a2aClient = A2AClient(
    transport = transport,
    agentCardResolver = cardResolver
)

// クライアントを初期化してカードを取得する
a2aClient.connect()

その後、strategy で nodeA2ASendMessage または nodeA2ASendMessageStreaming を使用し、これらのクライアントを呼びしてメッセージやタスクの応答を受け取ります。

val agentId = "agent_id"
val agent = AIAgent<String, String>(
    promptExecutor = MultiLLMPromptExecutor(
        LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")),
    ),
    strategy = strategy<String, String>("a2a") {
        val nodePrepareRequest by node<String, A2AClientRequest> { input ->
            A2AClientRequest(
                agentId = agentId,
                callContext = ClientCallContext.Default,
                params = MessageSendParams(
                    message = A2AMessage(
                        messageId = Uuid.random().toString(),
                        role = Role.User,
                        parts = listOf(
                            TextPart(input)
                        )
                    )
                )
            )
        }
        val nodeA2A by nodeA2AClientSendMessage(agentId)
        
        val nodeProcessResponse by node<CommunicationEvent, String> {
            // Process event
            when (it) {
                is A2AMessage -> it.parts
                    .filterIsInstance()
                    .joinToString(separator = "\n") { it.text }
                
                is Task -> it.artifacts
                    .orEmpty()
                    .flatMap { it.parts }
                    .filterIsInstance()
                    .joinToString(separator = "\n") { it.text }
            }
        }

        nodeStart then nodePrepareRequest then nodeA2A then nodeProcessResponse then nodeFinish

    },
    agentConfig = agentConfig
) {
   install(A2AAgentClient) {
        this.a2aClients = mapOf(agentId to client)
    }
}

agent.run("A2A と Koog の統合に関するブログ投稿を作成します")

次のステップ

Koog と A2A について深掘りする場合は、以下の参考資料をご覧ください。

Koog のドキュメント

A2A の仕様

Koog A2A の例

オリジナル(英語)ブログ投稿記事の作者:

Andrey Bragin

Andrey Bragin

image description

Discover more