aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
blob: 584178d8c4be908a0fd6cfa6bf72c41f81423e75 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

internal fun getNull(): Symbol = NULL // Workaround for JS BE bug

internal suspend fun <T1, T2, R> FlowCollector<R>.combineTransformInternal(
    first: Flow<T1>, second: Flow<T2>,
    transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
) {
    coroutineScope {
        val firstChannel = asFairChannel(first)
        val secondChannel = asFairChannel(second)
        var firstValue: Any? = null
        var secondValue: Any? = null
        var firstIsClosed = false
        var secondIsClosed = false
        while (!firstIsClosed || !secondIsClosed) {
            select<Unit> {
                onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value ->
                    firstValue = value
                    if (secondValue !== null) {
                        transform(getNull().unbox(firstValue), getNull().unbox(secondValue) as T2)
                    }
                }

                onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value ->
                    secondValue = value
                    if (firstValue !== null) {
                        transform(getNull().unbox(firstValue) as T1, getNull().unbox(secondValue) as T2)
                    }
                }
            }
        }
    }
}

@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
    flows: Array<out Flow<T>>,
    arrayFactory: () -> Array<T?>,
    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = coroutineScope {
    val size = flows.size
    val channels = Array(size) { asFairChannel(flows[it]) }
    val latestValues = arrayOfNulls<Any?>(size)
    val isClosed = Array(size) { false }
    var nonClosed = size
    var remainingNulls = size
    // See flow.combine(other) for explanation.
    while (nonClosed != 0) {
        select<Unit> {
            for (i in 0 until size) {
                onReceive(isClosed[i], channels[i], { isClosed[i] = true; --nonClosed }) { value ->
                    if (latestValues[i] == null) --remainingNulls
                    latestValues[i] = value
                    if (remainingNulls != 0) return@onReceive
                    val arguments = arrayFactory()
                    for (index in 0 until size) {
                        arguments[index] = NULL.unbox(latestValues[index])
                    }
                    transform(arguments as Array<T>)
                }
            }
        }
    }
}

private inline fun SelectBuilder<Unit>.onReceive(
    isClosed: Boolean,
    channel: ReceiveChannel<Any>,
    crossinline onClosed: () -> Unit,
    noinline onReceive: suspend (value: Any) -> Unit
) {
    if (isClosed) return
    @Suppress("DEPRECATION")
    channel.onReceiveOrNull {
        // TODO onReceiveOrClosed when boxing issues are fixed
        if (it === null) onClosed()
        else onReceive(it)
    }
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
    val channel = channel as ChannelCoroutine<Any>
    flow.collect { value ->
        return@collect channel.sendFair(value ?: NULL)
    }
}

internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = unsafeFlow {
    coroutineScope {
        val first = asChannel(flow)
        val second = asChannel(flow2)
        /*
         * This approach only works with rendezvous channel and is required to enforce correctness
         * in the following scenario:
         * ```
         * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
         * val f2 = flowOf(1)
         * f1.zip(f2) { ... }
         * ```
         *
         * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
         */
        (second as SendChannel<*>).invokeOnClose {
            if (!first.isClosedForReceive) first.cancel(AbortFlowException(this@unsafeFlow))
        }

        val otherIterator = second.iterator()
        try {
            first.consumeEach { value ->
                if (!otherIterator.hasNext()) {
                    return@consumeEach
                }
                emit(transform(NULL.unbox(value), NULL.unbox(otherIterator.next())))
            }
        } catch (e: AbortFlowException) {
            e.checkOwnership(owner = this@unsafeFlow)
        } finally {
            if (!second.isClosedForReceive) second.cancel(AbortFlowException(this@unsafeFlow))
        }
    }
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
    flow.collect { value ->
        return@collect channel.send(value ?: NULL)
    }
}