kotlinx.coroutines 1.4.0: Introducing StateFlow and SharedFlow

Anton Arhipov

Today we’re pleased to announce the release of version 1.4.0 of the Kotlin Coroutines library. The highlights of the release are StateFlow and SharedFlow, which are being promoted to stable API. StateFlow and SharedFlow are designed to be used in cases where state management is required in an asynchronous execution context with Kotlin Coroutines.

The Flow API in Kotlin is designed to asynchronously handle a stream of data that executes sequentially. In essence, Flow is a sequence. We can do the same operations with Flow that we can do with Sequences in Kotlin: transform, filter, map, etc. The main difference between Kotlin Sequences and Flow is that Flow allows the execution to be suspended.

In Flow, you can suspend anywhere: in the builder function or in any of the operators provided by the Flow API. Suspension in Flow behaves like backpressure control, but you don’t have to do anything – the compiler will do all the work.

Flow is as simple as Kotlin Sequences. But it provides all the benefits of reactive programming where you don’t have to manage backpressure.

Flow is a convenient API, but it doesn’t provide state management, which is required in some cases. For instance, a process may have multiple intermediate states and a final state. Downloading a file is an example of such a process: the download process lasts for some time, and we may identify the intermediate states as “Started” then “In progress”, and the final state would be the “Success” or “Failure”. In this case, we are interested only in the results, whether the download was successful or not.

When implementing the scenario above using Flow API, we want to publish state changes to the observers who can act on the changes. Historically, we’ve recommended using ConflatedBroadcastChannel for this. However, ConflatedBroadcastChannel is a little too complex for the task at hand. Plus, there are some logical inconsistencies that arise when using channels for state management. For instance, a channel can be closed or canceled. This doesn’t play well with state management, since a state cannot be canceled!

We’ve decided to deprecate ConflatedBroadcastChannel and introduce a pair of new APIs instead – StateFlow and SharedFlow!

StateFlow

StateFlow comes in two varieties: StateFlow and MutableStateFlow:

The state is represented by the value. Any update to the value will be reflected in all flow collectors by emitting a value with state updates.

Let’s take a look at how the file download example we described earlier could be implemented using the new API.

This example exposes an immutable version of state to the clients and manages the mutable state (_state) internally. In the download function, we first update the internal state value: _state.value = DownloadStatus.INITIALIZED. Next, we can update the internal state with intermediate numbers indicating the progress. Eventually, we update the state with the final value indicating the download status.

As you can see, there is no channel API involved. We are not launching any additional coroutines, and there are no new concepts to learn. Just simple imperative code that uses a variable as an implementation detail, and for the clients we expose state as Flow.

SharedFlow

What if instead of managing a state we need to manage a series of state updates, i.e. an event stream. For this use case, we have a new API called SharedFlow. This API is handy if you are interested in the series of values emitted. For instance, calculating the moving average from a data stream.

The shared flow is just a flow that holds a replay cache that can be used as an atomic snapshot. Every new subscriber first gets the values from the replay cache and then gets the new emitted values.

Along with SharedFlow, we also provide MutableSharedFlow

MutableSharedFlow can be used to emit values from both suspending and non-suspending contexts. As the name implies, the replay cache of MutableSharedFlow can be reset. And it also exposes its collectors count as flow.

Implementing your own MutableSharedFlow can be tricky. So we are providing a few convenient methods for working with SharedFlow.

To initialize the MutableSharedFlow instance using the function above, we can specify the number of values replayed to new subscribers, the buffer capacity, and what to do if the buffer is full. For instance, we may choose to suspend the flow if the buffer is full.

If you already have a flow instance and you’d like to make it shareable, you can use the new operator Flow.shareIn

Summary

The new StateFlow and SharedFlow APIs provide a more elegant way to work with the state in Kotlin programs with coroutines. They are much simpler and more intuitive than using broadcast channels to publish the state changes from within the flow context.

Please give the new API a try, test it, break it, and share your feedback with us!

For more details and to learn about what’s new in Kotlin Coroutines, please watch the talk by Vsevolod Tolstopyatov from the Kotlin 1.4 Online Event.