[Part 2] Testing Coroutines and Kotlin Flows
![[Part 2] Testing Coroutines and Kotlin Flows](/_next/image?url=https%3A%2F%2Fcdn.hashnode.com%2Fres%2Fhashnode%2Fimage%2Fupload%2Fv1637587041933%2FlfO62ZVz2.png&w=3840&q=75)
In Part 1 we looked at how to create a testing setup for Kotlin Coroutines. In this part, we will build on that base and look at how we can test Kotlin Flows.
Here's the link to Part 1 if you missed it: https://blog.shounakmulay.dev/part-1-testing-coroutines-and-kotlin-flows
At its core, a flow is a type that can emit multiple values sequentially. If you are not familiar with flows, you can read more about them here.
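As a quick illustration of "multiple values sequentially", here is a minimal sketch that is not part of the starter project. The function name countTo is made up for this example, and the snippet assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration: emits 1..n, one value at a time.
fun countTo(n: Int): Flow<Int> = flow {
    for (i in 1..n) {
        emit(i) // each emit hands one value to the collector
    }
}

fun main() = runBlocking {
    // Nothing runs until a terminal operator such as toList() collects the flow.
    val values = countTo(3).toList()
    println(values) // [1, 2, 3]
}
```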
Starter project
In Part 1 we built on top of the starter project and created a coroutines testing setup. If you want to code along with the article, you can clone the repo from https://github.com/shounakmulay/KotlinFlowTest and check out the part2-start branch.
Open the project in Android Studio. The branch part2-start has all the code implemented in Part 1 as well.
Even though we are looking at an Android project, these concepts will apply to any Kotlin-based project.
Testing a simple Flow
A flow that only emits a few values and has no further operators applied to it is the easiest to test.
In the MainViewModel we have a countFlow that points to a flow returned by the repository. This is a simple flow that counts from 1 to 3.
val countFlow = mainRepository.count3Flow()
In MainViewModelTest we have mocked count3Flow on the repository so that it returns the flow from getCount3Flow. Since countFlow is a val on the view model, we need to register the mock before we initialize the MainViewModel. Otherwise the value is read before the mock is registered, which leads to a NullPointerException.
The setup function now looks like this:
@Before
fun setUp() {
    mainRepository = mock()
    whenever(mainRepository.count3Flow()).doReturn(getCount3Flow())
    mainViewModel = MainViewModel(mainRepository, coroutineScope.dispatcherProvider)
}
And the getCount3Flow function just emits 3 numbers from the flow builder.
private fun getCount3Flow() = flow {
    (1..3).forEach {
        emit(it)
    }
}
The easiest way to test a flow is to convert it to a list. This internally collects the flow and returns all the collected values as a list. Let's run the test named Given no error occurs, When count3Flow is called, Then it should emit all values correctly.
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() = runBlocking {
    val countFlow = mainViewModel.countFlow
    assertEquals(listOf(1, 2, 3), countFlow.toList())
}
It passes!

Testing Complex Flows
We often need to move heavy computation off a particular thread, typically the main thread.
To do that with flows, we use flowOn.
- Let's modify the flow in the MainViewModel to use the Default dispatcher. Notice that we did not use the dispatcher provider that we created in Part 1 here.
val countFlow = mainRepository
    .count3Flow()
    .flowOn(Dispatchers.Default)
- In the MainViewModel add one more flow that maps over the countFlow and doubles the value of each number. Let's use the IO dispatcher for this. Let's also use a delay to simulate a long-running operation, or to represent a flow that emits values at a certain frequency.
val doubleCountFlow = countFlow.map {
    delay(2000)
    it * 2
}.flowOn(Dispatchers.IO)
- Now let's zip these 2 flows into a flow that returns pairs of count to double. The zip operator only emits a value once each of the two flows has emitted its next value.
Let's look at this with an example: suppose countFlow has emitted two values, 1 and 2, while doubleCountFlow has emitted only one value so far, which is 2.
In this case our new countWithDoubleFlow will have emitted only once, with the value (1, 2). As soon as doubleCountFlow emits its second value, 4, countWithDoubleFlow has the second value from both flows and emits its own second value: (2, 4).
Let's use the Default dispatcher for this zipped flow as well.
val countWithDoubleFlow = countFlow.zip(doubleCountFlow) { count, double ->
    count to double
}.flowOn(Dispatchers.Default)
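The pairing behaviour of zip described above can be checked in isolation. The following is a minimal sketch rather than code from the project: zippedPairs is a hypothetical helper, the two local flows are stand-ins for countFlow and doubleCountFlow, and the delay is shortened to 50 ms so the example finishes quickly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for countFlow and doubleCountFlow.
fun zippedPairs(): Flow<Pair<Int, Int>> {
    val counts = flowOf(1, 2, 3)
    val doubles = counts.map {
        delay(50) // the slower side decides when each pair can be emitted
        it * 2
    }
    // zip emits the n-th pair only after both flows have produced their n-th value.
    return counts.zip(doubles) { count, double -> count to double }
}

fun main() = runBlocking {
    println(zippedPairs().toList()) // [(1, 2), (2, 4), (3, 6)]
}
```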
- Also update the test to use the new combined flow.
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    runBlocking {
        val countFlow = mainViewModel.countWithDoubleFlow
        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
- Now if we run the test, it will pass but the time it takes to run is almost 6 seconds.

That is a very long time for such a simple test.
As we saw in Part 1, we can use runBlockingTest to skip past delays and make the test run as fast as possible. Let's use that:
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    runBlockingTest {
        val countFlow = mainViewModel.countWithDoubleFlow
        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
- Running the test now gives an error saying: This job has not completed yet.
The reason is that runBlockingTest tries to skip past delays by advancing time on the dispatcher. Since we have not injected the dispatchers, the flows do not run on the test dispatcher in our test. Advancing time is only possible on a test dispatcher.

- Replace the dispatchers with the dispatchers from the CoroutineDispatcherProvider for all of the flows
val countFlow = mainRepository
    .count3Flow()
    .flowOn(dispatcherProvider.default)
val doubleCountFlow = countFlow.map {
    delay(2000)
    it * 2
}.flowOn(dispatcherProvider.io)
val countWithDoubleFlow = countFlow.zip(doubleCountFlow) { count, double ->
    count to double
}.flowOn(dispatcherProvider.default)
- We also need to tell runBlockingTest to use the test dispatcher that we are injecting in tests. To do that, first make the test dispatcher from CoroutineScopeRule public.
@ExperimentalCoroutinesApi
class CoroutineScopeRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher(),
    var dispatcherProvider: CoroutineDispatcherProvider = CoroutineDispatcherProvider()
) : TestWatcher(), TestCoroutineScope by TestCoroutineScope(dispatcher)
- Then replace runBlockingTest with coroutineScope.dispatcher.runBlockingTest so that we are using the correct dispatcher with runBlockingTest
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    coroutineScope.dispatcher.runBlockingTest {
        val countFlow = mainViewModel.countWithDoubleFlow
        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
- Run the test and you will see that it passes and also takes much less time to execute: from approximately 6.7 seconds before to only around 600-700 milliseconds.
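The time saving comes from the test dispatcher treating delays as virtual time. Here is a minimal sketch of that idea outside the project. It assumes the same pre-1.6 kotlinx-coroutines-test API that this article uses (TestCoroutineDispatcher, runBlockingTest and currentTime), which later releases of the library deprecate, and virtualTimeAfterCollecting is a name made up for this example.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.runBlockingTest

// Assumption: pre-1.6 kotlinx-coroutines-test, as used elsewhere in this article.
// Hypothetical helper: returns the virtual time that passed while collecting.
@ExperimentalCoroutinesApi
fun virtualTimeAfterCollecting(): Long {
    val dispatcher = TestCoroutineDispatcher()
    var elapsed = 0L
    dispatcher.runBlockingTest {
        val doubled = flowOf(1, 2, 3)
            .map {
                delay(2000) // advanced in virtual time instead of being waited for
                it * 2
            }
            .flowOn(dispatcher) // same test dispatcher, so it controls the delay
            .toList()
        check(doubled == listOf(2, 4, 6))
        elapsed = currentTime // virtual milliseconds on the test dispatcher
    }
    return elapsed
}

@ExperimentalCoroutinesApi
fun main() {
    println(virtualTimeAfterCollecting()) // 6000
}
```

The three 2000 ms delays are accounted for on the dispatcher's virtual clock, while the function itself returns almost immediately in real time.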

Cold vs Hot Flows
Cold flows are flows that do not produce values until they have at least 1 collector registered. As soon as the last collector leaves, cold flows stop producing.
Hot flows, on the other hand, do not wait for a collector to be registered and start producing right away. Also, hot flows do not stop producing once all the collectors leave.
Another major difference is that hot flows never complete. So a call to collect on a hot flow never returns on its own and stays suspended until it is cancelled. Likewise, functions like toList will never complete when called on a hot flow.
Since hot flows never complete, we cannot test them by converting the values to a list. To test hot flows, and to make testing any flow easier in general, we can use a library called Turbine.
You can read more about turbine here.
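The difference can be observed with a small experiment. This is only a sketch under stated assumptions: coldBuilderRuns and hotToListWithTimeout are hypothetical helper names, and the 200 ms timeout is an arbitrary bound chosen so that the example terminates.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: counts how often the cold flow's builder block runs
// when the flow is collected twice.
suspend fun coldBuilderRuns(): Int {
    var runs = 0
    val cold = flow {
        runs++ // executed again for every collector
        emit(1)
        emit(2)
    }
    cold.toList()
    cold.toList()
    return runs
}

// Hypothetical helper: toList() on a hot flow does not finish by itself,
// so the timeout fires and the result is null.
suspend fun hotToListWithTimeout(): List<Int>? {
    val hot: Flow<Int> = MutableStateFlow(10)
    return withTimeoutOrNull(200L) { hot.toList() }
}

fun main() = runBlocking {
    println(coldBuilderRuns())      // 2
    println(hotToListWithTimeout()) // null
}
```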
Testing State Flow
State flow is a type of hot flow. It emits updates to a value, i.e. duplicate consecutive values are not emitted by a state flow.
For example, if we try to emit [10, 20, 20, 30] from a state flow, the collector of the flow would only receive [10, 20, 30].
If you are familiar with Android's LiveData, state flow is very similar to it. You can read more about state flow here.
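The [10, 20, 20, 30] example can be reproduced with a few lines. This is a minimal sketch, not code from the project: collectedFromStateFlow is a hypothetical helper, and it relies on runBlocking running everything on a single thread so that yield() gives the collector a turn after each update.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records what a collector of a state flow receives.
fun collectedFromStateFlow(): List<Int> = runBlocking {
    val state = MutableStateFlow(10)
    val received = mutableListOf<Int>()
    val collector = state.onEach { received += it }.launchIn(this)
    yield() // let the collector start and receive the initial value 10
    for (value in listOf(20, 20, 30)) {
        state.value = value // setting a value equal to the current one is ignored
        yield()             // give the collector a turn before the next update
    }
    collector.cancel() // a state flow never completes, so stop collecting manually
    received
}

fun main() {
    println(collectedFromStateFlow()) // [10, 20, 30]
}
```

The yield() calls matter here: a state flow is conflated, so a collector that does not get a turn between two updates only sees the latest value.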
- Add a getStateFlow function to the MainViewModel
fun getStateFlow(): MutableStateFlow<Int> {
    return MutableStateFlow(10)
}
Here we are creating a MutableStateFlow directly, with an initial value of 10.
- Add a test for getStateFlow in MainViewModelTest
@Test
fun `When getStateFlow is called, it should emit values correctly`() = runBlocking {
    val stateFlow = mainViewModel.getStateFlow()
    assertEquals(listOf(10), stateFlow.toList())
}
- If you run this test, you will notice that it keeps running forever. This is because the state flow is a hot flow, and hot flows never complete.

To overcome this, we can use a library called Turbine. It is a small testing library for Kotlin Flows. Let's add the Turbine dependency in the app's build.gradle file.
dependencies {
    testImplementation 'app.cash.turbine:turbine:0.6.1'
}
The Turbine library comes with a handy test extension function on flow that lets us test all types of flow with ease. It internally uses a Channel to collect the values from the flow and gives us useful functions to receive items one by one from the flow.
Channels are out of scope of this article but you can read more about those here.
- Let's update our test case to use this test function.
@Test
fun `When getStateFlow is called, it should emit values correctly`() = runBlocking {
    val stateFlow = mainViewModel.getStateFlow()
    stateFlow.test {
        val firstItem = awaitItem()
        assertEquals(10, firstItem)
        stateFlow.emit(20)
        val secondItem = awaitItem()
        assertEquals(20, secondItem)
        stateFlow.emit(20)
        expectNoEvents()
    }
}
The test function lets us await the next item in the flow and also expect various events. We can run assertions on each emit individually. This also makes the test code very clean and precise.
Also notice that the last call to emit is with the same value, thus the state flow does not actually emit again. Therefore we can call expectNoEvents after that and the test passes.
As of the time of writing this tutorial, the Turbine library internally uses the Kotlin time library, which is still experimental. So we need to add the @ExperimentalTime annotation to the test class in order for it to compile.
@ExperimentalTime
class MainViewModelTest : BaseTest()
- Now if we run the test, it passes!

Testing Shared Flow
Shared flow is another type of hot flow. Unlike a state flow, it will emit all the values, even if consecutive values are the same.
For example, if we emit [10, 20, 20, 30] from a shared flow, the collector would receive all the values [10, 20, 20, 30].
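Here is the same experiment for a shared flow, again as a minimal sketch rather than project code. collectedFromSharedFlow is a hypothetical helper, and the MutableSharedFlow is created with its default settings (no replay, no extra buffer).

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records what a collector of a shared flow receives.
fun collectedFromSharedFlow(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>()
    val received = mutableListOf<Int>()
    val collector = shared.onEach { received += it }.launchIn(this)
    yield() // let the collector subscribe; without replay, earlier values would be lost
    for (value in listOf(10, 20, 20, 30)) {
        shared.emit(value) // with no buffer, suspends until the subscriber takes the value
    }
    yield()
    collector.cancel() // a shared flow never completes, so stop collecting manually
    received
}

fun main() {
    println(collectedFromSharedFlow()) // [10, 20, 20, 30]
}
```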
- Let's create a shared flow in the MainViewModel from the doubleCountFlow. We can use the shareIn extension function to create a shared flow from an existing flow.
val doubleCountSharedFlow = doubleCountFlow
    .shareIn(viewModelScope, SharingStarted.Lazily)
We pass the viewModelScope as the scope in which the flow will be shared. The second argument is a SharingStarted strategy. It indicates how the sharing should be started.
For example, Lazily means sharing is started when the first subscriber appears and never stops, while Eagerly means that sharing is started immediately.
Similar to the state flow, calls to terminal functions like toList do not complete on a shared flow and keep running forever.
- Add a test for this shared flow in MainViewModelTest. We will use Turbine for this test as well. We are also using runBlockingTest here because the doubleCountFlow uses a delay.
@Test
fun `When doubleCountSharedFlow is collected, it should emit values correctly`() =
    coroutineScope.dispatcher.runBlockingTest {
        val sharedFlow = mainViewModel.doubleCountSharedFlow
        sharedFlow.test {
            val firstItem = awaitItem()
            assertEquals(2, firstItem)
            val secondItem = awaitItem()
            assertEquals(4, secondItem)
            val thirdItem = awaitItem()
            assertEquals(6, thirdItem)
        }
    }
- This test passes as expected. Testing a lazily started shared flow is not much different from testing any other hot flow.
Always inject SharingStarted strategy
There are situations where you would want your shared flow to start eagerly. Eagerly started shared flows can pose some challenges in terms of testing.
- Let's change the SharingStarted strategy to Eagerly
val doubleCountSharedFlow =
    doubleCountFlow.shareIn(viewModelScope, SharingStarted.Eagerly)
- If we run the test now, it will fail with the error: Timed out waiting for 1000 ms. One second is the default timeout period for Turbine tests.
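The effect of the starting strategy on a collector that arrives late can be reproduced outside the project. The following is a minimal sketch under stated assumptions: lateSubscriberSees is a hypothetical helper, the upstream is a plain flowOf(1, 2, 3) without delays, and the 50 ms pause and 200 ms timeout are arbitrary values. It shows the general behaviour, not the exact timing inside the view model test.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: subscribes "late" to a shared flow and returns what it
// received, or null if nothing arrived before the timeout.
fun lateSubscriberSees(started: SharingStarted): List<Int>? = runBlocking {
    // Separate scope so the sharing coroutine can be cancelled at the end.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = flowOf(1, 2, 3).shareIn(sharingScope, started)
    delay(50) // an eagerly started upstream runs to completion during this pause
    val seen = withTimeoutOrNull(200L) { shared.take(3).toList() }
    sharingScope.cancel()
    seen
}

fun main() {
    println(lateSubscriberSees(SharingStarted.Eagerly)) // null
    println(lateSubscriberSees(SharingStarted.Lazily))  // [1, 2, 3]
}
```

With Eagerly the values are emitted before anyone subscribes and, with no replay configured, they are dropped. With Lazily the upstream only starts once the first subscriber appears.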

Why does the test fail? Since we are using Eagerly, the sharing of this shared flow is started immediately. That means by the time we start collecting the flow in our test, it has already emitted all the values.
To fix this we need the sharing to be started Eagerly in the real application but Lazily in tests. We can create a class similar to CoroutineDispatcherProvider that will provide us the sharing strategy. Then we can inject the sharing strategy wherever we need it.
- Create a class SharingStrategyProvider
data class SharingStrategyProvider(
    val lazily: SharingStarted = SharingStarted.Lazily,
    val eagerly: SharingStarted = SharingStarted.Eagerly,
    val whileSubscribed: SharingStarted = SharingStarted.WhileSubscribed()
)
- Add it as a dependency to the MainViewModel
class MainViewModel(
    private val mainRepository: MainRepository,
    private val dispatcherProvider: CoroutineDispatcherProvider,
    private val sharingStrategyProvider: SharingStrategyProvider
) : ViewModel()
- In app/di/viewModelModule update the view model constructor
viewModel { MainViewModel(get(), CoroutineDispatcherProvider(), SharingStrategyProvider()) }
- For tests, we can add SharingStrategyProvider to the already created CoroutineScopeRule. The dispatcher stays public so that the tests can still call coroutineScope.dispatcher.runBlockingTest.
@ExperimentalCoroutinesApi
class CoroutineScopeRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher(),
    var dispatcherProvider: CoroutineDispatcherProvider = CoroutineDispatcherProvider(),
    var sharingStrategyProvider: SharingStrategyProvider = SharingStrategyProvider()
) : TestWatcher(), TestCoroutineScope by TestCoroutineScope(dispatcher)
- Also update the starting function and use Lazily as the strategy everywhere.
override fun starting(description: Description?) {
    super.starting(description)
    Dispatchers.setMain(dispatcher)
    dispatcherProvider = CoroutineDispatcherProvider(
        main = dispatcher,
        default = dispatcher,
        io = dispatcher
    )
    sharingStrategyProvider = SharingStrategyProvider(
        lazily = SharingStarted.Lazily,
        eagerly = SharingStarted.Lazily,
        whileSubscribed = SharingStarted.Lazily
    )
}
- Update the setUp function of the test class
@Before
fun setUp() {
    mainRepository = mock()
    whenever(mainRepository.count3Flow()).doReturn(getCount3Flow())
    mainViewModel = MainViewModel(
        mainRepository,
        coroutineScope.dispatcherProvider,
        coroutineScope.sharingStrategyProvider
    )
}
- Lastly, use the injected sharing strategy in the view model
val doubleCountSharedFlow =
    doubleCountFlow.shareIn(viewModelScope, sharingStrategyProvider.eagerly)
- That's it. If you run the test now, it will pass as expected!

Where to go from here
Between Part 1 and Part 2, we have looked at how to deal with various situations that arise while testing coroutines and flows. You can now write tests for all types of coroutines and flows!
You can look at the complete code from this article at https://github.com/shounakmulay/KotlinFlowTest/tree/part2-end




