Ai logo

JetBrains AI

Supercharge your tools with AI-powered features inside many JetBrains products

News Releases

Koog × A2A: Kotlin으로 구현하는 상호 연동되는 AI 에이전트

Read this post in other languages:

여러 AI 에이전트로 구성된 시스템을 구축해 보았다면, 아마도 문제를 겪어 보셨을 겁니다. 처음에는 꽤 단순하게 시작합니다. 한 에이전트는 블로그 게시물을 작성하고, 다른 에이전트는 이를 교정하며, 아마도 세 번째 에이전트는 이미지를 제안하거나 생성하도록 했다고 가정해 보겠습니다. 개별적으로 보면, 각 에이전트는 자기 일을 효과적으로 해냅니다. 하지만 함께라면 어떨까요? 여기서 문제가 생기기 시작합니다.

각 에이전트는 자신의 ‘언어’를 사용합니다. 한 에이전트는 상이한 API 인터페이스를 사용하고, 다른 에이전트는 고유한 메시지 형식을 가지고 있으며, 이 모두에 특정한 인증 요건이 있을 수 있습니다. 따라서 이들이 서로 소통하게 하려면 각 연결마다 그에 맞는 통합 코드를 작성해야 합니다. 하지만 가교를 만드는 일에 묶여 정작 에이전트를 더 지능적이고 빠르며 유용하게 만드는 데는 소홀하게 됩니다.

A2A의 역할: 에이전트 간 커뮤니케이션 계층

여기서 Agent2Agent(A2A) 프로토콜의 효용성이 발휘됩니다.

AI 에코시스템을 위한 범용 번역사 역할을 하는 A2A를 이용하면 에이전트가 표준화된 프로토콜을 통해 직접 소통할 수 있습니다. 블로그 작성 에이전트는 콘텐츠를 교정 에이전트에게 매끄럽게 전달하고, 이 과정에서 이미지 생성 에이전트가 트리거됩니다. 교정 에이전트는 수정 사항을 되돌려 보내며, 이미지 생성 에이전트는 스타일 관련 세부 지침을 요청합니다. 즉, 하나의 통합된 커뮤니케이션 계층을 통해 모든 작업이 조정됩니다.

A2A는 수십 개의 단일 연결을 관리하는 대신 다음을 제공합니다.

  • 플러그앤플레이 연결: 에이전트가 서로를 자동으로 검색하고 연결
  • 표준화된 메시징: 단일 형식, 명확한 프로토콜 및 변환 필요성 해소
  • 오케스트레이션 내장: 워크플로만 정의하면 A2A가 알아서 조율
  • 복잡성 없이 확장: 기존 연결을 수정하지 않고 에이전트를 추가하거나 재사용

결과는 어떨까요? 커뮤니케이션을 디버그하는 대신 에이전트의 기능을 개선하는 데 시간을 할애할 수 있습니다. 무엇보다 원하는 어떤 언어나 프레임워크로든 에이전트를 구현할 수 있습니다. JVM 사용자의 경우, Koog가 가장 많은 선택을 받으며 버전 0.5.0 기준으로 A2A 에코시스템과 원활하게 통합됩니다.

Koog의 역할: 내부 오케스트레이션 엔진

Koog는 JVM, Android, iOS, WebAssembly 및 브라우저 내 애플리케이션을 타깃으로 AI 에이전트를 구축하기 위한 Kotlin 기반 프레임워크입니다. 장점은 다음과 같습니다.

  • 복잡한 워크플로 관리: 루프, 브랜치, 폴백 및 병렬 브랜치 실행의 도움을 받아 그래프 기반의 전략을 설계합니다.
  • 바로 사용할 수 있는 구성 요소: LLM과 외부 도구를 호출하고, 메시지 내역을 요약하며 전체 전략을 실행하는 데 내장 노드를 활용할 수 있습니다.
  • 도구 오케스트레이션: 코드의 어떤 함수든 AI 에이전트가 사용할 수 있는 도구로 전환합니다. 순차적으로, 또는 동시적으로도 가능합니다.
  • 네이티브 MCP 통합: Kotlin MCP SDK를 사용하는 모든 MCP 서버에 원활하게 연결합니다.
  • 메모리 및 스토리지 지원: 효율적인 컨텍스트 관리와 함께 에이전트 메모리와 RAG(Retrieval-Augmented Generation) 워크플로를 기본적으로 지원합니다.
  • 내결함성: 재시도, 검사점 실행, 복구 메커니즘 및 상태 지속성이 기본 제공되어 안정적인 실행을 보장합니다.
  • 관찰 가능성: 전체 에이전트 이벤트 처리, 로깅을 제공하며, Langfuse 및 W&B Weave와의 기본 통합을 통해 OpenTelemetry를 지원합니다.

간단히 말해서, Koog는 AI 에이전트를 안정적으로 구축하기에 매우 좋습니다.

Koog를 A2A와 함께 사용하는 이유

Koog와 A2A는 AI 에이전트 스택의 서로 다른 계층을 처리합니다. 따라서 함께 사용했을 때 서로를 보완하고 빈틈을 채웁니다.

Koog는 실제 엔터프라이즈 사용에 필요한 AI 오케스트레이션에서 가장 어려운 부분을 해결합니다.

A2A가 채워주는 부족한 부분: Koog 에이전트가 에코시스템의 다른 A2A 호환 에이전트와 소통할 수 있게 해줍니다. 각 외부 서비스에 대해 맞춤화된 통합을 구축하는 대신 Koog AI 워크플로는 다른 에이전트를 자동으로 찾아 사용할 수 있습니다.

결과는 완벽한 결합: Koog의 고급 워크플로는 모든 에이전트가 요청할 수 있는 A2A 작업이 되고, Koog 에이전트는 A2A 에코시스템을 완벽하게 활용할 수 있습니다. 또한 Koog는 백엔드, 기기 내 및 브라우저 내 환경에서 실행되므로 서로 연결된 AI를 이전보다 더 폭넓고 효과적으로 전달할 수 있습니다.

이것이 어떻게 가능할까요? 함께 살펴보겠습니다!

A2A 프로토콜

A2A 프로토콜은 에이전트 간 커뮤니케이션의 필수 구성 요소를 정의합니다.

  • 표준화된 에이전트 카드(기능을 설명하는 JSON 문서)를 통한 에이전트 검색
  • 일관된 스키마를 가진 요청 및 응답 메시지 형식
  • 제출 → 작업 → 완료/실패의 명확한 상태를 통한 작업 수명 주기 관리
  • JSON-RPC, gRPC, REST 등의 전송 계층
  • 표준 OAuth2, API 키 및 JWT 토큰을 이용한 보안 체계
  • 표준화된 오류 코드를 이용한 오류 처리

에이전트 카드: 디지털 명함

A2A 에코시스템의 모든 에이전트는 ‘에이전트 카드’로 자신의 기능을 게시합니다. 에이전트 카드는 에이전트 도메인의 일부 URL(예: /.well-known/agent-card.json)에서 호스팅되는 표준화된 JSON 파일입니다. 디지털 명함 역할을 하는 이 에이전트 카드를 통해 다른 에이전트가 해당 에이전트의 역할을 알 수 있습니다.

에이전트 카드에는 일반적으로 다음이 포함됩니다.

  • 기본 정보: 에이전트 이름, 설명, 버전 등
  • 능력: 에이전트가 제공할 수 있는 서비스(예: 문서 초안 작성, 텍스트 교정, 데이터 분석, 이미지 생성)
  • 엔드포인트: 에이전트에 연결하는 방법 
  • 기타 선택적 정보: 사용 가능한 기능, 인증 등

이러한 검색 메커니즘이 있으므로 직접 통합하는 작업이 필요하지 않습니다. 어떤 특정한 스킬이 필요한 경우, 에이전트는 간단히 관련이 있는 에이전트 카드를 찾아 이 서비스와 상호 작용하는 방법을 파악합니다.

Koog에서 에이전트 카드는 Kotlin 데이터 클래스를 이용해 정의됩니다.

val agentCard = AgentCard(
    name = "Blog Writer",
    description = "AI agent that creates high-quality blog posts and articles",
    url = "https://api.blog-writer.com/a2a/v1",
    version = "1.0.0",
    capabilities = AgentCapabilities(streaming = true),
    defaultInputModes = listOf("text/plain"),
    defaultOutputModes = listOf("text/markdown"),
    skills = listOf(
        AgentSkill(
            id = "write-post",
            name = "Blog Post Writing",
            description = "Generate engaging blog posts on any topic",
            tags = listOf("writing", "content", "blog"),
            examples = listOf("Write a post about AI trends")
        )
    )
)

범용 메시징: 하나의 간단한 패턴

A2A는 에이전트와의 모든 커뮤니케이션에 하나의 표준화된 메시지 형식을 사용합니다. 이러한 단순성은 매우 강력한데, 에이전트는 수십 개의 API를 알 필요 없이 하나의 커뮤니케이션 패턴만 이해하면 되기 때문입니다.

각각의 상호 작용은 동일한 흐름을 따릅니다.

  1. 작업 요청과 매개변수를 포함한 메시지 보내기
  2. 즉각적인 결과 또는 추적할 수 있는 작업 수신
  3. 긴 작업의 경우 실시간 채널을 통해 업데이트 수신

이러한 범용적 접근 방식을 따르면 새로운 에이전트 기능을 추가해도 커뮤니케이션 프로토콜을 변경할 필요가 없습니다. 에이전트에게 텍스트를 요약하거나 복잡한 보고서를 생성하도록 요청하는 모든 경우에 메시지 구조는 일관되게 유지됩니다.

Koog에서는 이미 구현된 객체와 프로토콜을 이용하므로 메시지를 생성하고 보내는 과정이 단순합니다.

val message = Message(
    role = Role.User,
    parts = listOf(
        TextPart("Write a blog post about the future of AI agents")
    ),
    contextId = "blog-project-456"
)

val request = Request(
    data = MessageSendParams(
        message = message,
        configuration = MessageConfiguration(
            blocking = false, // 첫 번째 응답 가져오기
            historyLength = 5 // 컨텍스트 포함
        )
    )
)

val response = client.sendMessage(request)

다양한 Part 타입을 통해 메시지 형식에 풍부한 콘텐츠가 지원됩니다. 예를 들어, 일반 텍스트의 경우 TextPart, 첨부 파일의 경우 FilePart, 구조화된 JSON 데이터의 경우 DataPart가 이용됩니다.

이 통합적 구조에 기반하여 Koog 에이전트는 텍스트 처리, 파일 분석 또는 복잡한 데이터 변환 등 모든 작업에서 A2A 호환 에이전트와 원활하게 소통할 수 있습니다.

작업 수명 주기: 스마트 워크플로

A2A는 복잡도와 지속 기간에 따라 다양한 유형의 작업을 지능적으로 관리합니다.

즉각적인 메시지: 텍스트 서식 지정이나 빠른 계산과 같은 단순한 작업의 경우, AI 응답에 바로 결과가 반환됩니다. 기다리거나 추적할 필요가 없습니다.

장기 실행 작업: 문서 분석이나 다단계 워크플로와 같이 복잡한 작업은 일정 계획에 따라 결과가 반환됩니다. 그러면 요청한 에이전트가 진행 상황을 모니터링하고 작업 결과가 준비되면 가져옵니다.

실시간 업데이트: 시간이 많이 소요되는 작업의 경우, Server-Sent Events(SSE)가 실시간으로 진행 상황을 업데이트합니다. 따라서 계속해서 폴링하지 않아도 에이전트가 정보를 지속적으로 수신할 수 있습니다.

class BlogWriterExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext,
        eventProcessor: SessionEventProcessor
    ) {
        val task = Task(
            contextId = context.contextId,
            status = TaskStatus(
                state = TaskState.Submitted,
                message = Message(
                    role = Role.Agent,
                    parts = listOf(TextPart("Blog writing request received")),
                    contextId = context.contextId,
    			taskId = context.taskId,
                )
            )
        )

        eventProcessor.sendTaskEvent(task)
	 ...
    }
}

보안 기능 내장: 업계 표준만 이용

A2A는 보안을 자체적으로 만들지 않습니다. 대신 OAuth2, API 키, 표준 HTTPS 등 널리 수용되고 입증된 표준을 이용합니다.

따라서 개발자는 새로운 인증 체계에 대해 신경 쓸 필요가 없습니다. 현대적 웹 API 보안을 알고 있다면 A2A 보안도 알고 있는 것입니다. 이러한 기존 표준이 제공하는 모든 도구, 모범 사례 및 보안 감사가 시스템에 그대로 적용됩니다.

"securitySchemes": {
   "google": {
       "openIdConnectUrl": "https://accounts.google.com/.well-known/openid-configuration",
       "type": "openIdConnect"
   }
}
class AuthorizedA2AServer(
    agentExecutor: AgentExecutor,
    agentCard: AgentCard,
    agentCardExtended: AgentCard? = null,
    taskStorage: TaskStorage = InMemoryTaskStorage(),
    messageStorage: MessageStorage = InMemoryMessageStorage(),
    private val authService: AuthService, // 인증을 담당하는 서비스
) : A2AServer(
    agentExecutor = agentExecutor,
    agentCard = agentCard,
    agentCardExtended = agentCardExtended,
    taskStorage = taskStorage,
    messageStorage = messageStorage,
) {

    private suspend fun authenticateAndAuthorize(
        ctx: ServerCallContext,
        requiredPermission: String
    ): AuthenticatedUser {
        val token = ctx.headers["Authorization"]?.firstOrNull()
            ?: throw A2AInvalidParamsException("Missing Authorization token")

        val user = authService.authenticate(token)
            ?: throw A2AInvalidParamsException("Invalid Authorization token")

        if (requiredPermission !in user.permissions) {
            throw A2AUnsupportedOperationException("Insufficient permissions")
        }

        return user
    }

   override suspend fun onSendMessage(
        request: Request,
        ctx: ServerCallContext
    ): Response {
        val user = authenticateAndAuthorize(ctx, requiredPermission = "send_message")

        // 컨텍스트 상태를 통해 에이전트 실행자에게 사용자 데이터 전달
        val enrichedCtx = ctx.copy(
            state = ctx.state + (AuthStateKeys.USER to user)
        )

        // 풍부한 컨텍스트를 사용하여 상위 구현에 위임
        return super.onSendMessage(request, enrichedCtx)
    }

   // 나머지 래핑된 A2A 메서드 
   // ...
}

Koog 에이전트를 A2A와 통합하는 방법

Koog 프레임워크에는 A2A 클라이언트와 서버가 기본적으로 내장되어 있습니다. 즉, Koog 에이전트는 다른 A2A 지원 에이전트와 원활하게 소통하면서 외부에서도 자신을 검색할 수 있도록 합니다. 이를 구현하는 방법을 보여주는 간단한 예를 들어보겠습니다.

Koog 에이전트를 A2A 서버에 래핑하는 방법

우선, 에이전트에 대한 전략을 정의합니다. Koog는 Koog와 A2A 사이의 원활한 변환을 위한 편리한 변환기(toKoogMessage, toA2AMessage)를 제공하므로 직접 직렬화할 필요가 없습니다. nodeA2ASendMessage와 같은 특수 노드가 메시지 교환 과정을 처리해 주므로 커뮤니케이션 워크플로의 구현이 간단합니다.

fun blogpostWritingStrategy() = strategy("blogpost-writer-strategy") {
    val blogpostRequest by node { input ->
        val userMessage = input.toKoogMessage().content

        llm.writeSession {
            user {
                +"Write a blogpost based on the user request"
                +xml {
                    tag("user_request") {
                        +userMessage
                    }
                }
            }

            requestLLM().toA2AMessage()
        }
    }

    val sendMessage by nodeA2ARespondMessage()

    nodeStart then blogpostRequest then sendMessage the nodeFinish
}

두 번째, 에이전트 자체를 정의합니다. A2AServer 기능을 설치하면 에코시스템의 다른 에이전트가 해당 에이전트를 검색하고 액세스할 수 있게 되어 다수의 특수 에이전트가 원활하게 협력하는 정교한 네트워크가 형성됩니다.

fun createBlogpostWritingAgent(
    requestContext: RequestContext,
    eventProcessor: SessionEventProcessor
): AIAgent {
     // 현재 대화 컨텍스트에 대한 기존 메시지 가져오기
     val messageHistory = requestContext.messageStorage.getAll().map { it.toKoogMessage() }

     val agentConfig = AIAgentConfig(
        prompt = prompt("blogpost") {
            system("You are a blogpost writing agent")

            messages(messageHistory)
        },
        model = GoogleModels.Gemini2_5Flash,
        maxAgentIterations = 5
    )

    return agent = AIAgent(
        promptExecutor = MultiLLMPromptExecutor(
            LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")),
        ),
        strategy = blogpostWritingStrategy(),
        agentConfig = agentConfig
    ) {
        install(A2AAgentServer) {
            this.context = requestContext
            this.eventProcessor = eventProcessor
        }

        handleEvents {
            onAgentFinished { ctx ->
                // 에이전트의 응답으로 현재 대화 컨텍스트 업데이트
                val resultMessge = ctx.result as A2AMessage
                requestContext.messageStorage.save(resultMessge)
            }
        }
    }
}

세 번째, 에이전트를 실행자에 래핑한 다음 서버를 정의해야 합니다.

class BlogpostAgentExecutor : AgentExecutor {
    override suspend fun execute(
        context: RequestContext,
        eventProcessor: SessionEventProcessor
    ) {
        createBlogpostWritingAgent(context, eventProcessor)
            .run(context.params.message)
    }
}

val a2aServer = A2AServer(
    agentExecutor = BlogpostAgentExecutor(),
    agentCard = agentCard,
)

마지막으로, 서버 전송을 정의하고 서버를 실행합니다.

val transport = HttpJSONRPCServerTransport(
    requestHandler = a2aServer
)

transport.start(
    engineFactory = Netty,
    port = 8080,
    path = "/a2a",
    wait = true,
    agentCard = agentCard,
    agentCardPath = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH
)

이제 에이전트가 요청을 처리할 수 있습니다! 

Koog 에이전트에서 다른 A2A 지원 에이전트를 호출하는 방법

먼저, A2A 클라이언트를 구성하고 이를 연결하여 에이전트 카드를 가져와야 합니다.

val agentUrl = "https://example.com"

val cardResolver = UrlAgentCardResolver(
    baseUrl = agentUrl,
    path = A2AConsts.AGENT_CARD_WELL_KNOWN_PATH,
)

val transport = HttpJSONRPCClientTransport(
    url = agentUrl,
)

val a2aClient = A2AClient(
    transport = transport,
    agentCardResolver = cardResolver
)

// 클라이언트를 초기화하고 카드를 가져옵니다
a2aClient.connect()

그런 다음, 해당 전략에서 nodeA2ASendMessage 또는 nodeA2ASendMessageStreaming을 사용하여 이러한 클라이언트를 호출하고 메시지 또는 작업 응답을 가져올 수 있습니다.

val agentId = "agent_id"
val agent = AIAgent(
    promptExecutor = MultiLLMPromptExecutor(
        LLMProvider.Google to GoogleLLMClient(System.getenv("GOOGLE_API_KEY")),
    ),
    strategy = strategy("a2a") {
        val nodePrepareRequest by node<String, A2AClientRequest> { input ->
            A2AClientRequest(
                agentId = agentId,
                callContext = ClientCallContext.Default,
                params = MessageSendParams(
                    message = A2AMessage(
                        messageId = Uuid.random().toString(),
                        role = Role.User,
                        parts = listOf(
                            TextPart(input)
                        )
                    )
                )
            )
        }
        val nodeA2A by nodeA2AClientSendMessage(agentId)
        
        val nodeProcessResponse by node {
            // 프로세스 이벤트
            when (it) {
                is A2AMessage -> it.parts
                    .filterIsInstance()
                    .joinToString(separator = "n") { it.text }
                
                is Task -> it.artifacts
                    .orEmpty()
                    .flatMap { it.parts }
                    .filterIsInstance()
                    .joinToString(separator = "n") { it.text }
            }
        }

        nodeStart then nodePrepareRequest then nodeA2A then nodeProcessResponse then nodeFinish

    },
    agentConfig = agentConfig
) {
   install(A2AAgentClient) {
        this.a2aClients = mapOf(agentId to client)
    }
}

agent.run("Write blog post about A2A and Koog integration")

다음 단계

Koog와 A2A에 대해 자세히 알아보려면 다음 유용한 자료를 살펴보세요.

Koog 문서

A2A 사양

Koog A2A 예시

게시물 원문 작성자

Andrey Bragin

Andrey Bragin

image description

Discover more