Ecosystem

Kotlin コルーチン 1.5: GlobalScope がデリケート扱いに、チャンネル API の改善など

Read this post in other languages:

Kotlin コルーチン 1.5.0 がリリースされました! 新しいバージョンには以下の内容が追加されています。

  • Channel API の改善。ライブラリ関数の名前付けスキームが新しくなったのと同時に、非サスペンド関数の trySendtryReceiveofferpoll のより優れた代替関数として導入されました。
  • Reactive 統合の安定化。Reactive Streams と Korlin Flow 間での変換に使用できる関数をさらに追加し、多数の既存の関数と ReactiveContext API を安定させました。

このブログ記事では、新しいバージョンへの移行に関する推奨事項も記載しています。

コルーチン 1.5.0 を使い始める

GlobalScope をデリケート API 扱いに

GlobalScope クラスに @DelicateCoroutinesApi アノテーションが付けられました。 今後 GlobalScope を使用する際は、@OptIn(DelicateCoroutinesApi::class) による明示的なオプトインが必要となります。

GlobalScope の使用はほとんどの場合において推奨されませんが、公式ドキュメントでは引き続きデリケート(Delicate:取り扱いに注意が必要)な API を使用した概念として紹介しています。

グローバル CoroutineScope はどのジョブにもバインドされません。 グローバルスコープはアプリケーションの寿命を通じて動作する最上位のコルーチンを起動するには使用され、途中でキャンセルされません。 GlobalScope で起動されたアクティブなコルーチンは、プロセスを alive に維持するものではなく、 デーモンスレッドのように機能します。

GlobalScope は、使用すると誤ってリソースリークやメモリリークを発生させてしまう可能性があるため、取り扱いに注意な API です。 GlobalScope で起動したコルーチンは構造化された並行性の原則の対象外であるため、(ネットワークが遅いなどの)問題によってハングしたり遅延が発生したりすると、そのまま動作し、リソースを消費し続けてしまいます。 たとえば、次のコードを考察してみましょう。

loadConfiguration の呼び出しによって GlobalScope でコルーチンが作成され、それをキャンセルしたり完了を待機したりする規定もなく、バックグラウンドで動作しています。 ネットワークの速度が低下すると、コルーチンはバックグラウンドで待機し、リソースを消費し続けてしまいます。 loadConfiguration を繰り返して呼び出せば、消費されるリソースがさらに増加してしまいます。

考えられる代替手法

多くの場合、GlobalScope の使用は避けるべきであり、次の例のように、それを含む操作を suspend でマークしておく必要があります。

GlobalScope.launch を使用して複数の並行処理が起動されている場合には、対応する操作は代わりに coroutineScope でグループ化する必要があります。

最上位のコードでは、並行処理が非サスペンドのコンテキストから起動される際には、GlobalScope の代わりに、適切に限定された CoroutineScope のインスタンスを使用する必要があります。

正当なユースケース

GlobalScope が合理的かつ安全に使用される状況は限定的です。アプリケーションの寿命期間中にアクティブな状態を維持する必要のある最上位のバックグラウンドプロセスなどです。 そのため、GlobalScope を使用するには、以下のように @OptIn(DelicateCoroutinesApi::class) を使って明示的にオプトインする必要があります。

GlobalScope の使用箇所をよくレビューし、「正当なユースケース」に該当するもののみにアノテーションを付けることをお勧めします。 ほかの使用状況においては、コードのバグの原因になる可能性があります。そのような GlobalScope については前述の方法で書き換えてください。

JUnit 5 用の拡張機能

テストを別のスレッドで実行し、指定の時間制限を超えると失敗にしてスレッドを中断させることのできる CoroutinesTimeout アノテーションを追加しました。 以前の CoroutinesTimeout は JUnit 4 向けに提供されていましたが、 今回のリリースでは、JUnit 5 との統合を追加しています。

新しいアノテーションを使用するには、次の依存関係をプロジェクトに追加してください。

以下に、テストで新しい CoroutinesTimeout を使用する方法の例を簡単に示します。

この例では、コルーチンのタイムアウトはクラスレベルで firstTest を指定して定義されています。 関数のアノテーションがクラスレベルのアノテーションをオーバーライドするため、アノテーション付きのテストはタイムアウトしません。 secondTest はクラスレベルのアノテーションを使用しているため、タイムアウトします。

このアノテーションは、次のようにして宣言されています。

最初の testTimeoutMs パラメーターは、タイムアウト時間をミリ秒で指定しています。 2 つ目の cancelOnTimeout パラメーターは、タイムアウト時に実行中のすべてのコルーチンをキャンセルするかどうかを判定します。 true に設定されている場合、すべてのコルーチンは自動的にキャンセルされます。

CoroutinesTimeout アノテーションを使用する際は、コルーチンデバッガーが必ず自動的に有効になり、タイムアウト時のすべてのコルーチンをダンプします。 ダンプにはコルーチン作成のスタックトレースが含まれています。 テストを高速化するために作成のスタックトレースを無効にする必要がある場合は、この構成を行える CoroutinesTimeoutExtension を直接使用することを検討してください。

JUnit 5 用の CoroutinesTimeout に関する有益な PoC を作成してくれた Abhijit Sarkar に感謝いたします。 このアイデアを基に、1.5 リリースに追加した新しい CoroutinesTimeout アノテーションが作成されました。

Channel API の改善

Channel は重要なコミュニケーションプリミティブで、さまざまなコルーチンとコールバック間でデータを渡すことができます。 このリリースでは、Channel API の機能を少し改良し、困惑を招いていた offerpoll 関数をより優れた代替関数に置き換えました。 同時に、サスペンドメソッドと非サスペンドメソッド向けの一貫した新しい名前付けスキームも作成しました。

新しい名前付けスキーム

名前付けスキームに一貫性を持たせるように改善し、ほかのライブラリやコルーチン API でも使用できるようにしました。 関数の名前から振る舞いがわかるように作成しています。 そのため、新しいスキームでは次のような取り決めがなされています。

  • 通常のサスペンドメソッドについては、sendreceive などのようにそのままです。
  • エラーをカプセル化した非サスペンドメソッドについては、以前の offerpoll ではなく、trySendtryReceive のように接頭辞を「try」で一貫させています。
  • エラーをカプセル化する新しいサスペンドメソッドには、「Catching」という接尾辞を使用します。

これらの新しいメソッドについて詳しく見てみましょう。

Try 関数: send と receive に対応する非サスペンド関数

1 つのコルーチンはチャンネルに情報を送信し、もう 1 つのコルーチンはその情報を受信することができます。 sendreceive はともにサスペンド関数です。 send はチャンネルがいっぱいであり、新しい要素を受け入れられない場合に、そのコルーチンを一時的に中断し、receive は返す要素がチャンネルに存在しない場合に、コルーチンを一時的に中断します。

これらの関数には同期コードでこれらと対照して使用する、offerpoll という非サスペンドメソッドがあります。今回これらは非推奨となり、trySendtryReceive に置き換えられています。 このように変更された理由について説明しましょう。

offerpollsendreceive と同じことを実行しますが、一時的に中断させる機能はありません。 そう言えば、単純であり、要素が送信または受信される際にすべては正しく動作するように聞こえますが、 エラーが発生した場合には、どうなるのでしょうか。 sendreceive は、ジョブを実行できる状態になるまで一時的に中断しますが、 offerpoll については、チャンネルがいっぱいであるために要素が追加されない場合やチャンネルが空であるために取得できる要素がない場合には、それぞれ falsenull を返すことしかできません。 クローズされたチャンネルと通信しようとすれば例外をスローし、このことが混乱を招いていました。

この例では、poll は要素が追加される前に呼び出されているため、すぐに null を返しています。 これは想定される使用方法ではないことに注意してください。要素のポーリングを通常通り続けるところですが、この説明を単純化するために直接呼び出しています。 offer の呼び出しについても、チャンネルがランデブーモードであり、バッファ容量がゼロであるため失敗してしまいます。 そのため、offerfalse を、pollnull を返してしまうのです。単なる呼び出し順序の誤りが原因です。

上の例の channel.close() ステートメントのコメントを解除し、例外がスローされるようにしてみましょう。 この場合、poll は前と同様に false を返しますが、 offer についてはすでにクローズとなったチャンネルに要素を追加しようとし、失敗して例外をスローしています。 このような動作はエラーになりがちだという不満をたくさん耳にしました。 この例外をキャッチすることは忘れやすく、代わりに無視したり別の方法で処理したりしようとすると、プログラムがクラッシュしてしまいます。

この課題を修正してより詳細な結果を返すために、新しい trySendtryReceive が導入されました。 それぞれ ChannelResult インスタンスを返し、このインスタンスから、成功、失敗、またはチャンネルがクローズしていることを示す内容を得られます。

この例は前の例と同じように動作しますが、tryReceivetrySend を使用することで、より詳しい結果が返されるようになっています。 出力が、falsenull の代わりに Value(Failed) となっているのがわかるでしょう。 チャンネルをクローズしている行のコメントをもう一度解除し、trySend が例外をキャプチャして Closed の結果を返すようになることを確認してください。

inline value クラスにより、ChannelResult を使用しても追加のラッパーが作成されることがなく、successful 値が返される場合でも、オーバーヘッドを伴うことなくそのまま返されます。

Catching 関数: エラーをカプセル化するサスペンド関数

このリリースより、エラーをカプセル化するサスペンドメソッドには、「Catching」という接尾辞が使用されるようになりました。 たとえば、新しい receiveCatching 関数は、チャンネルがクローズになっている場合の例外を処理します。 以下の簡単な例を見てみましょう。

チャンネルは、値を取得しようとする前にクローズになっています。 ただし、プログラムは、チャンネルがクローズであることを示し、成功として完了します。 receiveCatching を通常の receive 関数に置き換えると、ClosedReceiveChannelException が発生します。

現時点では、receiveCatchingonReceiveCatching(以前の内部の receiveOrClosed の代替)しか提供していませんが、今後さらに関数を追加する予定です。

新しい関数にコードを移行する

プロジェクト内で offerpoll 関数を使用しているすべての箇所を自動的に新しい呼び出しに置き換えることができます。 offerBoolean を返すため、それに相当するコードとして channel.trySend("Element").isSuccess に置き換えることができます。

同様に、poll 関数も null 許容要素を返すため、channel.tryReceive().getOrNull() に置き換えることができます。

呼び出しの結果が使用されていない場合は、直接新しい呼び出しに置き換えることができます。

例外処理の動作は異なってしまうため、これについては手作業で必要な更新を行う必要があります。 「offer」と「poll」でクローズしたチャンネルに対する例外のスローを行っている場合は、次のように置き換える必要があります。

channel.offer("Element") に相当する新しいコードは、チャンネルがクローズである場合に、通常通りにクローズされている場合であっても例外をスローします。

channel.poll() に相当する新しいコードは、チャンネルがエラーでクローズされた場合には例外をスローし、通常通りクローズされた場合には null を返します。

この変更は、以前の offerpoll 関数の動作を反映したものです。

ほとんどの場合において、クローズされたチャンネルに対処するコードにこういった微妙な動作が使われていないこと、またこれはバグの原因だったと想定し、 IDE が提供する自動置換では、セマンティクスを単純化しています。 このケースが当てはまらない場合は、使用状況を確認し、手動で更新してください。また、クローズされたチャンネルを例外をスローせずに別の方法で処理する場合は、完全に書き直すことを検討してください。

安定化を目指した Reactive の統合

バージョン 1.5 の Kotlin コルーチンでは、Reactive フレームワークとの統合に関わるほとんどの関数を stable な API に昇格しています。

JVM エコシステムには、Reactive Streams 標準に準拠して非同期ストリーム処理を扱うフレームワークがいくつかあります。 たとえば、この分野で最も一般的に使用されている Java フレームワークは、Project ReactorRxJava です。

Kotlin Flows はこれらとは異なり、標準が指定する型とは互換性がありませんが、概念的にはストリームと同等です。 そのため、Flow から Reactive(仕様と TCK 準拠)の Publisher へ、またその逆方向に変換することが可能です。 このような変換は、kotlinx.coroutines を使ってすぐに利用でき、対応する Reactive モジュールに含まれていることがあります。

たとえば、Project Reactor 型の運用互換性が必要である場合、以下の依存関係をプロジェクトに追加する必要があります。

すると、Reactive Streams 型を使用する場合は Flow<T>.asPublisher() を、Project Reactor 型を直接使用する必要がある場合は Flow<T>.asFlux() を使用できるようになります。

上記は、簡単なまとめです。 詳細に関心のある方は、Reactive Streams と Kotlin Flows に関する Roman Elizarov の記事をお読みください。

Reactive ライブラリとの統合作業は API の stable 化をゴールとしていますが、厳密に言えば、@ExperimentalCoroutinesApi を取り除くことと、残りの様々なトピックを実装することを目標としています。

Reactive Streams との統合の改善

サードパーティのフレームワークと Kotlin コルーチンの運用互換性を確立するには、Reactive Streams 仕様との互換性が重要です。 コードをまるごと書き換えずに Kotlin コルーチンをレガシープロジェクトに採用する上で役立ちます。

今回、非常に多数の関数を stable ステータスに昇格することができました。 すべての Reactive Streams 実装から Flow に型を変換し、その逆の変換も可能になっています。 たとえば、コルーチンを使って新しいコードを記述できますが、逆変換によって古い Reactive コードベースに統合することができます。

また、多数の改善ReactorContext に適用されており、Reactor のコンテキストCoroutineContext にラップして、Project Reactor と Kotlin コルーチン間のシームレスな統合を実現しています。 この統合により、コルーチンを介して Reactor の Context に関する情報を伝搬できるようにもなりました。

コンテキストは、MonoFluxPublisher.asFlowFlow.asPublisherFlow.asFlux などのサブスクライバーのコンテキストを通じてすべての Reactive 統合によって明示的に伝搬されます。 以下に、サブスクライバーの ContextReactorContext に伝搬する簡単な例を示します。

上記の例では、Flow インスタンスを構築して、Reactor の Flux インスタンスにコンテキスト無しで変換しています。 引数を使わずに subscribe() メソッドを呼び出すと、パブリッシャーにすべてののデータを送信するよう要求する効果があります。 そのため、プログラムは「Reactor context in Flow: null」というフレーズを出力しています。

次の呼び出しチェーンも FlowFlux に変換していますが、キー値ペアの answer=42 をこのチェーンの Reactor のコンテキストに追加しています。 subscribe() への呼び出しによってこのチェーンがトリガーされます。 この場合、コンテキストが入力されているため、プログラムは「Reactor context in Flow: Context1{answer=42}」と出力します。

新しい便利な関数

コルーチンのコンテキストで Mono のような Reactive 型を使用しているときにスレッドをブロックせずに取得できる便利な関数がいくつかあります。 このリリースでは、任意の Publisher における awaitSingleOr* 関数と、MonoMaybe の特殊な一部の await* 関数を非推奨に指定しました。

Mono は、最大 1 つの値を生成するため、最後の要素と最初の要素が同一となります。 この場合、残りの要素を切り落とすセマンティクスが不要になります。 そのため、Mono.awaitFirst()Mono.awaitLast() を非推奨にし、Mono.awaitSingle() に置き換えました。

kotlinx.coroutines 1.5.0 を使い始めましょう!

新しいリリースには、目覚ましい変更がたくさん含まれています。 Channel API を改善している際に作成された新しい名前付けスキームは、チームによる特筆に値する達成です。 一方、コルーチン API をできる限り単純で直感的な API にするという大きな狙いもあります。

Kotlin コルーチンの新しいバージョンを使い始めるには、build.gradle.kts ファイルのコンテンツを更新してください。 最初に、最新バージョンの Kotlin Gradle プラグインがインストールされていることを確認してください。

次に、Reactive Streams との特定の統合機能を含むライブラリなど、依存関係のバージョンを更新してください。

動画とその他の情報

問題に直面した場合

image description

Discover more