aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/internal/Merge.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/Merge.kt86
1 files changed, 86 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
new file mode 100644
index 00000000..f621be03
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.sync.*
+import kotlin.coroutines.*
+
+internal class ChannelFlowTransformLatest<T, R>(
+ private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
+ flow: Flow<T>,
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = Channel.BUFFERED
+) : ChannelFlowOperator<T, R>(flow, context, capacity) {
+ override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<R> =
+ ChannelFlowTransformLatest(transform, flow, context, capacity)
+
+ override suspend fun flowCollect(collector: FlowCollector<R>) {
+ assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
+ flowScope {
+ var previousFlow: Job? = null
+ flow.collect { value ->
+ previousFlow?.apply {
+ cancel(ChildCancelledException())
+ join()
+ }
+ // Do not pay for dispatch here, it's never necessary
+ previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
+ collector.transform(value)
+ }
+ }
+ }
+ }
+}
+
+internal class ChannelFlowMerge<T>(
+ flow: Flow<Flow<T>>,
+ private val concurrency: Int,
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = Channel.OPTIONAL_CHANNEL
+) : ChannelFlowOperator<Flow<T>, T>(flow, context, capacity) {
+ override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
+ ChannelFlowMerge(flow, concurrency, context, capacity)
+
+ // The actual merge implementation with concurrency limit
+ private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
+ val semaphore = Semaphore(concurrency)
+ val job: Job? = coroutineContext[Job]
+ flow.collect { inner ->
+ /*
+ * We launch a coroutine on each emitted element and the only potential
+ * suspension point in this collector is `semaphore.acquire` that rarely suspends,
+ * so we manually check for cancellation to propagate it to the upstream in time.
+ */
+ job?.ensureActive()
+ semaphore.acquire()
+ scope.launch {
+ try {
+ inner.collect(collector)
+ } finally {
+ semaphore.release() // Release concurrency permit
+ }
+ }
+ }
+ }
+
+ // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
+ override suspend fun flowCollect(collector: FlowCollector<T>) {
+ // this function should not have been invoked when channel was explicitly requested
+ assert { capacity == Channel.OPTIONAL_CHANNEL }
+ flowScope {
+ mergeImpl(this, collector.asConcurrentFlowCollector())
+ }
+ }
+
+ // Slow path when output channel is required (and was created)
+ override suspend fun collectTo(scope: ProducerScope<T>) =
+ mergeImpl(scope, SendingCollector(scope))
+
+ override fun additionalToStringProps(): String =
+ "concurrency=$concurrency, "
+}