Kotlin logo

The Kotlin Blog

Kotlin Programming Language by JetBrains

Libraries

Корутины 1.5 в Kotlin: GlobalScope отмечен как «требующий внимания», улучшенный Channels API и многое другое

Соавтор: Светлана Исакова

Представляем корутины 1.5.0! Вот что реализовано в новой версии.

  • Улучшенный Channels API. Помимо новых правил именования для функций библиотеки, были реализованы неостанавливаемые функции trySend и tryReceive, ставшие улучшенной альтернативой для offer и poll.

Кроме того, в этой статье вы найдете рекомендации по миграции на новую версию.

Начните использовать корутины 1.5.0

GlobalScope отмечен как API, «требующий внимания»

Теперь класс GlobalScope отмечен аннотацией @DelicateCoroutinesApi. Начиная с этой версии, для использования GlobalScope его необходимо явным образом включить, используя аннотацию @OptIn(DelicateCoroutinesApi::class).

В большинстве случаев использовать GlobalScope не рекомендуется, однако официальная документация предлагает ряд концептов с использованием этого API.

Глобальная область CoroutineScope не связана ни с каким заданием. Такой вариант используется для запуска корутин верхнего уровня, которые работают в течение всего срока жизни приложения. Активные корутины, запущенные в GlobalScope, не препятствуют остановке процесса. Они похожи на потоки-демоны в Java.

При использовании API GlobalScope нужно быть очень внимательным, потому что можно легко организовать утечку ресурсов или памяти. Корутины, запущенные в GlobalScope, не подчиняются принципу структурированного параллелизма. Если они зависают или тормозят (например, из-за низкой пропускной способности сети), то все равно продолжают работать, потребляя ресурсы. Возьмем, например, следующий код:

Вызов loadConfiguration создает корутину в GlobalScope, которая работает в фоновом режиме, и не заданы никакие условия ее отмены или ожидания ее завершения. Если пропускная способность сети низкая, она просто ожидает в фоновом режиме, потребляя ресурсы. При повторных вызовах loadConfiguration потребление ресурсов будет постоянно расти.

Возможная замена

Во многих случаях использования GlobalScope следует избегать, а содержащую его операцию следуют отметить как suspend, например:

Если вы используете GlobalScope.launch для запуска нескольких параллельных операций, их следует сгруппировать, используя coroutineScope:

При запуске параллельной операции из неостанавливаемого контекста в коде верхнего уровня вместо GlobalScope следует использовать правильно ограниченный экземпляр CoroutineScope.

Случаи оправданного использования

Существует ряд ситуаций, когда использование GlobalScope оправданно и безопасно, например, фоновые процессы верхнего уровня, которые должны работать в течение всего срока жизни приложения. Поэтому, начиная с этой версии, для использования GlobalScope его необходимо явным образом включить, используя аннотацию @OptIn(DelicateCoroutinesApi::class) как показано ниже:

Мы рекомендуем внимательно проанализировать все случаи использования GlobalScope и отметить указанной аннотацией только те из них, которые попадают в категорию оправданного использования. Во всех остальных случаях это может привести к ошибкам в коде, поэтому следует заменить GlobalScope, как описано выше.

Расширения для JUnit 5

В новой версии мы добавили аннотацию CoroutinesTimeout, которая позволяет запускать тесты в отдельном потоке, отключать их после истечения отведенного времени и прерывать поток. CoroutinesTimeout уже был доступен для JUnit 4. В этой версии мы добавили интеграцию с JUnit 5.

Чтобы использовать новую аннотацию, добавьте в проект новую зависимость:

Вот простой пример использования нового CoroutinesTimeout для тестирования:

В этом примере тайм-аут корутин определяется на уровне класса, конкретно для firstTest. Для аннотированного теста тайм-аут не учитывается, поскольку аннотация функции переопределяет функцию класса. secondTest использует аннотацию на уровне класса, поэтому он завершается при превышении лимита времени.

Аннотация объявляется следующим образом:

Первый параметр, testTimeoutMs, определяет продолжительность времени ожидания в миллисекундах. Второй параметр, cancelOnTimeout, определяет, нужно ли отменить все выполняемые корутины по истечении этого времени. Если для него задано значение true, все корутины будут автоматически отменены.

При использовании аннотации CoroutinesTimeout автоматически включается отладчик корутин, который делает дамп всех корутин в момент таймаута. Дамп содержит трассировки стека создания корутин. Если нужно отменить трассировку стека, чтобы ускорить выполнение тестов, можно использовать напрямую CoroutinesTimeoutExtension — это позволяет сделать соответствующие настройки.

Мы благодарим Абхиджита Саркара, который создал экспериментальный образец CoroutinesTimeout для JUnit 5. Идея была реализована в новой аннотации CoroutinesTimeout, которая и добавлена в версию 1.5.

Улучшения Channels API

Каналы — важные примитивы коммуникации, которые позволяют различным корутинам обмениваться данными. В этой версии мы немного обновили Channels API, предложив улучшенные альтернативы для функций offer и poll, которые часто создавали проблемы при использовании. Заодно мы разработали новые последовательные правила именования для останавливаемых и неостанавливаемых методов.

Новые правила именования

Мы попробовали создать последовательные правила именования, которые можно было бы затем использовать в других библиотеках или API корутин. Нам хотелось, чтобы имя функции несло в себе информацию о ее поведении. В результате получилось вот что:

  • имена обычных останавливаемых методов остаются без изменений — например, send, receive;
  • все имена соответствующих неостанавливаемых методов с инкапсуляцией ошибок получают префикс «try»: trySend и tryReceive вместо прежних offer и poll;
  • имена новых останавливаемых методов с инкапсуляцией ошибок получают суффикс «Catching».

Теперь давайте разберемся с новыми методами подробнее.

Функции Try: неостанавливаемые аналоги для send и receive

Одна корутина может отправлять информацию в канал, а другая может ее оттуда получать. Обе функции send и receive — останавливаемые. send останавливает корутину, если канал полон и не может принять новый элемент, а receive останавливает свою корутину, если в канале нет возвращаемых элементов:

У этих функций есть неостанавливаемые аналоги, используемые в синхронном коде: offer и poll. Теперь вместо них появились trySend и tryReceive, а поддержка прежних функций прекращается. Ниже мы подробнее расскажем о причинах таких изменений.

offer и poll должны делать то же, что и send и receive, но без остановки. Звучит просто и успешно работает, когда можно отправить или получить элемент. А вот когда происходит ошибка, все усложняется. send и receive останавливают соответствующие корутины до тех пор, пока не появится возможность выполнить поставленную задачу. offer и poll просто возвращали значения false и null соответственно, если элемент нельзя было добавить, когда канал полон, или получить, когда канал пуст. Обе функции выдавали исключения, пытаясь работать с закрытым каналом, и вот это вызывало затруднения при их использовании.

В приведенном примере функция poll вызывается до добавления элементов в канал и сразу же возвращает null. Обратите внимание, что она не должна использоваться таким образом. Вместо этого нужно продолжать регулярно отправлять запросы элементов, но мы для простоты примера вызываем ее напрямую. Вызвать функцию offer также не удается, поскольку это рандеву-канал с нулевым объемом буфера. В результате функция offer возвращает false, а функция poll возвращает null, просто потому что они были вызваны не в том порядке.

В этом примере можно раскомментировать строку с вызовом channel.close(), чтобы наверняка возникло исключение. Тогда poll, как и раньше, вернет false. Однако после этого offer попытается добавить элемент в уже закрытый канал и выдаст исключение. Мы получали много жалоб на то, что такое поведение чревато ошибками. Очень легко забыть обработать это исключение, а если его игнорировать или обработать иным образом, это приведет к завершению программы.

Новые функции trySend и tryReceive позволили устранить эту проблему, они выдают более подробный результат. Каждая из них возвращает экземпляр ChannelResult, который может иметь одно из трех значений: успешное выполнение, сбой или указание на то, что канал был закрыт.

Этот пример кода работает так же, как предыдущий, но функции tryReceive и trySend возвращают более подробный результат. Вместо false и null вы видите здесь Value(Failed). Можно снова убрать комментарий со строки, закрывающей канал, и убедиться, что trySend возвращает результат Closed, перехватывая исключение.

Благодаря inline-классам значений использование ChannelResult не создает ниже дополнительных оберток, и значение успешного выполнения возвращается без изменений и лишней траты ресурсов.

Перехватывающие функции: останавливаемые функции с инкапсуляцией ошибок

Начиная с этой версии, имена новых останавливаемых методов с инкапсуляцией ошибок получают суффикс «Catching». Например, новая функция receiveCatching обрабатывает исключение, если канал закрыт. Возьмем такой пример:

Канал был закрыт, прежде чем мы попытались получить значение. Однако выполнение программы успешно завершается с указанием, что канал был закрыт. Если вместо receiveCatching использовать обычную функцию receive, она выдаст исключение ClosedReceiveChannelException:

Пока что мы ввели только функции receiveCatching и onReceiveCatching (вместо использовавшейся ранее встроенной функции receiveOrClosed), но в дальнейшем планируем добавить новые.

Переход к использованию новых функций в коде

Во всех случаях, где в проекте используются функции offer и poll, их можно заменить автоматически с помощью новых вызовов. Поскольку offer возвращала Boolean, ее равноценной заменой станет channel.trySend("Элемент").isSuccess.

В свою очередь poll возвращает элемент, допускающий значение null, поэтому ее заменяет channel.tryReceive().getOrNull().

Если результат вызова не использован, его можно заменять напрямую с помощью нового вызова.

Теперь поведение функций при обработке исключений различается, требуемые изменения нужно внести вручную. Если код рассчитан на то, что методы «offer» и «poll» выдают исключения при закрытом канале, нужно будет сделать следующие замены.

Равноценная замена для channel.offer("Элемент") должна выдавать исключение, если канал закрыт, даже при нормальном закрытии:

Равноценная замена для channel.poll() выдает исключение, если канал закрыт с ошибкой, и возвращает null, если он был закрыт в нормальном режиме:

Такие изменения соответствуют прежнему поведению функций offer и poll.

Мы полагаем, что обычно код не учитывает такие тонкие различия в поведении при закрытом канале, и они в основном приводили к ошибкам. Поэтому автоматическая замена в IDE упростит семантику кода. Если в вашем случае это не так, вам нужно проанализировать, как используются эти функции, и обновить код вручную. Возможно, стоит его полностью переписать, чтобы ситуации с закрытым каналом обрабатывались иначе, без вызова исключений.

Интеграция с реактивными потоками — постепенная стабилизация

В версии 1.5 статус большинства функций, отвечающих за интеграцию с реактивными фреймворками, был повышен до стабильного API.

В экосистеме JVM есть ряд фремворков, которые выполняют обработку асинхронных потоков в соответствии со стандартом реактивных потоков. Два популярных фреймворка Java, которые это предлагают, — Project Reactor и RxJava.

Потоки Kotlin Flows работают иначе, и их типы несовместимы с описанными в этом стандарте, но по своей сути это тем не менее потоки. Поток Flow можно преобразовать в реактивный поток Publisher (совместимый со спецификацией и TCK) и наоборот. kotlinx.coroutines предлагает готовые конвертеры, которые можно найти в соответствующих реактивных модулях.

Например, если нужно обеспечить совместимость с типами Project Reactor, добавьте в проект следующие зависимости:

После этого можно будет использовать Flow<T>.asPublisher(), если вы хотите использовать типы Reactive Streams, или Flow<T>.asFlux(), если нужно напрямую использовать типы Project Reactor.

Это очень сжатое изложение вопроса. Если вам нужна подробная информация, рекомендуем статью Романа Елизарова о реактивных потоках и Kotlin Flows.

Хотя интеграция с реактивными библиотеками способствует стабилизации API, с технической точки зрения задача заключается в том, чтобы избавиться от @ExperimentalCoroutinesApi и реализовать оставшиеся функции по разным направлениям.

Улучшенная интеграция с реактивными потоками

Совместимость со спецификацией реактивных потоков важна для обеспечения интероперабельности между сторонними фреймворками и корутинами. Это позволяет использовать корутины в старых проектах, не переписывая весь код.

В этот раз мы присвоили статус «стабильный» длинному списку функций. Теперь можно конвертировать нужный тип из любого реактивного потока во Flow и обратно. Например, новый код может быть написан с использованием Coroutines и интегрирован со старой реактивной кодовой базой с помощью встречных конвертеров:

Кроме того, было внесено много улучшений в ReactorContext, который оборачивает контекст Reactor в CoroutineContext, обеспечивая полную интеграцию между Project Reactor и корутинами в Kotlin. Благодаря ей можно распространять информацию о контексте Reactor в корутинах.

Контест распространяется неявно как контекст подписчиков во всех интеграциях с реактивными потоками — Mono, Flux, Publisher.asFlow, Flow.asPublisher и Flow.asFlux. Вот простой пример распространения Context подписчика в ReactorContext:

В приведенном примере мы создаем экземпляр Flow, который затем преобразуется в экземпляр Reactor Flux без контекста. Вызов метода subscribe() без аргумента запрашивает у публикатора отправку всех данных. В результате программа выводит фразу «Reactor context in Flow: null».

Следующая цепочка вызовов также преобразует Flow во Flux, но затем добавляет пару ключ-значение, answer=42, к контексту Reactor для этой цепочки. Вызов subscribe() становится триггером для всей цепочки. В этом случае, поскольку контекст заполнен, программа выводит фразу «Reactor context in Flow: Context1{answer=42}»

Новые вспомогательные функции

При работе с реактивными типами, например, с Mono, в контексте корутин, можно использовать вспомогательные функции, которые позволяют получать данные, не блокируя поток. Начиная с этой версии, мы прекратили поддержку функций awaitSingleOr* для произвольных Publisher и специализировали некоторые функции await* для Mono и Maybe.

Mono генерирует не больше одного значения, так что последний элемент всегда аналогичен первому. Соответственно, семантика удаления оставшихся элементов тоже бесполезна. Поэтому поддержка Mono.awaitFirst() и Mono.awaitLast() прекращена, вместо них вводится Mono.awaitSingle().

Начинаем работу с kotlinx.coroutines 1.5.0!

В новой версии появилось множество изменений. Новые правила именования, разработанные при улучшении Channels API, стали для команды существенным достижением. Кроме того, мы стараемся сделать API корутин как можно более простым и понятным.

Чтобы начать работу с новой версией, просто обновите содержимое файла build.gradle.kts. Сначала проверьте, что у вас установлена самая свежая версия Kotlin плагина в Gradle:

Затем обновите версии зависимостей, включая библиотеки, содержащие конкретные интеграции с реактивными потоками.

Дополнительные материалы

Если вы столкнулись с проблемами

Ваша команда Kotlin

The Drive to Develop

Discover more