In Part 1 we looked at how to create a testing setup for Kotlin Coroutines. In this part, we will build on that base and look at how we can test Kotlin Flows.
Here's the link to Part 1 if you missed it: blog.shounakmulay.dev/part-1-testing-corout..
At its core, a flow is a type that can emit multiple values sequentially. If you are not familiar with flows, you can read more about them here.
Starter project
In Part 1 we built on top of the starter project and created a coroutines testing setup. If you want to code along with the article, you can clone the repo from https://github.com/shounakmulay/KotlinFlowTest and check out the part2-start branch.
Open the project in Android Studio. The part2-start branch has all the code implemented in Part 1 as well.
Even though we are looking at an Android project, these concepts apply to any Kotlin-based project.
Testing a simple Flow
A flow that only emits a few values and has no further operators applied to it is the easiest to test.
In the MainViewModel we have a countFlow that points to a flow returned by the repository. This is a simple flow that counts from 1 to 3.
val countFlow = mainRepository.count3Flow()
In MainViewModelTest we have mocked the response of the repository's count3Flow. Since countFlow is a val on the view model, we need to mock the response before we initialize the MainViewModel. Otherwise the value is read before the mock is registered, which leads to a NullPointerException.
The setup function now looks like this:
@Before
fun setUp() {
    mainRepository = mock()
    whenever(mainRepository.count3Flow()).doReturn(getCount3Flow())
    mainViewModel = MainViewModel(mainRepository, coroutineScope.dispatcherProvider)
}
And the getCount3Flow function just emits 3 numbers from the flow builder.
private fun getCount3Flow() = flow {
    (1..3).forEach {
        emit(it)
    }
}
The easiest way to test a flow is to convert it to a list with toList. This internally collects the flow and returns all the collected values as a list. Let's run the test named Given no error occurs, When count3Flow is called, Then it should emit all values correctly.
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() = runBlocking {
    val countFlow = mainViewModel.countFlow
    assertEquals(listOf(1, 2, 3), countFlow.toList())
}
It passes!
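To try the toList approach outside the project, here is a minimal, self-contained sketch. It assumes only the kotlinx.coroutines library is on the classpath. The count3Flow function below is a stand-in for the repository function, and collectCount3 is a hypothetical helper named just for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for the repository flow: emits 1, 2, 3 and then completes.
fun count3Flow(): Flow<Int> = flow {
    (1..3).forEach { emit(it) }
}

// Hypothetical helper: collects the whole flow into a list.
// This only returns because the flow is finite and completes.
fun collectCount3(): List<Int> = runBlocking { count3Flow().toList() }

fun main() {
    println(collectCount3()) // prints [1, 2, 3]
}
```

Because toList waits for the flow to complete, this style of assertion fits finite flows like this one.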
Testing Complex Flows
Many times we need to move heavy computation away from a particular thread, most often the main thread.
To do that with flows, we use flowOn.
- Let's modify the flow in the MainViewModel to use the Default dispatcher. Notice that we did not use the dispatcher provider that we created in Part 1 here.
val countFlow = mainRepository
    .count3Flow()
    .flowOn(Dispatchers.Default)
- In the MainViewModel add one more flow that maps over the countFlow and doubles the value of each number. Let's use the IO dispatcher for this. Let's also use a delay to simulate some long-running operation, or to represent a flow that emits values at a certain frequency.
val doubleCountFlow = countFlow.map {
    delay(2000)
    it * 2
}.flowOn(Dispatchers.IO)
- Now let's zip these 2 flows into a flow that returns count-to-double pairs. The zip operator only emits a value once each of the two flows has emitted a value to pair up.
Let's look at this with an example: if countFlow has emitted two values, 1 and 2, and doubleCountFlow has emitted only one value so far, 2, then our new countWithDoubleFlow will have emitted only once, with the value (1, 2). As soon as doubleCountFlow emits its second value, 4, countWithDoubleFlow has the second values from both flows and emits its own second value: (2, 4).
Let's use the Default dispatcher for this zipped flow as well.
val countWithDoubleFlow = countFlow.zip(doubleCountFlow) { count, double ->
    count to double
}.flowOn(Dispatchers.Default)
- Also update the test to use the new combined flow.
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    runBlocking {
        val countFlow = mainViewModel.countWithDoubleFlow
        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
- Now if we run the test, it will pass, but it takes almost 6 seconds to run, because each of the three values waits for a 2-second delay.
That is a very long time for such a simple test.
As we saw in Part 1, we can use runBlockingTest to skip past delays and make the test run as fast as possible. Let's use that:
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    runBlockingTest {
        val countFlow = mainViewModel.countWithDoubleFlow
        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
- Running the test now gives an error saying: This job has not completed yet
The reason for this is that runBlockingTest tries to skip past delays by advancing time on the dispatcher. Since we have not injected the dispatchers, we are not using the test dispatcher in our test. Advancing time is only possible on a test dispatcher.
- Replace the dispatchers with the dispatchers from the CoroutineDispatcherProvider for all of the flows.
val countFlow = mainRepository
    .count3Flow()
    .flowOn(dispatcherProvider.default)
val doubleCountFlow = countFlow.map {
    delay(2000)
    it * 2
}.flowOn(dispatcherProvider.io)
val countWithDoubleFlow = countFlow.zip(doubleCountFlow) { count, double ->
    count to double
}.flowOn(dispatcherProvider.default)
- We also need to tell runBlockingTest to use the test dispatcher that we are injecting in tests. To do that, first make the test dispatcher from CoroutineScopeRule public.
@ExperimentalCoroutinesApi
class CoroutineScopeRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher(),
    var dispatcherProvider: CoroutineDispatcherProvider = CoroutineDispatcherProvider()
): TestWatcher(), TestCoroutineScope by TestCoroutineScope(dispatcher)
- Then replace runBlockingTest with coroutineScope.dispatcher.runBlockingTest so that we are using the correct dispatcher with runBlockingTest.
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    coroutineScope.dispatcher.runBlockingTest {
        val countFlow = mainViewModel.countWithDoubleFlow
        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
- Run the test and you will see that it passes and also takes much less time to execute: it drops from approximately 6.7 seconds to only around 600-700 milliseconds.
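The pairing behaviour of zip used in the steps above can also be observed in isolation. The following is a small sketch that assumes only kotlinx.coroutines. The helper name zipCountWithDouble is hypothetical, and a short real delay of 20 ms stands in for the 2000 ms delay so that it runs quickly without a test dispatcher.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper mirroring countWithDoubleFlow from the article,
// with a short real delay instead of 2000 ms.
fun zipCountWithDouble(): List<Pair<Int, Int>> = runBlocking {
    val counts = flowOf(1, 2, 3)
    val doubles = counts.map {
        delay(20) // the slower side; zip waits for it before emitting a pair
        it * 2
    }
    // Each pair is emitted only once both flows have produced their next value.
    counts.zip(doubles) { count, double -> count to double }.toList()
}

fun main() {
    println(zipCountWithDouble()) // prints [(1, 2), (2, 4), (3, 6)]
}
```

The slower flow sets the pace here, which is why the delay in doubleCountFlow dominates the run time of the original test.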
Cold vs Hot Flows
Cold flows are flows that do not produce values until they have a collector. Each collector triggers its own run of the flow, and that run stops as soon as the collector leaves.
Hot flows, on the other hand, do not wait for a collector to be registered and can start producing right away. Also, hot flows do not stop producing once all the collectors leave.
Another major difference is that hot flows never complete. So a call to collect on a hot flow never returns and stays suspended until the collecting coroutine is cancelled. Likewise, functions like toList never complete when called on a hot flow.
Since hot flows never complete, we cannot test them by converting the values to a list. To test hot flows, and to make testing any flow easier in general, we can use a library called Turbine.
You can read more about Turbine here.
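To make the cold versus hot distinction concrete, here is a rough sketch that assumes only kotlinx.coroutines. Both helper names are hypothetical. The first counts how often a cold flow's builder runs, and the second checks that a collector of a hot flow is still suspended after receiving the current value.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns how many times the cold flow's builder block ran for two collections.
fun coldFlowExecutions(): Int = runBlocking {
    var executions = 0
    val cold = flow {
        executions++
        emit(1)
        emit(2)
    }
    cold.toList() // first collector: runs the builder once
    cold.toList() // second collector: runs it again from the start
    executions
}

// Returns whether a collector of a hot flow is still active after it has
// had time to receive the current value.
fun hotCollectorStillActive(): Boolean = runBlocking {
    val hot = MutableStateFlow(10)
    val job = launch { hot.collect { /* receives 10, then waits for updates */ } }
    delay(100) // give the collector time to start
    val stillActive = job.isActive
    job.cancelAndJoin() // cancellation is what ends this collector
    stillActive
}

fun main() {
    println(coldFlowExecutions())      // prints 2
    println(hotCollectorStillActive()) // prints true
}
```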
Testing State Flow
A state flow is a type of hot flow. It holds a single current value and emits updates to that value, so consecutive duplicate values are not emitted by a state flow.
For example, if we try to emit [10, 20, 20, 30] from a state flow, the collector of the flow would only receive [10, 20, 30].
If you are familiar with Android's LiveData, state flow is very similar to it. You can read more about state flow here.
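The duplicate-skipping behaviour can be checked with a short sketch, assuming only kotlinx.coroutines. The helper name collectFromStateFlow is hypothetical. The small delays are there so the collector keeps up with every update, since a state flow only guarantees delivery of the latest value.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts at 10, then sets 20, 20 and 30,
// and returns everything a collector observed.
fun collectFromStateFlow(): List<Int> = runBlocking {
    val state = MutableStateFlow(10)
    val seen = mutableListOf<Int>()
    val job = launch { state.collect { seen += it } }
    for (value in listOf(20, 20, 30)) {
        delay(50) // let the collector process the previous value first
        state.value = value // setting an equal value does not notify collectors
    }
    delay(50)
    job.cancelAndJoin() // the collector never finishes on its own
    seen
}

fun main() {
    println(collectFromStateFlow()) // prints [10, 20, 30]
}
```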
- Add a getStateFlow function to the MainViewModel
fun getStateFlow(): MutableStateFlow<Int> {
    return MutableStateFlow(10)
}
Here we are creating a MutableStateFlow directly, with an initial value of 10.
- Add a test for getStateFlow in MainViewModelTest
@Test
fun `When getStateFlow is called, it should emit values correctly`() = runBlocking {
    val stateFlow = mainViewModel.getStateFlow()
    assertEquals(listOf(10), stateFlow.toList())
}
- If you run this test, you will notice that it keeps running forever. This is because the state flow is a hot flow, and hot flows never complete.
To overcome this, we can use a library called Turbine. It is a small testing library for Kotlin Flows. Let's add the Turbine dependency in the app's build.gradle file.
dependencies {
    testImplementation 'app.cash.turbine:turbine:0.6.1'
}
The Turbine library comes with a handy test extension function on flow that lets us test all types of flows with ease. It internally uses a Channel to collect the values from the flow and gives us useful functions to receive items one by one from the flow.
Channels are out of scope of this article but you can read more about those here.
- Let's update our test case to use this test function.
@Test
fun `When getStateFlow is called, it should emit values correctly`() = runBlocking {
    val stateFlow = mainViewModel.getStateFlow()
    stateFlow.test {
        val firstItem = awaitItem()
        assertEquals(10, firstItem)
        stateFlow.emit(20)
        val secondItem = awaitItem()
        assertEquals(20, secondItem)
        stateFlow.emit(20)
        expectNoEvents()
    }
}
The test function lets us await the next item in the flow and also expect various events. We can run assertions on each emit individually. This also makes the test code very clean and precise.
Also notice that the last call to emit is with the same value, so the state flow does not actually emit again. Therefore we can call expectNoEvents after that and the test passes.
As of the time of writing this tutorial, the Turbine library internally uses the Kotlin time library, which is still experimental. So we need to add the @ExperimentalTime annotation to the test class in order for it to compile.
@ExperimentalTime
class MainViewModelTest : BaseTest()
- Now if we run the test, it passes!
Testing Shared Flow
A shared flow is another type of hot flow. Unlike a state flow, it will emit all the values, even if consecutive values are the same.
For example, if we emit [10, 20, 20, 30] from a shared flow, the collector would receive all the values [10, 20, 20, 30].
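As a quick check of this difference, here is a minimal sketch assuming only kotlinx.coroutines. The helper name collectFromSharedFlow is hypothetical. It uses a MutableSharedFlow with default settings (no replay), so the collector has to be subscribed before the values are emitted.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 10, 20, 20, 30 into a shared flow
// and returns everything a collector observed.
fun collectFromSharedFlow(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>()
    val seen = mutableListOf<Int>()
    val job = launch { shared.collect { seen += it } }
    delay(50) // make sure the collector has subscribed; nothing is replayed
    for (value in listOf(10, 20, 20, 30)) {
        shared.emit(value) // duplicates are delivered like any other value
    }
    delay(50)
    job.cancelAndJoin() // the collector never finishes on its own
    seen
}

fun main() {
    println(collectFromSharedFlow()) // prints [10, 20, 20, 30]
}
```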
- Let's create a shared flow in the MainViewModel from the doubleCountFlow. We can use the shareIn extension function to create a shared flow from an existing flow.
val doubleCountSharedFlow = doubleCountFlow
    .shareIn(viewModelScope, SharingStarted.Lazily)
We pass the viewModelScope as the scope in which the flow will be shared. The second argument is a SharingStarted strategy. It indicates how the sharing should be started. For example, Lazily means sharing is started when the first subscriber appears and never stops, while Eagerly means that sharing is started immediately.
Similar to the state flow, calls to terminal functions like toList do not complete on a shared flow and keep running forever.
- Add a test for this shared flow in MainViewModelTest. We will use Turbine for this test as well. We are also using runBlockingTest here because the doubleCountFlow uses a delay block.
@Test
fun `When doubleCountSharedFlow is called, it should emit values correctly`() =
    coroutineScope.dispatcher.runBlockingTest {
        val sharedFlow = mainViewModel.doubleCountSharedFlow
        sharedFlow.test {
            val firstItem = awaitItem()
            assertEquals(2, firstItem)
            val secondItem = awaitItem()
            assertEquals(4, secondItem)
            val thirdItem = awaitItem()
            assertEquals(6, thirdItem)
        }
    }
- This test passes as expected. Testing a lazily started shared flow is not much different from testing any other hot flow.
Always inject SharingStarted strategy
There are situations where you would want your shared flow to start eagerly. Eagerly started shared flows can pose some challenges in terms of testing.
- Let's change the SharingStarted strategy to Eagerly
val doubleCountSharedFlow =
    doubleCountFlow.shareIn(viewModelScope, SharingStarted.Eagerly)
- If we run the test now, it will fail with an error: Timed out waiting for 1000 ms. 1 second is the default timeout period for Turbine tests.
Why does the test fail? Since we are using Eagerly, the sharing of this shared flow is started immediately. That means by the time we start collecting the flow in our test, it has already emitted all the values. Since shareIn does not replay past values by default, the late collector never receives them.
To fix this, we need the sharing to be started Eagerly in the real application but Lazily in tests. We can create a class similar to CoroutineDispatcherProvider that will provide us the sharing strategy. Then we can inject the sharing strategy wherever we need it.
- Create a class SharingStrategyProvider
data class SharingStrategyProvider(
    val lazily: SharingStarted = SharingStarted.Lazily,
    val eagerly: SharingStarted = SharingStarted.Eagerly,
    val whileSubscribed: SharingStarted = SharingStarted.WhileSubscribed()
)
- Add it as a dependency to the MainViewModel
class MainViewModel(
    private val mainRepository: MainRepository,
    private val dispatcherProvider: CoroutineDispatcherProvider,
    private val sharingStrategyProvider: SharingStrategyProvider
) : ViewModel()
- In app/di/viewModelModule update the view model constructor
viewModel { MainViewModel(get(), CoroutineDispatcherProvider(), SharingStrategyProvider()) }
- For tests, we can add SharingStrategyProvider to the already created CoroutineScopeRule. The dispatcher stays public, because the tests access it through coroutineScope.dispatcher.
@ExperimentalCoroutinesApi
class CoroutineScopeRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher(),
    var dispatcherProvider: CoroutineDispatcherProvider = CoroutineDispatcherProvider(),
    var sharingStrategyProvider: SharingStrategyProvider = SharingStrategyProvider()
): TestWatcher(), TestCoroutineScope by TestCoroutineScope(dispatcher)
- Also update the starting function and use Lazily as the strategy everywhere.
override fun starting(description: Description?) {
    super.starting(description)
    Dispatchers.setMain(dispatcher)
    dispatcherProvider = CoroutineDispatcherProvider(
        main = dispatcher,
        default = dispatcher,
        io = dispatcher
    )
    sharingStrategyProvider = SharingStrategyProvider(
        lazily = SharingStarted.Lazily,
        eagerly = SharingStarted.Lazily,
        whileSubscribed = SharingStarted.Lazily
    )
}
- Update the setUp function of the test class
@Before
fun setUp() {
    mainRepository = mock()
    whenever(mainRepository.count3Flow()).doReturn(getCount3Flow())
    mainViewModel = MainViewModel(
        mainRepository,
        coroutineScope.dispatcherProvider,
        coroutineScope.sharingStrategyProvider
    )
}
- Lastly use the injected sharing strategy in the view model
val doubleCountSharedFlow =
    doubleCountFlow.shareIn(viewModelScope, sharingStrategyProvider.eagerly)
- That's it. If you run the test now, it will pass as expected!
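The reason for swapping Eagerly for Lazily in tests can be reproduced without the view model. The sketch below assumes only kotlinx.coroutines. The helper lateSubscriberSees is hypothetical: it shares a three-value flow with a given strategy, subscribes after a short wait, and reports what the late subscriber received.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns the values seen by a subscriber that arrives
// late, or null if nothing arrived within the timeout.
fun lateSubscriberSees(started: SharingStarted): List<Int>? = runBlocking {
    // Separate Job so the never-ending sharing coroutine does not block runBlocking.
    val scope = CoroutineScope(coroutineContext + Job())
    val shared = flowOf(1, 2, 3).shareIn(scope, started)
    delay(100) // subscribe late, like a test that collects after the view model is created
    val result = withTimeoutOrNull(500) { shared.take(3).toList() }
    scope.cancel() // stop the sharing coroutine
    result
}

fun main() {
    println(lateSubscriberSees(SharingStarted.Eagerly)) // prints null
    println(lateSubscriberSees(SharingStarted.Lazily))  // prints [1, 2, 3]
}
```

With Eagerly the upstream has already finished before anyone subscribes, which corresponds to the timeout seen in the Turbine test. With Lazily the upstream only starts once the first subscriber appears.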
Where to go from here
Between Part 1 and Part 2, we have looked at how to deal with various situations that arise while testing coroutines and flows. You can now write tests for all types of coroutines and flows!
You can look at the complete code from this article at https://github.com/shounakmulay/KotlinFlowTest/tree/part2-end