path: root/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/internal/Combine.kt')
1 files changed, 142 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
new file mode 100644
index 00000000..f7edad08
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
@@ -0,0 +1,142 @@
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+package kotlinx.coroutines.flow.internal
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.selects.*
+internal fun getNull(): Symbol = NULL // Workaround for JS BE bug
+internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
+ first: Flow<T1>, second: Flow<T2>,
+ transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
+) {
+ coroutineScope {
+ val firstChannel = asFairChannel(first)
+ val secondChannel = asFairChannel(second)
+ var firstValue: Any? = null
+ var secondValue: Any? = null
+ var firstIsClosed = false
+ var secondIsClosed = false
+ while (!firstIsClosed || !secondIsClosed) {
+ select<Unit> {
+ onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
+ firstValue = value
+ if (secondValue !== null) {
+ transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
+ }
+ }
+ onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
+ secondValue = value
+ if (firstValue !== null) {
+ transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
+ }
+ }
+ }
+ }
+ }
+internal suspend fun <R, T> FlowCollector<R>.combineInternal(
+ flows: Array<out Flow<T>>,
+ arrayFactory: () -> Array<T?>,
+ transform: suspend FlowCollector<R>.(Array<T>) -> Unit
+) {
+ coroutineScope {
+ val size = flows.size
+ val channels =
+ Array(size) { asFairChannel(flows[it]) }
+ val latestValues = arrayOfNulls<Any?>(size)
+ val isClosed = Array(size) { false }
+ // See flow.combine(other) for explanation.
+ while (!isClosed.all { it }) {
+ select<Unit> {
+ for (i in 0 until size) {
+ onReceive(isClosed[i], channels[i], { isClosed[i] = true }) { value ->
+ latestValues[i] = value
+ if (latestValues.all { it !== null }) {
+ val arguments = arrayFactory()
+ for (index in 0 until size) {
+ arguments[index] = NULL.unbox(latestValues[index])
+ }
+ transform(arguments as Array<T>)
+ }
+ }
+ }
+ }
+ }
+ }
+private inline fun SelectBuilder<Unit>.onReceive(
+ isClosed: Boolean,
+ channel: ReceiveChannel<Any>,
+ crossinline onClosed: () -> Unit,
+ noinline onReceive: suspend (value: Any) -> Unit
+) {
+ if (isClosed) return
+ channel.onReceiveOrNull {
+ // TODO onReceiveOrClosed when boxing issues are fixed
+ if (it === null) onClosed()
+ else onReceive(it)
+ }
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+ val channel = channel as ChannelCoroutine<Any>
+ flow.collect { value ->
+ return@collect channel.sendFair(value ?: NULL)
+ }
+internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
+ coroutineScope {
+ val first = asChannel(flow)
+ val second = asChannel(flow2)
+ /*
+ * This approach only works with rendezvous channel and is required to enforce correctness
+ * in the following scenario:
+ * ```
+ * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
+ * val f2 = flowOf(1)
+ * f1.zip(f2) { ... }
+ * ```
+ *
+ * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
+ */
+ (second as SendChannel<*>).invokeOnClose {
+ if (!first.isClosedForReceive) first.cancel(AbortFlowException())
+ }
+ val otherIterator = second.iterator()
+ try {
+ first.consumeEach { value ->
+ if (!otherIterator.hasNext()) {
+ return@consumeEach
+ }
+ emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
+ }
+ } catch (e: AbortFlowException) {
+ // complete
+ } finally {
+ if (!second.isClosedForReceive) second.cancel(AbortFlowException())
+ }
+ }
+// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
+private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
+ flow.collect { value ->
+ return@collect channel.send(value ?: NULL)
+ }