aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/channels/Channels.common.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Channels.common.kt2195
1 files changed, 2195 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
new file mode 100644
index 00000000..cd37bfbc
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
@@ -0,0 +1,2195 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:JvmMultifileClass
+@file:JvmName("ChannelsKt")
+@file:Suppress("DEPRECATION")
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.selects.*
+import kotlin.coroutines.*
+import kotlin.jvm.*
+
+internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
+
+
+// -------- Operations on BroadcastChannel --------
+
+/**
+ * Opens subscription to this [BroadcastChannel] and makes sure that the given [block] consumes all elements
+ * from it by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@ObsoleteCoroutinesApi
+public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
+ val channel = openSubscription()
+ try {
+ return channel.block()
+ } finally {
+ channel.cancel()
+ }
+}
+
+/**
+ * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
+ * or returns `null` if the channel is [closed][Channel.isClosedForReceive].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * *Cancellation of suspended receive is atomic* -- when this function
+ * throws [CancellationException] it means that the element was not retrieved from this channel.
+ * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
+ * continue to execute even after it was cancelled from the same thread in the case when this receive operation
+ * was already resumed and the continuation was posted for execution to the thread's queue.
+ *
+ * Note, that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * This extension is defined only for channels on non-null types, so that generic functions defined using
+ * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
+ * to find bugs.
+ */
+@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0
+public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
+ @Suppress("DEPRECATION", "UNCHECKED_CAST")
+ return (this as ReceiveChannel<E?>).receiveOrNull()
+}
+
+/**
+ * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
+ * is received from the channel or selects with `null` if the channel
+ * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
+ * the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ *
+ * This extension is defined only for channels on non-null types, so that generic functions defined using
+ * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
+ * to find bugs.
+ **/
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0
+public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
+ @Suppress("DEPRECATION", "UNCHECKED_CAST")
+ return (this as ReceiveChannel<E?>).onReceiveOrNull
+}
+
+/**
+ * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@ObsoleteCoroutinesApi
+public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit) =
+ consume {
+ for (element in this) action(element)
+ }
+
+// -------- Operations on ReceiveChannel --------
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
+ * with the corresponding cause. See also [ReceiveChannel.consume].
+ *
+ * **WARNING**: It is planned that in the future a second invocation of this method
+ * on an channel that is already being consumed is going to fail fast, that it
+ * immediately throws an [IllegalStateException].
+ * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
+ * for details.
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
+ cancelConsumed(cause)
+}
+
+@PublishedApi
+internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
+ cancel(cause?.let {
+ it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
+ })
+}
+
+/**
+ * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
+ * specified [ReceiveChannel] instances with the corresponding cause.
+ * See also [ReceiveChannel.consumes()] for a version on one channel.
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
+ { cause: Throwable? ->
+ var exception: Throwable? = null
+ for (channel in channels)
+ try {
+ channel.cancelConsumed(cause)
+ } catch (e: Throwable) {
+ if (exception == null) {
+ exception = e
+ } else {
+ exception.addSuppressedThrowable(e)
+ }
+ }
+ exception?.let { throw it }
+ }
+
+/**
+ * Makes sure that the given [block] consumes all elements from the given channel
+ * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
+ *
+ * The operation is _terminal_.
+ */
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0
+public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
+ var cause: Throwable? = null
+ try {
+ return block()
+ } catch (e: Throwable) {
+ cause = e
+ throw e
+ } finally {
+ cancelConsumed(cause)
+ }
+}
+
+/**
+ * Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
+ * the channel after the execution of the block.
+ * If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+@ExperimentalCoroutinesApi // since 1.3.0, tentatively graduates in 1.4.0
+public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
+ consume {
+ for (e in this) action(e)
+ }
+
+/**
+ * Performs the given [action] for each received element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
+ var index = 0
+ consumeEach {
+ action(IndexedValue(index++, it))
+ }
+}
+
+/**
+ * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
+ elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
+
+/**
+ * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
+ consume {
+ if (index < 0)
+ return defaultValue(index)
+ var count = 0
+ for (element in this) {
+ if (index == count++)
+ return element
+ }
+ return defaultValue(index)
+ }
+
+/**
+ * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
+ consume {
+ if (index < 0)
+ return null
+ var count = 0
+ for (element in this) {
+ if (index == count++)
+ return element
+ }
+ return null
+ }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
+ firstOrNull(predicate)
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
+ lastOrNull(predicate)
+
+/**
+ * Returns first element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.first(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ return iterator.next()
+ }
+
+/**
+ * Returns the first element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
+ consumeEach {
+ if (predicate(it)) return it
+ }
+ throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+}
+
+/**
+ * Returns the first element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ return iterator.next()
+ }
+
+/**
+ * Returns the first element matching the given [predicate], or `null` if element was not found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
+ consumeEach {
+ if (predicate(it)) return it
+ }
+ return null
+}
+
+/**
+ * Returns first index of [element], or -1 if the channel does not contain element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
+ var index = 0
+ consumeEach {
+ if (element == it)
+ return index
+ index++
+ }
+ return -1
+}
+
+/**
+ * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
+ var index = 0
+ consumeEach {
+ if (predicate(it))
+ return index
+ index++
+ }
+ return -1
+}
+
+/**
+ * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
+ var lastIndex = -1
+ var index = 0
+ consumeEach {
+ if (predicate(it))
+ lastIndex = index
+ index++
+ }
+ return lastIndex
+}
+
+/**
+ * Returns the last element.
+ * @throws [NoSuchElementException] if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.last(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ var last = iterator.next()
+ while (iterator.hasNext())
+ last = iterator.next()
+ return last
+ }
+
+/**
+ * Returns the last element matching the given [predicate].
+ * @throws [NoSuchElementException] if no such element is found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
+ var last: E? = null
+ var found = false
+ consumeEach {
+ if (predicate(it)) {
+ last = it
+ found = true
+ }
+ }
+ if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+ @Suppress("UNCHECKED_CAST")
+ return last as E
+}
+
+/**
+ * Returns last index of [element], or -1 if the channel does not contain element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
+ var lastIndex = -1
+ var index = 0
+ consumeEach {
+ if (element == it)
+ lastIndex = index
+ index++
+ }
+ return lastIndex
+}
+
+/**
+ * Returns the last element, or `null` if the channel is empty.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ var last = iterator.next()
+ while (iterator.hasNext())
+ last = iterator.next()
+ return last
+ }
+
+/**
+ * Returns the last element matching the given [predicate], or `null` if no such element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
+ var last: E? = null
+ consumeEach {
+ if (predicate(it)) {
+ last = it
+ }
+ }
+ return last
+}
+
+/**
+ * Returns the single element, or throws an exception if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.single(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ val single = iterator.next()
+ if (iterator.hasNext())
+ throw IllegalArgumentException("ReceiveChannel has more than one element.")
+ return single
+ }
+
+/**
+ * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
+ var single: E? = null
+ var found = false
+ consumeEach {
+ if (predicate(it)) {
+ if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
+ single = it
+ found = true
+ }
+ }
+ if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
+ @Suppress("UNCHECKED_CAST")
+ return single as E
+}
+
+/**
+ * Returns single element, or `null` if the channel is empty or has more than one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ val single = iterator.next()
+ if (iterator.hasNext())
+ return null
+ return single
+ }
+
+/**
+ * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
+ var single: E? = null
+ var found = false
+ consumeEach {
+ if (predicate(it)) {
+ if (found) return null
+ single = it
+ found = true
+ }
+ }
+ if (!found) return null
+ return single
+}
+
+/**
+ * Returns a channel containing all elements except first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ require(n >= 0) { "Requested element count $n is less than zero." }
+ var remaining: Int = n
+ if (remaining > 0)
+ for (e in this@drop) {
+ remaining--
+ if (remaining == 0)
+ break
+ }
+ for (e in this@drop) {
+ send(e)
+ }
+ }
+
+/**
+ * Returns a channel containing all elements except first elements that satisfy the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@dropWhile) {
+ if (!predicate(e)) {
+ send(e)
+ break
+ }
+ }
+ for (e in this@dropWhile) {
+ send(e)
+ }
+ }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@filter) {
+ if (predicate(e)) send(e)
+ }
+ }
+
+/**
+ * Returns a channel containing only elements matching the given [predicate].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@filterIndexed) {
+ if (predicate(index++, e)) send(e)
+ }
+ }
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+ consumeEachIndexed { (index, element) ->
+ if (predicate(index, element)) destination.add(element)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ * @param [predicate] function that takes the index of an element and the element itself
+ * and returns the result of predicate evaluation on the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
+ consumeEachIndexed { (index, element) ->
+ if (predicate(index, element)) destination.send(element)
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing all elements not matching the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ filter(context) { !predicate(it) }
+
+/**
+ * Returns a channel containing all elements that are not `null`.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+@Suppress("UNCHECKED_CAST")
+public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
+ filter { it != null } as ReceiveChannel<E>
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+ consumeEach {
+ if (it != null) destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements that are not `null` to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+ consumeEach {
+ if (it != null) destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (!predicate(it)) destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements not matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (!predicate(it)) destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (predicate(it)) destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements matching the given [predicate] to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
+ consumeEach {
+ if (predicate(it)) destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing first [n] elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ if (n == 0) return@produce
+ require(n >= 0) { "Requested element count $n is less than zero." }
+ var remaining: Int = n
+ for (e in this@take) {
+ send(e)
+ remaining--
+ if (remaining == 0)
+ return@produce
+ }
+ }
+
+/**
+ * Returns a channel containing first elements satisfying the given [predicate].
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@takeWhile) {
+ if (!predicate(e)) return@produce
+ send(e)
+ }
+ }
+
+/**
+ * Returns a [Map] containing key-value pairs provided by [transform] function
+ * applied to elements of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
+ associateTo(LinkedHashMap(), transform)
+
+/**
+ * Returns a [Map] containing the elements from the given channel indexed by the key
+ * returned from [keySelector] function applied to each element.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
+ associateByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The returned map preserves the entry iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
+ associateByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function applied to each element of the given channel
+ * and value is the element itself.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
+ consumeEach {
+ destination.put(keySelector(it), it)
+ }
+ return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs,
+ * where key is provided by the [keySelector] function and
+ * and value is provided by the [valueTransform] function applied to elements of the given channel.
+ *
+ * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+ consumeEach {
+ destination.put(keySelector(it), valueTransform(it))
+ }
+ return destination
+}
+
+/**
+ * Populates and returns the [destination] mutable map with key-value pairs
+ * provided by [transform] function applied to each element of the given channel.
+ *
+ * If any of two pairs would have the same key the last one gets added to the map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
+ consumeEach {
+ destination += transform(it)
+ }
+ return destination
+}
+
+/**
+ * Send each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
+ consumeEach {
+ destination.send(it)
+ }
+ return destination
+}
+
+/**
+ * Appends all elements to the given [destination] collection.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
+ consumeEach {
+ destination.add(it)
+ }
+ return destination
+}
+
+/**
+ * Returns a [List] containing all elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+public suspend fun <E> ReceiveChannel<E>.toList(): List<E> =
+ this.toMutableList()
+
+/**
+ * Returns a [Map] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
+ toMap(LinkedHashMap())
+
+/**
+ * Returns a [MutableMap] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
+ consumeEach {
+ destination += it
+ }
+ return destination
+}
+
+/**
+ * Returns a [MutableList] filled with all elements of this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
+ toCollection(ArrayList())
+
+/**
+ * Returns a [Set] of all elements.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
+ this.toMutableSet()
+
+/**
+ * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@flatMap) {
+ transform(e).toChannel(this)
+ }
+ }
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
+ groupByTo(LinkedHashMap(), keySelector)
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and returns a map where each group key is associated with a list of corresponding values.
+ *
+ * The returned map preserves the entry iteration order of the keys produced from the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
+ groupByTo(LinkedHashMap(), keySelector, valueTransform)
+
+/**
+ * Groups elements of the original channel by the key returned by the given [keySelector] function
+ * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
+ *
+ * @return The [destination] map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
+ consumeEach {
+ val key = keySelector(it)
+ val list = destination.getOrPut(key) { ArrayList() }
+ list.add(it)
+ }
+ return destination
+}
+
+/**
+ * Groups values returned by the [valueTransform] function applied to each element of the original channel
+ * by the key returned by the given [keySelector] function applied to the element
+ * and puts to the [destination] map each group key associated with a list of corresponding values.
+ *
+ * @return The [destination] map.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
+ consumeEach {
+ val key = keySelector(it)
+ val list = destination.getOrPut(key) { ArrayList() }
+ list.add(valueTransform(it))
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing the results of applying the given [transform] function
+ * to each element in the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ consumeEach {
+ send(transform(it))
+ }
+ }
+
+/**
+ * Returns a channel containing the results of applying the given [transform] function
+ * to each element and its index in the original channel.
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@mapIndexed) {
+ send(transform(index++, e))
+ }
+ }
+
+/**
+ * Returns a channel containing only the non-null results of applying the given [transform] function
+ * to each element and its index in the original channel.
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
+ mapIndexed(context, transform).filterNotNull()
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends only the non-null results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+ consumeEachIndexed { (index, element) ->
+ transform(index, element)?.let { destination.add(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends only the non-null results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
+ consumeEachIndexed { (index, element) ->
+ transform(index, element)?.let { destination.send(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends the results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+ var index = 0
+ consumeEach {
+ destination.add(transform(index++, it))
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element and its index in the original channel
+ * and appends the results to the given [destination].
+ * @param [transform] function that takes the index of an element and the element itself
+ * and returns the result of the transform applied to the element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
+ var index = 0
+ consumeEach {
+ destination.send(transform(index++, it))
+ }
+ return destination
+}
+
+/**
+ * Returns a channel containing only the non-null results of applying the given [transform] function
+ * to each element in the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
+ map(context, transform).filterNotNull()
+
+/**
+ * Applies the given [transform] function to each element in the original channel
+ * and appends only the non-null results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+ consumeEach {
+ transform(it)?.let { destination.add(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element in the original channel
+ * and appends only the non-null results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
+ consumeEach {
+ transform(it)?.let { destination.send(it) }
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+ consumeEach {
+ destination.add(transform(it))
+ }
+ return destination
+}
+
+/**
+ * Applies the given [transform] function to each element of the original channel
+ * and appends the results to the given [destination].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
+ consumeEach {
+ destination.send(transform(it))
+ }
+ return destination
+}
+
+/**
+ * Returns a channel of [IndexedValue] for each element of the original channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@withIndex) {
+ send(IndexedValue(index++, e))
+ }
+ }
+
+/**
+ * Returns a channel containing only distinct elements from the given channel.
+ *
+ * The elements in the resulting channel are in the same order as they were in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
+ this.distinctBy { it }
+
+/**
+ * Returns a channel containing only elements from the given channel
+ * having distinct keys returned by the given [selector] function.
+ *
+ * The elements in the resulting channel are in the same order as they were in the source channel.
+ *
+ * The operation is _intermediate_ and _stateful_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ val keys = HashSet<K>()
+ for (e in this@distinctBy) {
+ val k = selector(e)
+ if (k !in keys) {
+ send(e)
+ keys += k
+ }
+ }
+ }
+
+/**
+ * Returns a mutable set containing all distinct elements from the given channel.
+ *
+ * The returned set preserves the element iteration order of the original channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
+ toCollection(LinkedHashSet())
+
+/**
+ * Returns `true` if all elements match the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
+ consumeEach {
+ if (!predicate(it)) return false
+ }
+ return true
+}
+
+/**
+ * Returns `true` if channel has at least one element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
+ consume {
+ return iterator().hasNext()
+ }
+
+/**
+ * Returns `true` if at least one element matches the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
+ consumeEach {
+ if (predicate(it)) return true
+ }
+ return false
+}
+
+/**
+ * Returns the number of elements in this channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.count(): Int {
+ var count = 0
+ consumeEach { count++ }
+ return count
+}
+
+/**
+ * Returns the number of elements matching the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
+ var count = 0
+ consumeEach {
+ if (predicate(it)) count++
+ }
+ return count
+}
+
+/**
+ * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
+ var accumulator = initial
+ consumeEach {
+ accumulator = operation(accumulator, it)
+ }
+ return accumulator
+}
+
+/**
+ * Accumulates value starting with [initial] value and applying [operation] from left to right
+ * to current accumulator value and each element with its index in the original channel.
+ * @param [operation] function that takes the index of an element, current accumulator value
+ * and the element itself, and calculates the next accumulator value.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
+ var index = 0
+ var accumulator = initial
+ consumeEach {
+ accumulator = operation(index++, accumulator, it)
+ }
+ return accumulator
+}
+
+/**
+ * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var maxElem = iterator.next()
+ var maxValue = selector(maxElem)
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ val v = selector(e)
+ if (maxValue < v) {
+ maxElem = e
+ maxValue = v
+ }
+ }
+ return maxElem
+ }
+
+/**
+ * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var max = iterator.next()
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ if (comparator.compare(max, e) < 0) max = e
+ }
+ return max
+ }
+
+/**
+ * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var minElem = iterator.next()
+ var minValue = selector(minElem)
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ val v = selector(e)
+ if (minValue > v) {
+ minElem = e
+ minValue = v
+ }
+ }
+ return minElem
+ }
+
+/**
+ * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var min = iterator.next()
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ if (comparator.compare(min, e) > 0) min = e
+ }
+ return min
+ }
+
+/**
+ * Returns `true` if the channel has no elements.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
+ consume {
+ return !iterator().hasNext()
+ }
+
+/**
+ * Returns `true` if no elements match the given [predicate].
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
+ consumeEach {
+ if (predicate(it)) return false
+ }
+ return true
+}
+
+/**
+ * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
+ consume {
+ val iterator = this.iterator()
+ if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+ var accumulator: S = iterator.next()
+ while (iterator.hasNext()) {
+ accumulator = operation(accumulator, iterator.next())
+ }
+ return accumulator
+ }
+
+/**
+ * Accumulates value starting with the first element and applying [operation] from left to right
+ * to current accumulator value and each element with its index in the original channel.
+ * @param [operation] function that takes the index of an element, current accumulator value
+ * and the element itself and calculates the next accumulator value.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
+ consume {
+ val iterator = this.iterator()
+ if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
+ var index = 1
+ var accumulator: S = iterator.next()
+ while (iterator.hasNext()) {
+ accumulator = operation(index++, accumulator, iterator.next())
+ }
+ return accumulator
+ }
+
+/**
+ * Returns the sum of all values produced by [selector] function applied to each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
+ var sum = 0
+ consumeEach {
+ sum += selector(it)
+ }
+ return sum
+}
+
+/**
+ * Returns the sum of all values produced by [selector] function applied to each element in the channel.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
+ var sum = 0.0
+ consumeEach {
+ sum += selector(it)
+ }
+ return sum
+}
+
+/**
+ * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
+ map { it ?: throw IllegalArgumentException("null element found in $this.") }
+
+/**
+ * Splits the original channel into pair of lists,
+ * where *first* list contains elements for which [predicate] yielded `true`,
+ * while *second* list contains elements for which [predicate] yielded `false`.
+ *
+ * The operation is _terminal_.
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
+ val first = ArrayList<E>()
+ val second = ArrayList<E>()
+ consumeEach {
+ if (predicate(it)) {
+ first.add(it)
+ } else {
+ second.add(it)
+ }
+ }
+ return Pair(first, second)
+}
+
+/**
+ * Returns a channel of pairs built from elements of both channels with same indexes.
+ * Resulting channel has length of shortest input channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
+ zip(other) { t1, t2 -> t1 to t2 }
+
+/**
+ * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
+ *
+ * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
+ * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
+ */
+@Deprecated(
+ message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4",
+ level = DeprecationLevel.WARNING
+)
+public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
+ GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
+ val otherIterator = other.iterator()
+ this@zip.consumeEach { element1 ->
+ if (!otherIterator.hasNext()) return@consumeEach
+ val element2 = otherIterator.next()
+ send(transform(element1, element2))
+ }
+ }