aboutsummaryrefslogtreecommitdiffstats
path: root/ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
blob: ebeaa3b84b9cea6447c6bd704f5a30a25d3bd0bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.javafx

import javafx.beans.value.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

/**
 * Exposes this [ObservableValue] as a cold [Flow] of its values.
 *
 * Each collection of the returned flow registers its own [ChangeListener] through
 * [ObservableValue.addListener], emits the value held at that moment, and afterwards emits every
 * new value reported by the listener. The listener is removed again once the underlying channel
 * is closed or cancelled.
 *
 * The flow is conflated: when values arrive faster than the collector consumes them, only the
 * most recent one is delivered. Because a listener is attached, an [ObservableValue] that would
 * otherwise evaluate lazily is forced to compute eagerly for as long as the flow is being collected.
 * Every interaction with the JavaFX API happens in [Dispatchers.JavaFx].
 * The flow always emits at least the initial value.
 *
 * ### Operator fusion
 *
 * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
 * Applying [conflate] changes nothing, since the flow is conflated already; use [buffer] to get a
 * different buffering behaviour instead.
 */
@ExperimentalCoroutinesApi // Since 1.3.x
public fun <T> ObservableValue<T>.asFlow(): Flow<T> {
    val observable = this
    val updates = callbackFlow<T> {
        // trySend (rather than a throwing send) keeps failures from leaking back into the
        // ObservableValue's listener dispatch; they should already have been handled downstream.
        val onChange = ChangeListener<T> { _, _, updated -> trySend(updated) }
        // Subscribe first, then publish the current value, so the initial emission is never skipped.
        observable.addListener(onChange)
        send(observable.value)
        // Keep the producer alive until the channel is closed or cancelled, then unsubscribe.
        awaitClose { observable.removeListener(onChange) }
    }
    return updates.flowOn(Dispatchers.JavaFx).conflate()
}