aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/operators/Share.kt
blob: fe1d7216b0f2512afa09ae6dcce2b72d5c93eaef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

// -------------------------------- shareIn --------------------------------

/**
 * Converts a _cold_ [Flow] into a _hot_ [SharedFlow] that is started in the given coroutine [scope],
 * sharing emissions from a single running instance of the upstream flow with multiple downstream subscribers,
 * and replaying a specified number of [replay] values to new subscribers. See the [SharedFlow] documentation
 * for the general concepts of shared flows.
 *
 * The starting of the sharing coroutine is controlled by the [started] parameter. The following options
 * are supported.
 *
 * * [Eagerly][SharingStarted.Eagerly] — the upstream flow is started even before the first subscriber appears. Note
 *   that in this case all values emitted by the upstream beyond the most recent values as specified by
 *   [replay] parameter **will be immediately discarded**.
 * * [Lazily][SharingStarted.Lazily] — starts the upstream flow after the first subscriber appears, which guarantees
 *   that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to
 *   get the most recent [replay] values. The upstream flow continues to be active even when all subscribers
 *   disappear, but only the most recent [replay] values are cached without subscribers.
 * * [WhileSubscribed()][SharingStarted.WhileSubscribed] — starts the upstream flow when the first subscriber
 *   appears, immediately stops when the last subscriber disappears, keeping the replay cache forever.
 *   It has additional optional configuration parameters as explained in its documentation.
 * * A custom strategy can be supplied by implementing the [SharingStarted] interface.
 *
 * The `shareIn` operator is useful in situations when there is a _cold_ flow that is expensive to create and/or
 * to maintain, but there are multiple subscribers that need to collect its values. For example, consider a
 * flow of messages coming from a backend over the expensive network connection, taking a lot of
 * time to establish. Conceptually, it might be implemented like this:
 *
 * ```
 * val backendMessages: Flow<Message> = flow {
 *     connectToBackend() // takes a lot of time
 *     try {
 *       while (true) {
 *           emit(receiveMessageFromBackend())
 *       }
 *     } finally {
 *         disconnectFromBackend()
 *     }
 * }
 * ```
 *
 * If this flow is directly used in the application, then every time it is collected a fresh connection is
 * established, and it will take a while before messages start flowing. However, we can share a single connection
 * and establish it eagerly like this:
 *
 * ```
 * val messages: SharedFlow<Message> = backendMessages.shareIn(scope, SharingStarted.Eagerly)
 * ```
 *
 * Now a single connection is shared between all collectors from `messages`, and there is a chance that the connection
 * is already established by the time it is needed.
 *
 * ### Upstream completion and error handling
 *
 * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
 * a strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
 * action on upstream completion is needed, then an [onCompletion] operator can be used before the
 * `shareIn` operator to emit a special value in this case, like this:
 *
 * ```
 * backendMessages
 *     .onCompletion { cause -> if (cause == null) emit(UpstreamHasCompletedMessage) }
 *     .shareIn(scope, SharingStarted.Eagerly)
 * ```
 *
 * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
 * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
 * can be configured by using the [catch] or [retry] operators before the `shareIn` operator.
 * For example, to retry connection on any `IOException` with 1 second delay between attempts, use:
 *
 * ```
 * val messages = backendMessages
 *     .retry { e ->
 *         val shallRetry = e is IOException // other exception are bugs - handle them
 *         if (shallRetry) delay(1000)
 *         shallRetry
 *     }
 *     .shareIn(scope, SharingStarted.Eagerly)
 * ```
 *
 * ### Initial value
 *
 * When a special initial value is needed to signal to subscribers that the upstream is still loading the data,
 * use the [onStart] operator on the upstream flow. For example:
 *
 * ```
 * backendMessages
 *     .onStart { emit(UpstreamIsStartingMessage) }
 *     .shareIn(scope, SharingStarted.Eagerly, 1) // replay one most recent message
 * ```
 *
 * ### Buffering and conflation
 *
 * The `shareIn` operator runs the upstream flow in a separate coroutine, and buffers emissions from upstream as explained
 * in the [buffer] operator's description, using a buffer of [replay] size or the default (whichever is larger).
 * This default buffering can be overridden with an explicit buffer configuration by preceding the `shareIn` call
 * with [buffer] or [conflate], for example:
 *
 * * `buffer(0).shareIn(scope, started, 0)` &mdash; overrides the default buffer size and creates a [SharedFlow] without a buffer.
 *   Effectively, it configures sequential processing between the upstream emitter and subscribers,
 *   as the emitter is suspended until all subscribers process the value. Note, that the value is still immediately
 *   discarded when there are no subscribers.
 * * `buffer(b).shareIn(scope, started, r)` &mdash; creates a [SharedFlow] with `replay = r` and `extraBufferCapacity = b`.
 * * `conflate().shareIn(scope, started, r)` &mdash; creates a [SharedFlow] with `replay = r`, `onBufferOverflow = DROP_OLDEST`,
 *   and `extraBufferCapacity = 1` when `replay == 0` to support this strategy.
 *
 * ### Operator fusion
 *
 * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
 * or [cancellable] operators to the resulting shared flow has no effect.
 *
 * ### Exceptions
 *
 * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @param started the strategy that controls when sharing is started and stopped.
 * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
 */
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return shared.asSharedFlow()
}

private class SharingConfig<T>(
    @JvmField val upstream: Flow<T>,
    @JvmField val extraBufferCapacity: Int,
    @JvmField val onBufferOverflow: BufferOverflow,
    @JvmField val context: CoroutineContext
)

// Decomposes upstream flow to fuse with it when possible
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
    assert { replay >= 0 }
    val defaultExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
    // Combine with preceding buffer/flowOn and channel-using operators
    if (this is ChannelFlow) {
        // Check if this ChannelFlow can operate without a channel
        val upstream = dropChannelOperators()
        if (upstream != null) { // Yes, it can => eliminate the intermediate channel
            return SharingConfig(
                upstream = upstream,
                extraBufferCapacity = when (capacity) {
                    Channel.OPTIONAL_CHANNEL, Channel.BUFFERED, 0 -> // handle special capacities
                        when {
                            onBufferOverflow == BufferOverflow.SUSPEND -> // buffer was configured with suspension
                                if (capacity == 0) 0 else defaultExtraCapacity // keep explicitly configured 0 or use default
                            replay == 0 -> 1 // no suspension => need at least buffer of one
                            else -> 0 // replay > 0 => no need for extra buffer beyond replay because we don't suspend
                        }
                    else -> capacity // otherwise just use the specified capacity as extra capacity
                },
                onBufferOverflow = onBufferOverflow,
                context = context
            )
        }
    }
    // Add sharing operator on top with a default buffer
    return SharingConfig(
        upstream = this,
        extraBufferCapacity = defaultExtraCapacity,
        onBufferOverflow = BufferOverflow.SUSPEND,
        context = EmptyCoroutineContext
    )
}

// Launches sharing coroutine
private fun <T> CoroutineScope.launchSharing(
    context: CoroutineContext,
    upstream: Flow<T>,
    shared: MutableSharedFlow<T>,
    started: SharingStarted,
    initialValue: T
) {
    launch(context) { // the single coroutine to rule the sharing
        // Optimize common built-in started strategies
        when {
            started === SharingStarted.Eagerly -> {
                // collect immediately & forever
                upstream.collect(shared)
            }
            started === SharingStarted.Lazily -> {
                // start collecting on the first subscriber - wait for it first
                shared.subscriptionCount.first { it > 0 }
                upstream.collect(shared)
            }
            else -> {
                // other & custom strategies
                started.command(shared.subscriptionCount)
                    .distinctUntilChanged() // only changes in command have effect
                    .collectLatest { // cancels block on new emission
                        when (it) {
                            SharingCommand.START -> upstream.collect(shared) // can be cancelled
                            SharingCommand.STOP -> { /* just cancel and do nothing else */ }
                            SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
                                if (initialValue === NO_VALUE) {
                                    shared.resetReplayCache() // regular shared flow -> reset cache
                                } else {
                                    shared.tryEmit(initialValue) // state flow -> reset to initial value
                                }
                            }
                        }
                    }
            }
        }
    }
}

// -------------------------------- stateIn --------------------------------

/**
 * Converts a _cold_ [Flow] into a _hot_ [StateFlow] that is started in the given coroutine [scope],
 * sharing the most recently emitted value from a single running instance of the upstream flow with multiple
 * downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
 *
 * The starting of the sharing coroutine is controlled by the [started] parameter, as explained in the
 * documentation for [shareIn] operator.
 *
 * The `stateIn` operator is useful in situations when there is a _cold_ flow that provides updates to the
 * value of some state and is expensive to create and/or to maintain, but there are multiple subscribers
 * that need to collect the most recent state value. For example, consider a
 * flow of state updates coming from a backend over the expensive network connection, taking a lot of
 * time to establish. Conceptually it might be implemented like this:
 *
 * ```
 * val backendState: Flow<State> = flow {
 *     connectToBackend() // takes a lot of time
 *     try {
 *       while (true) {
 *           emit(receiveStateUpdateFromBackend())
 *       }
 *     } finally {
 *         disconnectFromBackend()
 *     }
 * }
 * ```
 *
 * If this flow is directly used in the application, then every time it is collected a fresh connection is
 * established, and it will take a while before state updates start flowing. However, we can share a single connection
 * and establish it eagerly like this:
 *
 * ```
 * val state: StateFlow<State> = backendMessages.stateIn(scope, SharingStarted.Eagerly, State.LOADING)
 * ```
 *
 * Now, a single connection is shared between all collectors from `state`, and there is a chance that the connection
 * is already established by the time it is needed.
 *
 * ### Upstream completion and error handling
 *
 * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a
 * a strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special
 * action on upstream completion is needed, then an [onCompletion] operator can be used before
 * the `stateIn` operator to emit a special value in this case. See the [shareIn] operator's documentation for an example.
 *
 * Any exception in the upstream flow terminates the sharing coroutine without affecting any of the subscribers,
 * and will be handled by the [scope] in which the sharing coroutine is launched. Custom exception handling
 * can be configured by using the [catch] or [retry] operators before the `stateIn` operator, similarly to
 * the [shareIn] operator.
 *
 * ### Operator fusion
 *
 * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
 * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
 * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @param started the strategy that controls when sharing is started and stopped.
 * @param initialValue the initial value of the state flow.
 *   This value is also used when the state flow is reset using the [SharingStarted.WhileSubscribed] strategy
 *   with the `replayExpirationMillis` parameter.
 */
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T> {
    val config = configureSharing(1)
    val state = MutableStateFlow(initialValue)
    scope.launchSharing(config.context, config.upstream, state, started, initialValue)
    return state.asStateFlow()
}

/**
 * Starts the upstream flow in a given [scope], suspends until the first value is emitted, and returns a _hot_
 * [StateFlow] of future emissions, sharing the most recently emitted value from this running instance of the upstream flow
 * with multiple downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
 *
 * @param scope the coroutine scope in which sharing is started.
 */
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
    val config = configureSharing(1)
    val result = CompletableDeferred<StateFlow<T>>()
    scope.launchSharingDeferred(config.context, config.upstream, result)
    return result.await()
}

private fun <T> CoroutineScope.launchSharingDeferred(
    context: CoroutineContext,
    upstream: Flow<T>,
    result: CompletableDeferred<StateFlow<T>>
) {
    launch(context) {
        try {
            var state: MutableStateFlow<T>? = null
            upstream.collect { value ->
                state?.let { it.value = value } ?: run {
                    state = MutableStateFlow(value).also {
                        result.complete(it.asStateFlow())
                    }
                }
            }
        } catch (e: Throwable) {
            // Notify the waiter that the flow has failed
            result.completeExceptionally(e)
            // But still cancel the scope where state was (not) produced
            throw e
        }
    }
}

// -------------------------------- asSharedFlow/asStateFlow --------------------------------

/**
 * Represents this mutable shared flow as a read-only shared flow.
 */
public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> =
    ReadonlySharedFlow(this)

/**
 * Represents this mutable state flow as a read-only state flow.
 */
public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> =
    ReadonlyStateFlow(this)

private class ReadonlySharedFlow<T>(
    flow: SharedFlow<T>
) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseSharedFlow(context, capacity, onBufferOverflow)
}

private class ReadonlyStateFlow<T>(
    flow: StateFlow<T>
) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> {
    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseStateFlow(context, capacity, onBufferOverflow)
}

// -------------------------------- onSubscription --------------------------------

/**
 * Returns a flow that invokes the given [action] **after** this shared flow starts to be collected
 * (after the subscription is registered).
 *
 * The [action] is called before any value is emitted from the upstream
 * flow to this subscription but after the subscription is established. It is guaranteed that all emissions to
 * the upstream flow that happen inside or immediately after this `onSubscription` action will be
 * collected by this subscription.
 *
 * The receiver of the [action] is [FlowCollector], so `onSubscription` can emit additional elements.
 */
public fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T> =
    SubscribedSharedFlow(this, action)

private class SubscribedSharedFlow<T>(
    private val sharedFlow: SharedFlow<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : SharedFlow<T> by sharedFlow {
    override suspend fun collect(collector: FlowCollector<T>) =
        sharedFlow.collect(SubscribedFlowCollector(collector, action))
}

internal class SubscribedFlowCollector<T>(
    private val collector: FlowCollector<T>,
    private val action: suspend FlowCollector<T>.() -> Unit
) : FlowCollector<T> by collector {
    suspend fun onSubscription() {
        val safeCollector = SafeCollector(collector, currentCoroutineContext())
        try {
            safeCollector.action()
        } finally {
            safeCollector.releaseIntercepted()
        }
        if (collector is SubscribedFlowCollector) collector.onSubscription()
    }
}