aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
blob: a937adcc8abb6c13c46d7cb889d9b2ab2800ad16 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

@file:JvmMultifileClass
@file:JvmName("FlowKt")
@file:Suppress("UNCHECKED_CAST")

package kotlinx.coroutines.flow

import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.Symbol
import kotlin.jvm.*

/**
 * Terminal operator that combines all elements of the flow into a single value.
 *
 * The first emitted element seeds the accumulator; every following element is merged into it
 * by calling [operation] with the current accumulator and that element.
 *
 * @param operation function invoked with the current accumulator and the next element, returning the new accumulator.
 * @return the accumulator after the last element has been processed.
 * @throws NoSuchElementException if the flow completed without emitting any element.
 */
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
    // NULL (declared outside this file) is compared by identity and marks "no element seen yet".
    var acc: Any? = NULL

    collect { element ->
        acc = when {
            acc === NULL -> element // first element seeds the accumulator as-is
            else -> operation(acc as S, element)
        }
    }

    return when {
        acc === NULL -> throw NoSuchElementException("Empty flow can't be reduced")
        else -> acc as S
    }
}

/**
 * Terminal operator that accumulates a value starting with [initial] and applying [operation]
 * to the current accumulator value and each element of the flow.
 *
 * @param initial the starting accumulator value; returned unchanged if the flow emits nothing.
 * @param operation function invoked with the current accumulator and the next element, returning the new accumulator.
 * @return the accumulator after the last element has been processed.
 */
public suspend inline fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R {
    var result = initial
    collect { element -> result = operation(result, element) }
    return result
}

/**
 * The terminal operator that awaits for one and only one value to be emitted.
 * Throws [NoSuchElementException] for empty flow and [IllegalArgumentException] for flow
 * that contains more than one element.
 *
 * @return the only element emitted by the flow.
 * @throws NoSuchElementException if the flow completed without emitting any element.
 * @throws IllegalArgumentException if the flow emitted a second element.
 */
public suspend fun <T> Flow<T>.single(): T {
    // NULL (declared outside this file) is compared by identity and marks "no element seen yet".
    var result: Any? = NULL
    collect { value ->
        // `require` fails with IllegalArgumentException as soon as a second element arrives.
        require(result === NULL) { "Flow has more than one element" }
        result = value
    }

    if (result === NULL) throw NoSuchElementException("Flow is empty")
    return result as T
}

/**
 * The terminal operator that expects the flow to emit exactly one value.
 *
 * @return the single emitted value, or `null` if the flow was empty or emitted more than one value.
 */
public suspend fun <T> Flow<T>.singleOrNull(): T? {
    var candidate: Any? = NULL
    collectWhile { value ->
        val isFirst = candidate === NULL
        // First element: remember it and keep collecting.
        // Second element: drop the remembered one and return `false` to stop.
        candidate = if (isFirst) value else NULL
        isFirst
    }
    if (candidate === NULL) return null
    return candidate as T
}

/**
 * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
 *
 * @return the first emitted element.
 * @throws NoSuchElementException if the flow was empty.
 */
public suspend fun <T> Flow<T>.first(): T {
    var head: Any? = NULL
    collectWhile { value ->
        head = value
        false // one element is all we need, so ask collectWhile to stop
    }
    return when {
        head === NULL -> throw NoSuchElementException("Expected at least one element")
        else -> head as T
    }
}

/**
 * The terminal operator that returns the first element emitted by the flow that matches the given [predicate]
 * and then cancels flow's collection.
 *
 * @param predicate test applied to each element in emission order until it returns `true`.
 * @return the first element for which [predicate] returned `true`.
 * @throws NoSuchElementException if the flow has not contained elements matching the [predicate].
 */
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
    var match: Any? = NULL
    collectWhile { value ->
        val matched = predicate(value)
        if (matched) match = value
        // Keep collecting only while nothing has matched yet.
        !matched
    }
    if (match === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
    return match as T
}

/**
 * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
 *
 * @return the first emitted element, or `null` if the flow was empty.
 */
public suspend fun <T> Flow<T>.firstOrNull(): T? {
    var head: T? = null
    collectWhile { value ->
        head = value
        false // one element is all we need, so ask collectWhile to stop
    }
    return head
}

/**
 * The terminal operator that returns the first element emitted by the flow that matches the given [predicate]
 * and then cancels flow's collection.
 *
 * @param predicate test applied to each element in emission order until it returns `true`.
 * @return the first element for which [predicate] returned `true`, or `null` if the flow did not contain one.
 */
public suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
    var match: T? = null
    collectWhile { value ->
        val matched = predicate(value)
        if (matched) match = value
        // Keep collecting only while nothing has matched yet.
        !matched
    }
    return match
}