aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
blob: 04342ed074a52696d38ca70a45f2fc1eeb6987e4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.
 *
 * Normally, [flows][Flow] are _sequential_. It means that the code of all operators is executed in the
 * same coroutine. For example, consider the following code using [onEach] and [collect] operators:
 *
 * ```
 * flowOf("A", "B", "C")
 *     .onEach  { println("1$it") }
 *     .collect { println("2$it") }
 * ```
 *
 * It is going to be executed in the following order by the coroutine `Q` that calls this code:
 *
 * ```
 * Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
 * ```
 *
 * So if the operator's code takes considerable time to execute, then the total execution time is going to be
 * the sum of execution times for all operators.
 *
 * The `buffer` operator creates a separate coroutine during execution for the flow it applies to.
 * Consider the following code:
 *
 * ```
 * flowOf("A", "B", "C")
 *     .onEach  { println("1$it") }
 *     .buffer()  // <--------------- buffer between onEach and collect
 *     .collect { println("2$it") }
 * ```
 *
 * It will use two coroutines for execution of the code. A coroutine `Q` that calls this code is
 * going to execute `collect`, and the code before `buffer` will be executed in a separate
 * new coroutine `P` concurrently with `Q`:
 *
 * ```
 * P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }
 *
 *                       |
 *                       | channel               // buffer()
 *                       V
 *
 * Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
 * ```
 *
 * When the operator's code takes some time to execute, this decreases the total execution time of the flow.
 * A [channel][Channel] is used between the coroutines to send elements emitted by the coroutine `P` to
 * the coroutine `Q`. If the code before `buffer` operator (in the coroutine `P`) is faster than the code after
 * `buffer` operator (in the coroutine `Q`), then this channel will become full at some point and will suspend
 * the producer coroutine `P` until the consumer coroutine `Q` catches up.
 * The [capacity] parameter defines the size of this buffer.
 *
 * ### Buffer overflow
 *
 * By default, the emitter is suspended when the buffer overflows, to let collector catch up. This strategy can be
 * overridden with an optional [onBufferOverflow] parameter so that the emitter is never suspended. In this
 * case, on buffer overflow either the oldest value in the buffer is dropped with the [DROP_OLDEST][BufferOverflow.DROP_OLDEST]
 * strategy and the latest emitted value is added to the buffer,
 * or the latest value that is being emitted is dropped with the [DROP_LATEST][BufferOverflow.DROP_LATEST] strategy,
 * keeping the buffer intact.
 * To implement either of the custom strategies, a buffer of at least one element is used.
 *
 * ### Operator fusion
 *
 * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
 * always fused so that only one properly configured channel is used for execution.
 *
 * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
 * which effectively requests a buffer of any size. Multiple requests with a specified buffer
 * size produce a buffer with the sum of the requested buffer sizes.
 *
 * A `buffer` call with a non-default value of the [onBufferOverflow] parameter overrides all immediately preceding
 * buffering operators, because it never suspends its upstream, and thus no upstream buffer would ever be used.
 *
 * ### Conceptual implementation
 *
 * The actual implementation of `buffer` is not trivial due to the fusing, but conceptually its basic
 * implementation is equivalent to the following code that can be written using [produce]
 * coroutine builder to produce a channel and [consumeEach][ReceiveChannel.consumeEach] extension to consume it:
 *
 * ```
 * fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
 *     coroutineScope { // limit the scope of concurrent producer coroutine
 *         val channel = produce(capacity = capacity) {
 *             collect { send(it) } // send all to channel
 *         }
 *         // emit all received values
 *         channel.consumeEach { emit(it) }
 *     }
 * }
 * ```
 *
 * ### Conflation
 *
 * Usage of this function with [capacity] of [Channel.CONFLATED][Channel.CONFLATED] is a shortcut to
 * `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`, and is available via
 * a separate [conflate] operator. See its documentation for details.
 *
 * @param capacity type/capacity of the buffer between coroutines. Allowed values are the same as in `Channel(...)`
 *   factory function: [BUFFERED][Channel.BUFFERED] (by default), [CONFLATED][Channel.CONFLATED],
 *   [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating
 *   an explicitly requested size.
 * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
 *   [SUSPEND][BufferOverflow.SUSPEND], supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
 *   implicitly creates a channel with at least one buffered element).
 */
@Suppress("NAME_SHADOWING")
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // desugar CONFLATED capacity to (0, DROP_OLDEST)
    var capacity = capacity
    var onBufferOverflow = onBufferOverflow
    if (capacity == CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capacity)

/**
 * Conflates flow emissions via conflated channel and runs collector in a separate coroutine.
 * The effect of this is that emitter is never suspended due to a slow collector, but collector
 * always gets the most recent value emitted.
 *
 * For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:
 *
 * ```
 * val flow = flow {
 *     for (i in 1..30) {
 *         delay(100)
 *         emit(i)
 *     }
 * }
 * ```
 *
 * Applying `conflate()` operator to it allows a collector that delays 1 second on each element to get
 * integers 1, 10, 20, 30:
 *
 * ```
 * val result = flow.conflate().onEach { delay(1000) }.toList()
 * assertEquals(listOf(1, 10, 20, 30), result)
 * ```
 *
 * Note that `conflate` operator is a shortcut for [buffer] with `capacity` of [Channel.CONFLATED][Channel.CONFLATED],
 * with is, in turn, a shortcut to a buffer that only keeps the latest element as
 * created by `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`.
 *
 * ### Operator fusion
 *
 * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are
 * always fused so that only one properly configured channel is used for execution.
 * **Conflation takes precedence over `buffer()` calls with any other capacity.**
 *
 * Note that any instance of [StateFlow] already behaves as if `conflate` operator is
 * applied to it, so applying `conflate` to a `StateFlow` has no effect.
 * See [StateFlow] documentation on Operator Fusion.
 */
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

/**
 * Changes the context where this flow is executed to the given [context].
 * This operator is composable and affects only preceding operators that do not have its own context.
 * This operator is context preserving: [context] **does not** leak into the downstream flow.
 *
 * For example:
 *
 * ```
 * withContext(Dispatchers.Main) {
 *     val singleValue = intFlow // will be executed on IO if context wasn't specified before
 *         .map { ... } // Will be executed in IO
 *         .flowOn(Dispatchers.IO)
 *         .filter { ... } // Will be executed in Default
 *         .flowOn(Dispatchers.Default)
 *         .single() // Will be executed in the Main
 * }
 * ```
 *
 * For more explanation of context preservation please refer to [Flow] documentation.
 *
 * This operator retains a _sequential_ nature of flow if changing the context does not call for changing
 * the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
 * flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines
 * with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
 * between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
 * before or after `flowOn`, which requests buffering behavior and specifies channel size.
 *
 * Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
 * In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
 * was already emitted by the upstream flow.
 *
 * ### Operator fusion
 *
 * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
 * always fused so that only one properly configured channel is used for execution.
 *
 * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of
 * the first `flowOn` operator naturally take precedence over the elements of the second `flowOn` operator
 * when they have the same context keys, for example:
 *
 * ```
 * flow.map { ... } // Will be executed in IO
 *     .flowOn(Dispatchers.IO) // This one takes precedence
 *     .flowOn(Dispatchers.Default)
 * ```
 *
 * Note that an instance of [SharedFlow] does not have an execution context by itself,
 * so applying `flowOn` to a `SharedFlow` has not effect. See the [SharedFlow] documentation on Operator Fusion.
 *
 * @throws [IllegalArgumentException] if provided context contains [Job] instance.
 */
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

/**
 * Returns a flow which checks cancellation status on each emission and throws
 * the corresponding cancellation cause if flow collector was cancelled.
 * Note that [flow] builder and all implementations of [SharedFlow] are [cancellable] by default.
 *
 * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
 * See [ensureActive][CoroutineContext.ensureActive] for details.
 */
public fun <T> Flow<T>.cancellable(): Flow<T> =
    when (this) {
        is CancellableFlow<*> -> this // Fast-path, already cancellable
        else -> CancellableFlowImpl(this)
    }

/**
 * Internal marker for flows that are [cancellable].
 */
internal interface CancellableFlow<out T> : Flow<T>

/**
 * Named implementation class for a flow that is defined by the [cancellable] function.
 */
private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        flow.collect {
            currentCoroutineContext().ensureActive()
            collector.emit(it)
        }
    }
}

/**
 * The operator that changes the context where all transformations applied to the given flow within a [builder] are executed.
 * This operator is context preserving and does not affect the context of the preceding and subsequent operations.
 *
 * Example:
 *
 * ```
 * flow // not affected
 *     .map { ... } // Not affected
 *     .flowWith(Dispatchers.IO) {
 *         map { ... } // in IO
 *         .filter { ... } // in IO
 *     }
 *     .map { ... } // Not affected
 * ```
 *
 * For more explanation of context preservation please refer to [Flow] documentation.
 *
 * This operator is deprecated without replacement because it was discovered that it doesn't play well with coroutines
 * and flow semantics:
 *
 * 1) It doesn't prevent context elements from the downstream to leak into its body
 *     ```
 *     flowOf(1).flowWith(EmptyCoroutineContext) {
 *         onEach { println(kotlin.coroutines.coroutineContext[CoroutineName]) } // Will print 42
 *     }.flowOn(CoroutineName(42))
 *     ```
 * 2) To avoid such leaks, new primitive should be introduced to `kotlinx.coroutines` -- the subtraction of contexts.
 *    And this will become a new concept to learn, maintain and explain.
 * 3) It defers the execution of declarative [builder] until the moment of [collection][Flow.collect] similarly
 *    to `Observable.defer`. But it is unexpected because nothing in the name `flowWith` reflects this fact.
 * 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer.
 *
 * @suppress
 */
@FlowPreview
@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.ERROR) // Error in beta release, removal in 1.4
public fun <T, R> Flow<T>.flowWith(
    flowContext: CoroutineContext,
    bufferSize: Int = BUFFERED,
    builder: Flow<T>.() -> Flow<R>
): Flow<R> {
    checkFlowContext(flowContext)
    val source = this
    return unsafeFlow {
        /**
         * Here we should remove a Job instance from the context.
         * All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
         * It is also necessary not to mess with cancellation if multiple flowWith are used.
         */
        val originalContext = currentCoroutineContext().minusKey(Job)
        val prepared = source.flowOn(originalContext).buffer(bufferSize)
        builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value ->
            return@collect emit(value)
        }
    }
}

private fun checkFlowContext(context: CoroutineContext) {
    require(context[Job] == null) {
        "Flow context cannot contain job in it. Had $context"
    }
}