Fix: Kotlin Flow Not Working — Not Collecting, StateFlow Not Updating, or Flow Cancelled Unexpectedly
Quick Answer
How to fix Kotlin Flow issues — cold vs hot flows, collectLatest vs collect, StateFlow and SharedFlow setup, lifecycle-aware collection in Android, and common Flow cancellation problems.
The Problem
A Flow emits values but collect never receives them:
val flow = flow {
    emit(1)
    emit(2)
    emit(3)
}

// Never prints anything
CoroutineScope(Dispatchers.Main).launch {
    flow.collect { value ->
        println(value)
    }
}

Or StateFlow updates are missed in the UI:
class MyViewModel : ViewModel() {
    private val _state = MutableStateFlow(0)
    val state: StateFlow<Int> = _state

    fun increment() {
        _state.value++ // Updates, but the UI never re-renders
    }
}

Or a Flow that should emit continuously gets cancelled after the first value:
viewModel.dataFlow
    .collect { data ->
        updateUI(data)
        // After calling another suspending function here, the Flow stops
    }

Why This Happens
Kotlin Flow has several behaviors that differ from RxJava or LiveData:
- Cold flows don’t execute until collected: creating a flow { } builder doesn’t start the producer. Each collect call starts a new execution. If no one collects, nothing happens.
- Flow is tied to the collecting coroutine’s lifecycle: if the coroutine is cancelled (e.g., because the screen rotates, the ViewModel is cleared, or the scope is closed), the Flow collection stops. This is intentional, but can cause values to be missed if the scope is too narrow.
- StateFlow never re-emits when the value hasn’t changed: StateFlow is a conflated hot flow. If you set the same value twice (_state.value = 5; _state.value = 5), collectors only see one emission. It also starts with an initial value, so late collectors receive the current value immediately.
- collect is a suspending function that does not return until the flow completes: code after collect in the same coroutine never runs while the flow is active. Use launch to collect in a separate coroutine.
- collectLatest cancels the previous block: if the flow emits faster than you process, collectLatest cancels the previous processing block and starts a new one. Long processing inside collectLatest may never complete.
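The cold-flow and StateFlow points above can be checked with a small console program. This is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; producerStartsFor and lateCollectorSees are hypothetical helpers introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many times the producer block ran
// after the same cold flow was collected `collectors` times.
fun producerStartsFor(collectors: Int): Int = runBlocking {
    var starts = 0
    val cold = flow {
        starts++ // executed once per collection
        emit(1)
        emit(2)
    }
    repeat(collectors) { cold.toList() } // toList() is a terminal operator, like collect
    starts
}

// Hypothetical helper: a late collector of a StateFlow receives the current value.
fun lateCollectorSees(): Int = runBlocking {
    val state = MutableStateFlow(0)
    state.value = 5
    state.first() // first() collects and returns the first value it receives
}

fun main() {
    println(producerStartsFor(0)) // 0: nothing collected, so the producer never ran
    println(producerStartsFor(2)) // 2: one producer run per collector
    println(lateCollectorSees())  // 5
}
```

If the first number is 0 in your own code, the flow is defined but nothing is collecting it.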
Fix 1: Understand Cold vs Hot Flows
Cold flows start fresh for each collector. Hot flows (like StateFlow and SharedFlow) run independently:
// COLD flow: runs once per collector, no sharing
val coldFlow = flow {
    println("Producer started") // Prints for EACH collector
    emit(1)
    emit(2)
    emit(3)
}

// Two collectors start two separate producers:
launch { coldFlow.collect { println("A: $it") } } // A: 1, A: 2, A: 3
launch { coldFlow.collect { println("B: $it") } } // B: 1, B: 2, B: 3

// HOT StateFlow: one shared state, late collectors get current value
val stateFlow = MutableStateFlow(0)
stateFlow.value = 5
launch { stateFlow.collect { println("Collector: $it") } }
// Immediately prints: "Collector: 5" (current value)

// HOT SharedFlow: configurable replay, no initial value
val sharedFlow = MutableSharedFlow<Int>(replay = 3) // Replays last 3 values
launch { sharedFlow.emit(1) }
launch { sharedFlow.emit(2) }
// Late collector gets: 1, 2 (if within replay buffer)

Convert cold to hot with shareIn or stateIn:
// Share an expensive cold flow between multiple collectors
// viewModelScope only exists inside a ViewModel, so the repository receives its
// sharing scope as a constructor parameter (named externalScope here for illustration)
class Repository(
    private val db: Database,
    externalScope: CoroutineScope
) {
    // WITHOUT sharing: each collector starts a new database query
    val users: Flow<List<User>> = db.getUsersFlow()

    // WITH sharing: one database subscription, shared among all collectors
    val usersShared: Flow<List<User>> = db.getUsersFlow()
        .shareIn(
            scope = externalScope,
            started = SharingStarted.WhileSubscribed(5000), // Keep alive 5s after last subscriber
            replay = 1
        )

    // Convert to StateFlow: always has a current value
    val usersState: StateFlow<List<User>> = db.getUsersFlow()
        .stateIn(
            scope = externalScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

Fix 2: Collect Flows in the Right Lifecycle Scope
Always collect Android Flows in a lifecycle-aware scope to prevent crashes and leaks:
// WRONG: collects in an unmanaged scope that nothing ever cancels
// May update destroyed views; also leaks if not cancelled manually
class MyFragment : Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        CoroutineScope(Dispatchers.Main).launch {
            viewModel.state.collect { state ->
                updateUI(state) // May run after view is destroyed
            }
        }
    }
}

// CORRECT: use repeatOnLifecycle (recommended for Android Fragments/Activities)
class MyFragment : Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        viewLifecycleOwner.lifecycleScope.launch {
            // Starts collection when STARTED, cancels when STOPPED
            // Re-collects when STARTED again (e.g., after returning from background)
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.state.collect { state ->
                    updateUI(state)
                }
            }
        }
    }
}
// Multiple flows at the same time
viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        launch {
            viewModel.users.collect { updateUserList(it) }
        }
        launch {
            viewModel.errors.collect { showError(it) }
        }
    }
}

In Compose, use collectAsStateWithLifecycle:
// Add dependency: androidx.lifecycle:lifecycle-runtime-compose
@Composable
fun UserScreen(viewModel: UserViewModel = viewModel()) {
    // Lifecycle-aware collection: stops when the screen is not visible
    val state by viewModel.state.collectAsStateWithLifecycle()

    // Not lifecycle-aware: keeps collecting while the app is in the background (avoid on Android)
    val stateAlways by viewModel.state.collectAsState()

    UserContent(state = state)
}

Fix 3: Choose the Right Collection Terminal
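The snippets in this section are fragments that depend on app code. As a self-contained comparison, the sketch below runs the same three-value flow through collect, collectLatest, and conflate with a deliberately slow consumer. It assumes a recent kotlinx-coroutines-core (1.6 or newer); the three helper functions are hypothetical names used only for this illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Each helper returns the values whose processing ran to completion.

fun withCollect(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    flowOf(1, 2, 3).collect { value ->
        delay(50) // the producer is suspended while this block runs
        done += value
    }
    done
}

fun withCollectLatest(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { value ->
        delay(50) // cancelled at this suspension point when a newer value arrives
        done += value
    }
    done
}

fun withConflate(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    flowOf(1, 2, 3).conflate().collect { value ->
        delay(50) // values emitted meanwhile are dropped, except the most recent
        done += value
    }
    done
}

fun main() {
    println(withCollect())       // [1, 2, 3]
    println(withCollectLatest()) // [3]
    println(withConflate())      // always ends with 3; intermediate values may be skipped
}
```

The difference between the last two matters in practice: conflate lets a block that has started finish, while collectLatest interrupts it.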
Collection operators differ in how they handle a consumer that is slower than the producer:
// collect: processes every emission in order; a slow collector suspends the producer
flow.collect { value ->
    delay(1000) // Slow processing: the producer waits, no value is dropped
    processValue(value)
}

// collectLatest: cancels the previous block when a new value arrives
// Best for: UI updates, search queries (only care about latest)
flow.collectLatest { value ->
    delay(1000) // If a new value arrives before 1s, this block is cancelled
    updateUI(value) // Only runs if no new value arrived within 1s
}

// With buffer: decouple producer and consumer speeds
flow
    .buffer(capacity = 64) // Buffer up to 64 items
    .collect { value ->
        delay(100) // Consumer is slower, but the producer only suspends once the buffer is full
        processValue(value)
    }

// conflate: drops intermediate values when the consumer is slow
// Unlike collectLatest, a block that has started always runs to completion
flow
    .conflate()
    .collect { value ->
        processValue(value)
    }

Choosing the right operator:
// Search box: only care about the latest query
searchQuery
    .debounce(300) // Wait 300ms after typing stops
    .distinctUntilChanged() // Skip if same query
    .flatMapLatest { query ->
        repository.search(query) // Cancel previous search on new query
    }
    .collectLatest { results ->
        updateResults(results)
    }

// Progress events: need all values
uploadProgress
    .collect { percent ->
        progressBar.progress = percent
    }

// Sensor data: only need latest, not historical
accelerometer
    .conflate()
    .collect { reading ->
        updateVisualization(reading)
    }

Fix 4: Fix StateFlow and SharedFlow Issues
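A frequent source of lost StateFlow updates is a read-modify-write such as value++ running on several threads at once. The sketch below shows the update { } alternative under parallel load. It assumes kotlinx-coroutines-core 1.5.1 or newer (where update is available); countWithUpdate is a hypothetical helper for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: increments one shared counter from several coroutines
// running in parallel on Dispatchers.Default and returns the final value.
fun countWithUpdate(workers: Int, incrementsPerWorker: Int): Int = runBlocking {
    val counter = MutableStateFlow(0)
    coroutineScope {
        repeat(workers) {
            launch(Dispatchers.Default) {
                repeat(incrementsPerWorker) {
                    // update retries with compareAndSet until it succeeds,
                    // so no increment is lost
                    counter.update { current -> current + 1 }
                }
            }
        }
    } // coroutineScope returns only after every worker has finished
    counter.value
}

fun main() {
    println(countWithUpdate(workers = 8, incrementsPerWorker = 10_000)) // 80000
}
```

Replacing the update call with counter.value++ in the same program can produce a smaller total, because two workers may read the same value before either writes.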
// StateFlow: initial value required, equality-based, always has a value
class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    // Declared so that addToCart compiles; Item is the app's own type
    private val _cart = MutableStateFlow<List<Item>>(emptyList())

    fun increment() {
        _count.value++ // Read-modify-write: not atomic when called from several threads
        // Prefer: _count.update { it + 1 } (atomic)
    }

    // update { } for read-modify-write (thread-safe)
    fun addToCart(item: Item) {
        _cart.update { currentCart ->
            currentCart + item
        }
    }
}

// StateFlow won't emit if the value is equal to the current value:
_count.value = 5
_count.value = 5 // No emission: same value

// Data classes compare by their properties (generated equals()), so an equal instance is not emitted:
data class UserState(val name: String, val count: Int)
_state.value = UserState("Alice", 1)
_state.value = UserState("Alice", 1) // No emission: equal to the current value
_state.value = UserState("Alice", 2) // Emits: different value

// SharedFlow: no initial value, configurable replay and buffer
class EventViewModel : ViewModel() {
    // One-time events (no replay: new collectors don't get past events)
    private val _events = MutableSharedFlow<UiEvent>()
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()

    fun showError(message: String) {
        viewModelScope.launch {
            _events.emit(UiEvent.Error(message))
        }
    }

    // With replay: new collectors see last N events
    private val _notifications = MutableSharedFlow<String>(
        replay = 3,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
}

// Collecting SharedFlow in Fragment
viewLifecycleOwner.lifecycleScope.launch {
    // Qualified with viewLifecycleOwner: a bare repeatOnLifecycle call would use the Fragment's own lifecycle
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.events.collect { event ->
            when (event) {
                is UiEvent.Error -> showSnackbar(event.message)
                is UiEvent.Navigate -> findNavController().navigate(event.destination)
            }
        }
    }
}

Fix 5: Combine and Transform Flows
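Two of the operators in this section behave in ways that are easy to misjudge: zip pairs values strictly one-to-one, and flatMapLatest discards unfinished inner flows. The sketch below makes both observable in a console program. It assumes kotlinx-coroutines-core is available; fakeLookup is a hypothetical stand-in for a repository call.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs the n-th value of one flow with the n-th value of the other.
fun zipped(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .zip(flowOf("a", "b", "c")) { number, letter -> "$number$letter" }
        .toList()
}

// flatMapLatest cancels the inner flow of the previous value as soon as a
// new outer value arrives, so only the last lookup gets to emit here.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestOnly(): List<String> = runBlocking {
    // Hypothetical slow lookup standing in for a repository call
    fun fakeLookup(id: Int) = flow {
        delay(50)
        emit("user $id")
    }
    flowOf(1, 2, 3)
        .flatMapLatest { id -> fakeLookup(id) }
        .toList()
}

fun main() {
    println(zipped())     // [1a, 2b, 3c]
    println(latestOnly()) // [user 3]
}
```

If every lookup result is needed, flatMapLatest is the wrong choice; flatMapConcat or flatMapMerge keep all inner flows running.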
Build complex flows by combining operators:
// Combine multiple flows
val userId: StateFlow<Int> = userViewModel.userId
val preferences: Flow<Preferences> = preferencesRepository.flow
combine(userId, preferences) { id, prefs ->
    Pair(id, prefs)
}.collect { (id, prefs) ->
    loadUserWithPrefs(id, prefs)
}

// zip: pairs emissions one-to-one
val numbers = flowOf(1, 2, 3)
val letters = flowOf("a", "b", "c")
numbers.zip(letters) { num, letter ->
    "$num$letter"
}.collect { println(it) }
// 1a, 2b, 3c

// flatMapLatest: cancel previous inner flow when outer emits
viewModel.selectedUserId
    .flatMapLatest { userId ->
        repository.getUser(userId) // New flow for each userId
    }
    .collect { user ->
        updateUI(user)
    }

// flatMapMerge: run inner flows concurrently
urls.flatMapMerge(concurrency = 4) { url ->
    flow { emit(httpClient.get(url)) }
}.collect { response ->
    process(response)
}

// mapNotNull: filter and transform in one step
dataFlow
    .mapNotNull { it.data } // Skips nulls
    .collect { data ->
        processData(data)
    }

Fix 6: Handle Errors in Flows
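Where catch sits in the chain decides which failures it can see. The sketch below contrasts a failure inside the collect block with the same failure moved upstream of catch. It assumes a recent kotlinx-coroutines-core (1.6 or newer); risky and the two helper functions are hypothetical names for this illustration.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for processing that fails on one particular input.
fun risky(value: Int): Int {
    if (value == 2) throw IllegalStateException("bad value $value")
    return value * 10
}

// The failure happens inside collect, which is downstream of catch,
// so catch never sees it and the exception reaches the caller.
fun failureInsideCollect(): String = runBlocking {
    val handled = mutableListOf<Int>()
    try {
        flowOf(1, 2, 3)
            .catch { handled += -1 }
            .collect { value -> handled += risky(value) }
        "completed with $handled"
    } catch (e: IllegalStateException) {
        "escaped after $handled: ${e.message}"
    }
}

// The failure happens in map, upstream of catch, so catch can emit a
// fallback. The flow still ends there: the value 3 is never processed.
fun failureUpstreamOfCatch(): List<Int> = runBlocking {
    val handled = mutableListOf<Int>()
    flowOf(1, 2, 3)
        .map { value -> risky(value) }
        .catch { emit(-1) }
        .collect { value -> handled += value }
    handled
}

fun main() {
    println(failureInsideCollect())   // escaped after [10]: bad value 2
    println(failureUpstreamOfCatch()) // [10, -1]
}
```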
An exception thrown upstream terminates the flow. The catch operator intercepts exceptions from the operators above it and lets you emit a fallback value, but it does not see exceptions thrown inside the collect block, and the upstream flow does not resume afterwards:
// WRONG: an exception thrown inside collect stops collection and propagates to the caller
repository.getDataFlow()
    .collect { data ->
        riskyOperation(data) // Throws: collection stops completely
    }

// CORRECT: move the risky work upstream of catch (e.g., into onEach), then handle the error
repository.getDataFlow()
    .onEach { data -> riskyOperation(data) }
    .catch { e ->
        emit(emptyList()) // Emit fallback value
        // Or: emit(Resource.Error(e.message))
    }
    .collect { data ->
        updateUI(data)
    }

// Retry on error (up to 3 retries, with no delay between attempts)
repository.getDataFlow()
    .retry(3) { e ->
        e is IOException // Only retry IOExceptions
    }
    .catch { e ->
        // Reached when the retries are exhausted or the exception is not an IOException
        showPersistentError(e)
    }
    .collect { updateUI(it) }

// onEach for side effects (like logging) without consuming the flow
repository.getDataFlow()
    .onEach { data -> logAnalytics(data) }
    .onStart { showLoading() }
    .onCompletion { hideLoading() }
    .catch { e -> showError(e) }
    .collect { updateUI(it) }

Still Not Working?
Flow in ViewModel not updating after config change: viewModelScope survives configuration changes (screen rotation), but the Fragment/Activity is recreated. The collector started by the old Fragment/Activity was cancelled together with its lifecycle scope, so the new instance has to start its own collection. In Fragments, collect in viewLifecycleOwner.lifecycleScope (not lifecycleScope), and use repeatOnLifecycle to restart collection.
StateFlow initial value causes unintended UI update — StateFlow always emits its current value to new collectors immediately. If you initialize with null or an empty state, your UI receives that before real data arrives. Use a sealed class for loading states:
sealed class UiState<out T> {
    object Loading : UiState<Nothing>()
    data class Success<T>(val data: T) : UiState<T>()
    data class Error(val message: String) : UiState<Nothing>()
}

private val _state = MutableStateFlow<UiState<List<User>>>(UiState.Loading)

callbackFlow or channelFlow emitting after close: if you use callbackFlow to wrap a callback-based API, always call awaitClose and clean up listeners:
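fun listenToSensor(sensor: Sensor): Flow<SensorEvent> = callbackFlow {
    // SensorEventListener declares two methods, so it needs an object expression, not a lambda
    val listener = object : SensorEventListener {
        override fun onSensorChanged(event: SensorEvent) {
            trySend(event) // Use trySend instead of send (non-suspending)
        }

        override fun onAccuracyChanged(sensor: Sensor, accuracy: Int) = Unit
    }
    // registerListener also requires a sampling rate
    sensorManager.registerListener(listener, sensor, SensorManager.SENSOR_DELAY_NORMAL)
    awaitClose {
        // Called when the Flow is cancelled or closed: clean up here
        sensorManager.unregisterListener(listener)
    }
}

For related Kotlin issues, see Fix: Kotlin Coroutine Not Executing and Fix: Kotlin Coroutine Scope Cancelled.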
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.