Skip to content

Fix: Kotlin Flow Not Working — Not Collecting, StateFlow Not Updating, or Flow Cancelled Unexpectedly

FixDevs ·

Quick Answer

How to fix Kotlin Flow issues — cold vs hot flows, collectLatest vs collect, StateFlow and SharedFlow setup, lifecycle-aware collection in Android, and common Flow cancellation problems.

The Problem

A Flow emits values but collect never receives them:

val flow = flow {
    emit(1)
    emit(2)
    emit(3)
}

// Never prints anything
CoroutineScope(Dispatchers.Main).launch {
    flow.collect { value ->
        println(value)
    }
}

Or StateFlow updates are missed in the UI:

class MyViewModel : ViewModel() {
    private val _state = MutableStateFlow(0)
    val state: StateFlow<Int> = _state

    fun increment() {
        _state.value++  // Updates — but UI never re-renders
    }
}

Or a Flow that should emit continuously gets cancelled after the first value:

viewModel.dataFlow
    .collect { data ->
        updateUI(data)
        // After calling another suspending function here — Flow stops
    }

Why This Happens

Kotlin Flow has several behaviors that differ from RxJava or LiveData:

  • Cold flows don’t execute until collected — creating a flow { } builder doesn’t start the producer. Each collect call starts a new execution. If no one collects, nothing happens.
  • Flow is tied to the collecting coroutine’s lifecycle — if the coroutine is cancelled (e.g., because the screen rotates, the ViewModel is cleared, or the scope is closed), the Flow collection stops. This is intentional, but can cause values to be missed if the scope is too narrow.
  • StateFlow never replays when the value hasn’t changedStateFlow is a conflated hot flow. If you emit the same value twice (_state.value = 5; _state.value = 5), collectors only see one emission. It also starts with an initial value, so late collectors receive the current value immediately.
  • collect is a blocking suspending function — code after collect in the same coroutine never runs while the flow is active (unless the flow completes). Use launch to collect in a separate coroutine.
  • collectLatest cancels the previous block — if the flow emits faster than you process, collectLatest cancels the previous processing block and starts a new one. Long processing inside collectLatest may never complete.

Fix 1: Understand Cold vs Hot Flows

Cold flows start fresh for each collector. Hot flows (like StateFlow and SharedFlow) run independently:

// COLD flow — runs once per collector, no sharing
val coldFlow = flow {
    println("Producer started")  // Prints for EACH collector
    emit(1)
    emit(2)
    emit(3)
}

// Two collectors start two separate producers:
launch { coldFlow.collect { println("A: $it") } }  // A: 1, A: 2, A: 3
launch { coldFlow.collect { println("B: $it") } }  // B: 1, B: 2, B: 3

// HOT StateFlow — one shared state, late collectors get current value
val stateFlow = MutableStateFlow(0)
stateFlow.value = 5

launch { stateFlow.collect { println("Collector: $it") } }
// Immediately prints: "Collector: 5" (current value)

// HOT SharedFlow — configurable replay, no initial value
val sharedFlow = MutableSharedFlow<Int>(replay = 3)  // Replays last 3 values
launch { sharedFlow.emit(1) }
launch { sharedFlow.emit(2) }
// Late collector gets: 1, 2 (if within replay buffer)

Convert cold to hot with shareIn or stateIn:

// Share an expensive cold flow between multiple collectors
class Repository(private val db: Database) {
    // WITHOUT sharing: each collector starts a new database query
    val users: Flow<List<User>> = db.getUsersFlow()

    // WITH sharing: one database subscription, shared among all collectors
    val usersShared: Flow<List<User>> = db.getUsersFlow()
        .shareIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),  // Keep alive 5s after last subscriber
            replay = 1
        )

    // Convert to StateFlow: always has a current value
    val usersState: StateFlow<List<User>> = db.getUsersFlow()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

Fix 2: Collect Flows in the Right Lifecycle Scope

Always collect Android Flows in a lifecycle-aware scope to prevent crashes and leaks:

// WRONG — collects in a scope that survives config changes
// May update destroyed views; also leaks if not cancelled manually
class MyFragment : Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        CoroutineScope(Dispatchers.Main).launch {
            viewModel.state.collect { state ->
                updateUI(state)  // May run after view is destroyed
            }
        }
    }
}

// CORRECT — use repeatOnLifecycle (recommended for Android Fragments/Activities)
class MyFragment : Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        viewLifecycleOwner.lifecycleScope.launch {
            // Starts collection when STARTED, cancels when STOPPED
            // Re-collects when STARTED again (e.g., after returning from background)
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.state.collect { state ->
                    updateUI(state)
                }
            }
        }
    }
}

// Multiple flows at the same time
viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        launch {
            viewModel.users.collect { updateUserList(it) }
        }
        launch {
            viewModel.errors.collect { showError(it) }
        }
    }
}

In Compose — use collectAsStateWithLifecycle:

// Add dependency: lifecycle-runtime-compose
@Composable
fun UserScreen(viewModel: UserViewModel = viewModel()) {
    // Lifecycle-aware collection — stops when the screen is not visible
    val state by viewModel.state.collectAsStateWithLifecycle()

    // Or for non-lifecycle-aware (persists in background — avoid for production)
    val stateAlways by viewModel.state.collectAsState()

    UserContent(state = state)
}

Fix 3: Choose the Right Collection Terminal

Different collection operators have different behavior on backpressure:

// collect — processes every emission, queues if slow
flow.collect { value ->
    delay(1000)  // Slow processing — all values queue up
    processValue(value)
}

// collectLatest — cancels previous block when new value arrives
// Best for: UI updates, search queries (only care about latest)
flow.collectLatest { value ->
    delay(1000)  // If new value arrives before 1s, this is cancelled
    updateUI(value)  // Only runs if no new value arrived within 1s
}

// With buffer — decouple producer and consumer speeds
flow
    .buffer(capacity = 64)  // Buffer up to 64 items
    .collect { value ->
        delay(100)  // Consumer is slower, but producer isn't blocked
        processValue(value)
    }

// conflate — like collectLatest but at the Flow level (not operator)
flow
    .conflate()  // Drop intermediate values when consumer is slow
    .collect { value ->
        processValue(value)
    }

Choosing the right operator:

// Search box — only care about the latest query
searchQuery
    .debounce(300)          // Wait 300ms after typing stops
    .distinctUntilChanged() // Skip if same query
    .flatMapLatest { query ->
        repository.search(query)  // Cancel previous search on new query
    }
    .collectLatest { results ->
        updateResults(results)
    }

// Progress events — need all values
uploadProgress
    .collect { percent ->
        progressBar.progress = percent
    }

// Sensor data — only need latest, not historical
accelerometer
    .conflate()
    .collect { reading ->
        updateVisualization(reading)
    }

Fix 4: Fix StateFlow and SharedFlow Issues

// StateFlow: initial value required, equality-based, always has a value
class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.value++
        // OR: _count.update { it + 1 }  — thread-safe atomic update
    }

    // update { } for read-modify-write (thread-safe)
    fun addToCart(item: Item) {
        _cart.update { currentCart ->
            currentCart + item
        }
    }
}

// StateFlow won't emit if the value is equal to the current value:
_count.value = 5
_count.value = 5  // No emission — same value

// For data classes, ensure equals() is implemented correctly:
data class UserState(val name: String, val count: Int)
_state.value = UserState("Alice", 1)
_state.value = UserState("Alice", 1)  // No emission — same data class value
_state.value = UserState("Alice", 2)  // Emits — different value
// SharedFlow: no initial value, configurable replay and buffer
class EventViewModel : ViewModel() {
    // One-time events (no replay — new collectors don't get past events)
    private val _events = MutableSharedFlow<UiEvent>()
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()

    fun showError(message: String) {
        viewModelScope.launch {
            _events.emit(UiEvent.Error(message))
        }
    }

    // With replay: new collectors see last N events
    private val _notifications = MutableSharedFlow<String>(
        replay = 3,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
}

// Collecting SharedFlow in Fragment
viewLifecycleOwner.lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.events.collect { event ->
            when (event) {
                is UiEvent.Error -> showSnackbar(event.message)
                is UiEvent.Navigate -> findNavController().navigate(event.destination)
            }
        }
    }
}

Fix 5: Combine and Transform Flows

Build complex flows by combining operators:

// Combine multiple flows
val userId: StateFlow<Int> = userViewModel.userId
val preferences: Flow<Preferences> = preferencesRepository.flow

combine(userId, preferences) { id, prefs ->
    Pair(id, prefs)
}.collect { (id, prefs) ->
    loadUserWithPrefs(id, prefs)
}

// zip — pairs emissions one-to-one
val numbers = flowOf(1, 2, 3)
val letters = flowOf("a", "b", "c")

numbers.zip(letters) { num, letter ->
    "$num$letter"
}.collect { println(it) }
// 1a, 2b, 3c

// flatMapLatest — cancel previous inner flow when outer emits
viewModel.selectedUserId
    .flatMapLatest { userId ->
        repository.getUser(userId)  // New flow for each userId
    }
    .collect { user ->
        updateUI(user)
    }

// flatMapMerge — run inner flows concurrently
urls.flatMapMerge(concurrency = 4) { url ->
    flow { emit(httpClient.get(url)) }
}.collect { response ->
    process(response)
}

// mapNotNull — filter and transform in one step
dataFlow
    .mapNotNull { it.data }  // Skips nulls
    .collect { data ->
        processData(data)
    }

Fix 6: Handle Errors in Flows

Unhandled exceptions in a flow cancel it. Use catch to handle errors without terminating the flow:

// WRONG — exception cancels the entire flow
repository.getDataFlow()
    .collect { data ->
        riskyOperation(data)  // Throws — collection stops completely
    }

// CORRECT — catch exceptions and continue or emit error state
repository.getDataFlow()
    .catch { e ->
        emit(emptyList())  // Emit fallback value
        // Or: emit(Resource.Error(e.message))
    }
    .collect { data ->
        updateUI(data)
    }

// Retry on error with backoff
repository.getDataFlow()
    .retry(3) { e ->
        e is IOException  // Only retry IOExceptions
    }
    .catch { e ->
        // Only reached if all retries fail
        showPersistentError(e)
    }
    .collect { updateUI(it) }

// onEach for side effects (like logging) without consuming the flow
repository.getDataFlow()
    .onEach { data -> logAnalytics(data) }
    .onStart { showLoading() }
    .onCompletion { hideLoading() }
    .catch { e -> showError(e) }
    .collect { updateUI(it) }

Still Not Working?

Flow in ViewModel not updating after config changeviewModelScope survives configuration changes (screen rotation), but the Fragment/Activity is recreated. If you’re collecting in the old Fragment/Activity, it’s collecting in a cancelled scope. Always use viewLifecycleOwner.lifecycleScope in Fragments (not lifecycleScope), and use repeatOnLifecycle to restart collection.

StateFlow initial value causes unintended UI updateStateFlow always emits its current value to new collectors immediately. If you initialize with null or an empty state, your UI receives that before real data arrives. Use a sealed class for loading states:

sealed class UiState<out T> {
    object Loading : UiState<Nothing>()
    data class Success<T>(val data: T) : UiState<T>()
    data class Error(val message: String) : UiState<Nothing>()
}

private val _state = MutableStateFlow<UiState<List<User>>>(UiState.Loading)

callbackFlow or channelFlow emitting after close — if you use callbackFlow to wrap a callback-based API, always call awaitClose and clean up listeners:

fun listenToSensor(sensor: Sensor): Flow<SensorEvent> = callbackFlow {
    val listener = SensorEventListener { event ->
        trySend(event)  // Use trySend instead of send (non-suspending)
    }
    sensorManager.registerListener(listener, sensor)

    awaitClose {
        // Called when the Flow is cancelled — clean up here
        sensorManager.unregisterListener(listener)
    }
}

For related Kotlin issues, see Fix: Kotlin Coroutine Not Executing and Fix: Kotlin Coroutine Scope Cancelled.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles