aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/channels
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/channels')
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt110
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt1
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Broadcast.kt21
-rw-r--r--kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt12
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Channel.kt449
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt4
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Channels.common.kt2122
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt19
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt5
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Deprecated.kt478
-rw-r--r--kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Produce.kt2
12 files changed, 881 insertions, 2344 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 9721583e..bcf19215 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -127,8 +127,7 @@ internal abstract class AbstractSendChannel<E>(
// ------ SendChannel ------
public final override val isClosedForSend: Boolean get() = closedForSend != null
- public override val isFull: Boolean get() = isFullImpl
- protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
+ private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
public final override suspend fun send(element: E) {
// fast path -- try offer non-blocking
@@ -137,23 +136,43 @@ internal abstract class AbstractSendChannel<E>(
return sendSuspend(element)
}
- public final override fun offer(element: E): Boolean {
+ override fun offer(element: E): Boolean {
+ // Temporary migration for offer users who rely on onUndeliveredElement
+ try {
+ return super.offer(element)
+ } catch (e: Throwable) {
+ onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
+ // If it crashes, add send exception as suppressed for better diagnostics
+ it.addSuppressed(e)
+ throw it
+ }
+ throw e
+ }
+ }
+
+ public final override fun trySend(element: E): ChannelResult<Unit> {
val result = offerInternal(element)
return when {
- result === OFFER_SUCCESS -> true
+ result === OFFER_SUCCESS -> ChannelResult.success(Unit)
result === OFFER_FAILED -> {
- // We should check for closed token on offer as well, otherwise offer won't be linearizable
+ // We should check for closed token on trySend as well, otherwise trySend won't be linearizable
// in the face of concurrent close()
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
- throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
+ val closedForSend = closedForSend ?: return ChannelResult.failure()
+ ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
}
result is Closed<*> -> {
- throw recoverStackTrace(helpCloseAndGetSendException(element, result))
+ ChannelResult.closed(helpCloseAndGetSendException(result))
}
- else -> error("offerInternal returned $result")
+ else -> error("trySend returned $result")
}
}
+ private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
+ helpClose(closed)
+ return closed.sendException
+ }
+
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
@@ -604,26 +623,8 @@ internal abstract class AbstractChannel<E>(
if (result) onReceiveEnqueued()
}
- public final override suspend fun receiveOrNull(): E? {
- // fast path -- try poll non-blocking
- val result = pollInternal()
- @Suppress("UNCHECKED_CAST")
- if (result !== POLL_FAILED && result !is Closed<*>) return result as E
- // slow-path does suspend
- return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
- }
-
@Suppress("UNCHECKED_CAST")
- private fun receiveOrNullResult(result: Any?): E? {
- if (result is Closed<*>) {
- if (result.closeCause != null) throw recoverStackTrace(result.closeCause)
- return null
- }
- return result as E
- }
-
- @Suppress("UNCHECKED_CAST")
- public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
+ public final override suspend fun receiveCatching(): ChannelResult<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
@@ -632,9 +633,11 @@ internal abstract class AbstractChannel<E>(
}
@Suppress("UNCHECKED_CAST")
- public final override fun poll(): E? {
+ public final override fun tryReceive(): ChannelResult<E> {
val result = pollInternal()
- return if (result === POLL_FAILED) null else receiveOrNullResult(result)
+ if (result === POLL_FAILED) return ChannelResult.failure()
+ if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
+ return ChannelResult.success(result as E)
}
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
@@ -734,18 +737,10 @@ internal abstract class AbstractChannel<E>(
}
}
- final override val onReceiveOrNull: SelectClause1<E?>
- get() = object : SelectClause1<E?> {
+ final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
+ get() = object : SelectClause1<ChannelResult<E>> {
@Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
- registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R)
- }
- }
-
- final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
- get() = object : SelectClause1<ValueOrClosed<E>> {
- @Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
}
}
@@ -776,15 +771,7 @@ internal abstract class AbstractChannel<E>(
}
RECEIVE_RESULT -> {
if (!select.trySelect()) return
- startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion)
- }
- RECEIVE_NULL_ON_CLOSE -> {
- if (value.closeCause == null) {
- if (!select.trySelect()) return
- startCoroutineUnintercepted(null, select.completion)
- } else {
- throw recoverStackTrace(value.receiveException)
- }
+ startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
}
}
}
@@ -905,7 +892,7 @@ internal abstract class AbstractChannel<E>(
@JvmField val receiveMode: Int
) : Receive<E>() {
fun resumeValue(value: E): Any? = when (receiveMode) {
- RECEIVE_RESULT -> ValueOrClosed.value(value)
+ RECEIVE_RESULT -> ChannelResult.success(value)
else -> value
}
@@ -921,7 +908,6 @@ internal abstract class AbstractChannel<E>(
override fun resumeReceiveClosed(closed: Closed<*>) {
when {
- receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
else -> cont.resumeWithException(closed.receiveException)
}
@@ -990,7 +976,7 @@ internal abstract class AbstractChannel<E>(
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
- if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value,
+ if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
@@ -1000,12 +986,7 @@ internal abstract class AbstractChannel<E>(
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
- RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
- RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
- block.startCoroutineCancellable(null, select.completion)
- } else {
- select.resumeSelectWithException(closed.receiveException)
- }
+ RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
}
}
@@ -1023,8 +1004,7 @@ internal abstract class AbstractChannel<E>(
// receiveMode values
internal const val RECEIVE_THROWS_ON_CLOSE = 0
-internal const val RECEIVE_NULL_ON_CLOSE = 1
-internal const val RECEIVE_RESULT = 2
+internal const val RECEIVE_RESULT = 1
@JvmField
@SharedImmutable
@@ -1128,9 +1108,9 @@ internal class Closed<in E>(
override val offerResult get() = this
override val pollResult get() = this
- override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+ override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeSend() {}
- override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+ override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeReceive(value: E) {}
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
@@ -1143,8 +1123,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
}
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
-private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
- if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E)
+private inline fun <E> Any?.toResult(): ChannelResult<E> =
+ if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)
@Suppress("NOTHING_TO_INLINE")
-private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause)
+private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)
diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
index 4569ec72..7e6c0e68 100644
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt
@@ -49,7 +49,6 @@ internal open class ArrayChannel<E>(
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND
- override val isFull: Boolean get() = lock.withLock { isFullImpl }
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
override val isClosedForReceive: Boolean get() = lock.withLock { super.isClosedForReceive }
diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
index 07e75976..b1c24b45 100644
--- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -35,11 +35,13 @@ import kotlin.coroutines.intrinsics.*
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways.
*
- * **Note: This API is obsolete.** It will be deprecated and replaced with the
- * [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn]
+ * operator.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
+@ObsoleteCoroutinesApi
public fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
@@ -95,10 +97,12 @@ public fun <E> ReceiveChannel<E>.broadcast(
*
* ### Future replacement
*
+ * This API is obsolete since 1.5.0.
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
- * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
- * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
+ * the broadcasting coroutine in hard-to-specify ways. It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn]
+ * operator.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
@@ -106,6 +110,7 @@ public fun <E> ReceiveChannel<E>.broadcast(
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
+@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
@@ -127,7 +132,13 @@ private open class BroadcastCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: BroadcastChannel<E>,
active: Boolean
-) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
+) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active),
+ ProducerScope<E>, BroadcastChannel<E> by _channel {
+
+ init {
+ initParentJob(parentContext[Job])
+ }
+
override val isActive: Boolean get() = super.isActive
override val channel: SendChannel<E>
diff --git a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
index 6cd79373..c82b8dbd 100644
--- a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
@@ -20,10 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
- * **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
- * when it becomes stable.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
*/
-@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
+@ObsoleteCoroutinesApi
public interface BroadcastChannel<E> : SendChannel<E> {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
@@ -60,9 +60,11 @@ public interface BroadcastChannel<E> : SendChannel<E> {
* * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
* * otherwise -- throws [IllegalArgumentException].
*
- * **Note: This is an experimental api.** It may be changed in the future updates.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow]
+ * and [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
*/
-@ExperimentalCoroutinesApi
+@ObsoleteCoroutinesApi
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index b8b81aac..b15c4262 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -14,6 +14,7 @@ import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
+import kotlin.contracts.*
import kotlin.internal.*
import kotlin.jvm.*
@@ -23,7 +24,7 @@ import kotlin.jvm.*
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by an invocation of [close]. This means that
- * calling [send] or [offer] will result in an exception.
+ * calling [send] will result in an exception.
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
@@ -31,16 +32,6 @@ public interface SendChannel<in E> {
public val isClosedForSend: Boolean
/**
- * Returns `true` if the channel is full (out of capacity), which means that an attempt to [send] will suspend.
- * This function returns `false` if the channel [is closed for `send`][isClosedForSend].
- *
- * @suppress **Will be removed in next releases, no replacement.**
- */
- @ExperimentalCoroutinesApi
- @Deprecated(level = DeprecationLevel.ERROR, message = "Will be removed in next releases without replacement")
- public val isFull: Boolean
-
- /**
* Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full
* or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
*
@@ -60,7 +51,7 @@ public interface SendChannel<in E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onSend] clause.
- * Use [offer] to try sending to this channel without waiting.
+ * Use [trySend] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)
@@ -73,19 +64,17 @@ public interface SendChannel<in E> {
*/
public val onSend: SelectClause2<E, SendChannel<E>>
+
/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
- * and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off
- * in situations when `send` suspends.
- *
- * Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
+ * and returns the successful result. Otherwise, returns failed or closed result.
+ * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
*
- * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it
- * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
- * then it calls `onUndeliveredElement` before throwing an exception.
+ * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
+ * it does not call `onUndeliveredElement` that was installed for this channel.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
- public fun offer(element: E): Boolean
+ public fun trySend(element: E): ChannelResult<Unit>
/**
* Closes this channel.
@@ -97,7 +86,7 @@ public interface SendChannel<in E> {
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
- * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer]
+ * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send]
* and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
* A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
@@ -116,10 +105,11 @@ public interface SendChannel<in E> {
* * the cause of `close` or `cancel` otherwise.
*
* Example of usage (exception handling is omitted):
+ *
* ```
* val events = Channel(UNLIMITED)
* callbackBasedApi.registerCallback { event ->
- * events.offer(event)
+ * events.trySend(event)
* }
*
* val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) {
@@ -128,7 +118,6 @@ public interface SendChannel<in E> {
* }
*
* events.invokeOnClose { callbackBasedApi.stop() }
- *
* ```
*
* **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future.
@@ -140,6 +129,44 @@ public interface SendChannel<in E> {
*/
@ExperimentalCoroutinesApi
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
+
+ /**
+ * **Deprecated** offer method.
+ *
+ * This method was deprecated in the favour of [trySend].
+ * It has proven itself as the most error-prone method in Channel API:
+ *
+ * * `Boolean` return type creates the false sense of security, implying that `false`
+ * is returned instead of throwing an exception.
+ * * It was used mostly from non-suspending APIs where CancellationException triggered
+ * internal failures in the application (the most common source of bugs).
+ * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to
+ * oversee such error during code review.
+ * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
+ *
+ * **NB** Automatic migration provides best-effort for the user experience, but requires removal
+ * or adjusting of the code that relied on the exception handling.
+ * The complete replacement has a more verbose form:
+ * ```
+ * channel.trySend(element)
+ * .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
+ * .isSuccess
+ * ```
+ *
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
+ *
+ * @suppress **Deprecated**.
+ */
+ @Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "Deprecated in the favour of 'trySend' method",
+ replaceWith = ReplaceWith("trySend(element).isSuccess")
+ ) // Warning since 1.5.0
+ public fun offer(element: E): Boolean {
+ val result = trySend(element)
+ if (result.isSuccess) return true
+ throw recoverStackTrace(result.exceptionOrNull() ?: return false)
+ }
}
/**
@@ -182,7 +209,7 @@ public interface ReceiveChannel<out E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceive] clause.
- * Use [poll] to try receiving from this channel without waiting.
+ * Use [tryReceive] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E
@@ -195,94 +222,39 @@ public interface ReceiveChannel<out E> {
public val onReceive: SelectClause1<E>
/**
- * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
- * or returns `null` if the channel is [closed for `receive`][isClosedForReceive] without cause,
- * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
- *
- * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
- * function is suspended, this function immediately resumes with a [CancellationException].
- * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
- * suspended, it will not resume successfully. The `receiveOrNull` call can retrieve the element from the channel,
- * but then throw [CancellationException], thus failing to deliver the element.
- * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
- *
- * Note that this function does not check for cancellation when it is not suspended.
- * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
- *
- * This function can be used in [select] invocations with the [onReceiveOrNull] clause.
- * Use [poll] to try receiving from this channel without waiting.
- *
- * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension.
- */
- @ObsoleteCoroutinesApi
- @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
- @LowPriorityInOverloadResolution
- @Deprecated(
- message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
- level = DeprecationLevel.WARNING,
- replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull")
- )
- public suspend fun receiveOrNull(): E?
-
- /**
- * Clause for the [select] expression of the [receiveOrNull] suspending function that selects with the element
- * received from the channel or `null` if the channel is
- * [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with
- * the original [close][SendChannel.close] cause exception if the channel has _failed_.
- *
- * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension.
- */
- @ObsoleteCoroutinesApi
- @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
- @LowPriorityInOverloadResolution
- @Deprecated(
- message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
- level = DeprecationLevel.WARNING,
- replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull")
- )
- public val onReceiveOrNull: SelectClause1<E?>
-
- /**
* Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
- * This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel
- * or the close cause if the channel was closed.
+ * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
+ * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally.
+ * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed].
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
- * suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel,
+ * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel,
* but then throw [CancellationException], thus failing to deliver the element.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
- * This function can be used in [select] invocations with the [onReceiveOrClosed] clause.
- * Use [poll] to try receiving from this channel without waiting.
- *
- * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
- * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
+ * This function can be used in [select] invocations with the [onReceiveCatching] clause.
+ * Use [tryReceive] to try receiving from this channel without waiting.
*/
- @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
- public suspend fun receiveOrClosed(): ValueOrClosed<E>
+ public suspend fun receiveCatching(): ChannelResult<E>
/**
- * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
+ * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value
* that is received from the channel or with a close cause if the channel
* [is closed for `receive`][isClosedForReceive].
- *
- * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
- * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
- @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
- public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
+ public val onReceiveCatching: SelectClause1<ChannelResult<E>>
/**
- * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
- * or is [is closed for `receive`][isClosedForReceive] without a cause.
- * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
+ * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
+ * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
+ * result if the channel is closed.
*/
- public fun poll(): E?
+ public fun tryReceive(): ChannelResult<E>
/**
* Returns a new iterator to receive elements from this channel using a `for` loop.
@@ -318,107 +290,262 @@ public interface ReceiveChannel<out E> {
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
public fun cancel(cause: Throwable? = null): Boolean
+
+ /**
+ * **Deprecated** poll method.
+ *
+ * This method was deprecated in the favour of [tryReceive].
+ * It has proven itself as error-prone method in Channel API:
+ *
+ * * Nullable return type creates the false sense of security, implying that `null`
+ * is returned instead of throwing an exception.
+ * * It was used mostly from non-suspending APIs where CancellationException triggered
+ * internal failures in the application (the most common source of bugs).
+ * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
+ *
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
+ *
+ * ### Replacement note
+ *
+ * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and
+ * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception.
+ * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()`
+ *
+ * @suppress **Deprecated**.
+ */
+ @Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "Deprecated in the favour of 'tryReceive'. " +
+ "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " +
+ "for the precise replacement please refer to the 'poll' documentation",
+ replaceWith = ReplaceWith("tryReceive().getOrNull()")
+ ) // Warning since 1.5.0
+ public fun poll(): E? {
+ val result = tryReceive()
+ if (result.isSuccess) return result.getOrThrow()
+ throw recoverStackTrace(result.exceptionOrNull() ?: return null)
+ }
+
+ /**
+ * This function was deprecated since 1.3.0 and is no longer recommended to use
+ * or to implement in subclasses.
+ *
+ * It had the following pitfalls:
+ * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
+ * - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
+ * - It didn't really belong to core channel API and can be exposed as an extension instead.
+ *
+ * ### Replacement note
+ *
+ * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and
+ * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception.
+ * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`.
+ *
+ * @suppress **Deprecated**
+ */
+ @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+ @LowPriorityInOverloadResolution
+ @Deprecated(
+ message = "Deprecated in favor of 'receiveCatching'. " +
+ "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " +
+ "for the detailed replacement please refer to the 'receiveOrNull' documentation",
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith("receiveCatching().getOrNull()")
+ ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0
+ public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()
+
+ /**
+ * This function was deprecated since 1.3.0 and is no longer recommended to use
+ * or to implement in subclasses.
+ * See [receiveOrNull] documentation.
+ *
+ * @suppress **Deprecated**: in favor of onReceiveCatching extension.
+ */
+ @Deprecated(
+ message = "Deprecated in favor of onReceiveCatching extension",
+ level = DeprecationLevel.ERROR,
+ replaceWith = ReplaceWith("onReceiveCatching")
+ ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0
+ public val onReceiveOrNull: SelectClause1<E?>
+ get() {
+ return object : SelectClause1<E?> {
+ @InternalCoroutinesApi
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
+ onReceiveCatching.registerSelectClause1(select) {
+ it.exceptionOrNull()?.let { throw it }
+ block(it.getOrNull())
+ }
+ }
+ }
+ }
}
/**
- * A discriminated union of [ReceiveChannel.receiveOrClosed] result
- * that encapsulates either an element of type [T] successfully received from the channel or a close cause.
+ * A discriminated union of channel operation result.
+ * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
+ * an optional cause.
*
- * :todo: Do not make it public before resolving todos in the code of this class.
+ * The successful result represents a successful operation with a value of type [T], for example,
+ * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
*
- * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
- * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
+ * The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
+ * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
+ *
+ * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
+ * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend]
+ * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving.
*/
-@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING")
-@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
-public inline class ValueOrClosed<out T>
-internal constructor(private val holder: Any?) {
+@JvmInline
+public value class ChannelResult<out T>
+@PublishedApi internal constructor(@PublishedApi internal val holder: Any?) {
/**
- * Returns `true` if this instance represents a received element.
- * In this case [isClosed] returns `false`.
- * todo: it is commented for now, because it is not used
+ * Returns `true` if this instance represents a successful
+ * operation outcome.
+ *
+ * In this case [isFailure] and [isClosed] return `false`.
*/
- //public val isValue: Boolean get() = holder !is Closed
+ public val isSuccess: Boolean get() = holder !is Failed
/**
- * Returns `true` if this instance represents a close cause.
- * In this case [isValue] returns `false`.
+ * Returns `true` if this instance represents unsuccessful operation.
+ *
+ * In this case [isSuccess] returns false, but it does not imply
+ * that the channel is failed or closed.
+ *
+ * Example of a failed operation without an exception and channel being closed
+ * is [Channel.trySend] attempt to a channel that is full.
*/
- public val isClosed: Boolean get() = holder is Closed
+ public val isFailure: Boolean get() = holder is Failed
/**
- * Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise.
+ * Returns `true` if this instance represents unsuccessful operation
+ * to a closed or cancelled channel.
*
- * :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]:
+ * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
+ * that [exceptionOrNull] returns non-null value.
*
- * So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`.
- * On the other hand, the channel has the following `receive` variants:
- * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh?
- * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull`
- * * `receiveOrClosed`
- * For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`.
+ * It can happen if the channel was [closed][Channel.close] normally without an exception.
*/
- @Suppress("UNCHECKED_CAST")
- public val value: T
- get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T
+ public val isClosed: Boolean get() = holder is Closed
/**
- * Returns the received value if this element represents a received value, or `null` otherwise.
- * :todo: Decide if it shall be made into extension that is available only for non-null T.
- * Note: it might become inconsistent with kotlin.Result
+ * Returns the encapsulated value if this instance represents success or `null` if it represents failed result.
*/
@Suppress("UNCHECKED_CAST")
- public val valueOrNull: T?
- get() = if (holder is Closed) null else holder as T
+ public fun getOrNull(): T? = if (holder !is Failed) holder as T else null
/**
- * :todo: Decide, if it is needed, how it shall be named with relation to [value].
- * Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site,
- * so if one is sure that `ValueOrClosed` always holds a value, this very property should be used.
- * Otherwise, it could be very hard to locate the source of the exception.
- * todo: it is commented for now, because it is not used
+ * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
*/
- //@Suppress("UNCHECKED_CAST")
- //public val valueOrThrow: T
- // get() = if (holder is Closed) throw holder.exception else holder as T
+ public fun getOrThrow(): T {
+ @Suppress("UNCHECKED_CAST")
+ if (holder !is Failed) return holder as T
+ if (holder is Closed && holder.cause != null) throw holder.cause
+ error("Trying to call 'getOrThrow' on a failed channel result: $holder")
+ }
/**
- * Returns the close cause of the channel if this instance represents a close cause, or throws
- * an [IllegalStateException] otherwise.
+ * Returns the encapsulated exception if this instance represents failure or `null` if it is success
+ * or unsuccessful operation to closed channel.
*/
- @Suppress("UNCHECKED_CAST")
- public val closeCause: Throwable? get() =
- if (holder is Closed) holder.cause else error("Channel was not closed")
+ public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
+
+ internal open class Failed {
+ override fun toString(): String = "Failed"
+ }
+
+ internal class Closed(@JvmField val cause: Throwable?): Failed() {
+ override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
+ override fun hashCode(): Int = cause.hashCode()
+ override fun toString(): String = "Closed($cause)"
+ }
+
+ @Suppress("NOTHING_TO_INLINE")
+ @InternalCoroutinesApi
+ public companion object {
+ private val failed = Failed()
+
+ @InternalCoroutinesApi
+ public fun <E> success(value: E): ChannelResult<E> =
+ ChannelResult(value)
+
+ @InternalCoroutinesApi
+ public fun <E> failure(): ChannelResult<E> =
+ ChannelResult(failed)
+
+ @InternalCoroutinesApi
+ public fun <E> closed(cause: Throwable?): ChannelResult<E> =
+ ChannelResult(Closed(cause))
+ }
- /**
- * @suppress
- */
public override fun toString(): String =
when (holder) {
is Closed -> holder.toString()
else -> "Value($holder)"
+ }
+}
+
+/**
+ * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the
+ * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed
+ * result.
+ */
+@OptIn(ExperimentalContracts::class)
+public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T {
+ contract {
+ callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE)
}
+ @Suppress("UNCHECKED_CAST")
+ return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T
+}
- internal class Closed(@JvmField val cause: Throwable?) {
- // todo: it is commented for now, because it is not used
- //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
- override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
- override fun hashCode(): Int = cause.hashCode()
- override fun toString(): String = "Closed($cause)"
+/**
+ * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess].
+ * Returns the original `ChannelResult` unchanged.
+ */
+@OptIn(ExperimentalContracts::class)
+public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> {
+ contract {
+ callsInPlace(action, InvocationKind.AT_MOST_ONCE)
}
+ @Suppress("UNCHECKED_CAST")
+ if (holder !is ChannelResult.Failed) action(holder as T)
+ return this
+}
- /**
- * todo: consider making value/closed constructors public in the future.
- */
- internal companion object {
- @Suppress("NOTHING_TO_INLINE")
- internal inline fun <E> value(value: E): ValueOrClosed<E> =
- ValueOrClosed(value)
-
- @Suppress("NOTHING_TO_INLINE")
- internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> =
- ValueOrClosed(Closed(cause))
+/**
+ * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure].
+ * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
+ *
+ * Returns the original `ChannelResult` unchanged.
+ */
+@OptIn(ExperimentalContracts::class)
+public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
+ contract {
+ callsInPlace(action, InvocationKind.AT_MOST_ONCE)
}
+ @Suppress("UNCHECKED_CAST")
+ if (holder is ChannelResult.Failed) action(exceptionOrNull())
+ return this
+}
+
+/**
+ * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]
+ * due to channel being [closed][Channel.close].
+ * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter.
+ * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend]
+ * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation.
+ *
+ * Returns the original `ChannelResult` unchanged.
+ */
+@OptIn(ExperimentalContracts::class)
+public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> {
+ contract {
+ callsInPlace(action, InvocationKind.AT_MOST_ONCE)
+ }
+ @Suppress("UNCHECKED_CAST")
+ if (holder is ChannelResult.Closed) action(exceptionOrNull())
+ return this
}
/**
@@ -493,14 +620,14 @@ public interface ChannelIterator<out E> {
*
* * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
* This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
- * [Sending][send] to this channel never suspends, and [offer] always returns `true`.
+ * [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
- * This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations,
+ * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the last element sent.
* Back-to-back sent elements are conflated &mdash; only the last sent element is received,
* while previously sent elements **are lost**.
- * [Sending][send] to this channel never suspends, and [offer] always returns `true`.
+ * [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
* This channel has an array buffer of a fixed `capacity`.
@@ -547,8 +674,6 @@ public interface ChannelIterator<out E> {
*
* * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
* send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
- * * When [offer][SendChannel.offer] operation throws an exception when
- * the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
* * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
* operation throws an exception when it had retrieved the element from the
* channel but was cancelled before the code following the receive call resumed.
diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
index b2b257de..57b2797d 100644
--- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
@@ -11,8 +11,10 @@ import kotlin.coroutines.*
internal open class ChannelCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: Channel<E>,
+ initParentJob: Boolean,
active: Boolean
-) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
+) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel {
+
val channel: Channel<E> get() = this
override fun cancel() {
diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
index e3567e31..e0b4f9d2 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt
@@ -37,129 +37,40 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.()
}
/**
- * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
- * or returns `null` if the channel is [closed][Channel.isClosedForReceive].
+ * This function is deprecated in the favour of [ReceiveChannel.receiveCatching].
*
- * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
- * function is suspended, this function immediately resumes with [CancellationException].
- * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
- * suspended, it will not resume successfully. If the `receiveOrNull` call threw [CancellationException] there is no way
- * to tell if some element was already received from the channel or not. See [Channel] documentation for details.
+ * This function is considered error-prone for the following reasons;
+ * * Is throwing if the channel has failed even though its signature may suggest it returns 'null'
+ * * It is easy to forget that exception handling still have to be explicit
+ * * During code reviews and code reading, intentions of the code are frequently unclear:
+ * are potential exceptions ignored deliberately or not?
*
- * Note, that this function does not check for cancellation when it is not suspended.
- * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
- *
- * This extension is defined only for channels on non-null types, so that generic functions defined using
- * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
- * to find bugs.
+ * @suppress doc
*/
+@Deprecated(
+ "Deprecated in the favour of 'receiveCatching'",
+ ReplaceWith("receiveCatching().getOrNull()"),
+ DeprecationLevel.WARNING
+) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
-@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as ReceiveChannel<E?>).receiveOrNull()
}
/**
- * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
- * is received from the channel or selects with `null` if the channel
- * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
- * the original [close][SendChannel.close] cause exception if the channel has _failed_.
- *
- * This extension is defined only for channels on non-null types, so that generic functions defined using
- * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard
- * to find bugs.
- **/
-@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x
+ * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching]
+ */
+@Deprecated(
+ "Deprecated in the favour of 'onReceiveCatching'",
+ level = DeprecationLevel.WARNING
+) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as ReceiveChannel<E?>).onReceiveOrNull
}
/**
- * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@ObsoleteCoroutinesApi
-public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
- consume {
- for (element in this) action(element)
- }
-
-// -------- Operations on ReceiveChannel --------
-
-/**
- * Returns a [List] containing all elements.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- */
-@OptIn(ExperimentalStdlibApi::class)
-public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
- consumeEach {
- add(it)
- }
-}
-
-/**
- * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel]
- * with the corresponding cause. See also [ReceiveChannel.consume].
- *
- * **WARNING**: It is planned that in the future a second invocation of this method
- * on an channel that is already being consumed is going to fail fast, that it
- * immediately throws an [IllegalStateException].
- * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167)
- * for details.
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
- cancelConsumed(cause)
-}
-
-@PublishedApi
-internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
- cancel(cause?.let {
- it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
- })
-}
-
-/**
- * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the
- * specified [ReceiveChannel] instances with the corresponding cause.
- * See also [ReceiveChannel.consumes()] for a version on one channel.
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
- { cause: Throwable? ->
- var exception: Throwable? = null
- for (channel in channels)
- try {
- channel.cancelConsumed(cause)
- } catch (e: Throwable) {
- if (exception == null) {
- exception = e
- } else {
- exception.addSuppressedThrowable(e)
- }
- }
- exception?.let { throw it }
- }
-
-/**
* Makes sure that the given [block] consumes all elements from the given channel
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
*
@@ -194,2006 +105,35 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
}
/**
- * Performs the given [action] for each received element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) {
- var index = 0
- consumeEach {
- action(IndexedValue(index++, it))
- }
-}
-
-/**
- * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E =
- elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") }
-
-/**
- * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E =
- consume {
- if (index < 0)
- return defaultValue(index)
- var count = 0
- for (element in this) {
- if (index == count++)
- return element
- }
- return defaultValue(index)
- }
-
-/**
- * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
- consume {
- if (index < 0)
- return null
- var count = 0
- for (element in this) {
- if (index == count++)
- return element
- }
- return null
- }
-
-/**
- * Returns the first element matching the given [predicate], or `null` if no such element was found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? =
- firstOrNull(predicate)
-
-/**
- * Returns the last element matching the given [predicate], or `null` if no such element was found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? =
- lastOrNull(predicate)
-
-/**
- * Returns first element.
- * @throws [NoSuchElementException] if the channel is empty.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.first(): E =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext())
- throw NoSuchElementException("ReceiveChannel is empty.")
- return iterator.next()
- }
-
-/**
- * Returns the first element matching the given [predicate].
- * @throws [NoSuchElementException] if no such element is found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E {
- consumeEach {
- if (predicate(it)) return it
- }
- throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
-}
-
-/**
- * Returns the first element, or `null` if the channel is empty.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext())
- return null
- return iterator.next()
- }
-
-/**
- * Returns the first element matching the given [predicate], or `null` if element was not found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? {
- consumeEach {
- if (predicate(it)) return it
- }
- return null
-}
-
-/**
- * Returns first index of [element], or -1 if the channel does not contain element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
- var index = 0
- consumeEach {
- if (element == it)
- return index
- index++
- }
- return -1
-}
-
-/**
- * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int {
- var index = 0
- consumeEach {
- if (predicate(it))
- return index
- index++
- }
- return -1
-}
-
-/**
- * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int {
- var lastIndex = -1
- var index = 0
- consumeEach {
- if (predicate(it))
- lastIndex = index
- index++
- }
- return lastIndex
-}
-
-/**
- * Returns the last element.
- * @throws [NoSuchElementException] if the channel is empty.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.last(): E =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext())
- throw NoSuchElementException("ReceiveChannel is empty.")
- var last = iterator.next()
- while (iterator.hasNext())
- last = iterator.next()
- return last
- }
-
-/**
- * Returns the last element matching the given [predicate].
- * @throws [NoSuchElementException] if no such element is found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E {
- var last: E? = null
- var found = false
- consumeEach {
- if (predicate(it)) {
- last = it
- found = true
- }
- }
- if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
- @Suppress("UNCHECKED_CAST")
- return last as E
-}
-
-/**
- * Returns last index of [element], or -1 if the channel does not contain element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
- var lastIndex = -1
- var index = 0
- consumeEach {
- if (element == it)
- lastIndex = index
- index++
- }
- return lastIndex
-}
-
-/**
- * Returns the last element, or `null` if the channel is empty.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext())
- return null
- var last = iterator.next()
- while (iterator.hasNext())
- last = iterator.next()
- return last
- }
-
-/**
- * Returns the last element matching the given [predicate], or `null` if no such element was found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? {
- var last: E? = null
- consumeEach {
- if (predicate(it)) {
- last = it
- }
- }
- return last
-}
-
-/**
- * Returns the single element, or throws an exception if the channel is empty or has more than one element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.single(): E =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext())
- throw NoSuchElementException("ReceiveChannel is empty.")
- val single = iterator.next()
- if (iterator.hasNext())
- throw IllegalArgumentException("ReceiveChannel has more than one element.")
- return single
- }
-
-/**
- * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E {
- var single: E? = null
- var found = false
- consumeEach {
- if (predicate(it)) {
- if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.")
- single = it
- found = true
- }
- }
- if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.")
- @Suppress("UNCHECKED_CAST")
- return single as E
-}
-
-/**
- * Returns single element, or `null` if the channel is empty or has more than one element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext())
- return null
- val single = iterator.next()
- if (iterator.hasNext())
- return null
- return single
- }
-
-/**
- * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? {
- var single: E? = null
- var found = false
- consumeEach {
- if (predicate(it)) {
- if (found) return null
- single = it
- found = true
- }
- }
- if (!found) return null
- return single
-}
-
-/**
- * Returns a channel containing all elements except first [n] elements.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- require(n >= 0) { "Requested element count $n is less than zero." }
- var remaining: Int = n
- if (remaining > 0)
- for (e in this@drop) {
- remaining--
- if (remaining == 0)
- break
- }
- for (e in this@drop) {
- send(e)
- }
- }
-
-/**
- * Returns a channel containing all elements except first elements that satisfy the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- for (e in this@dropWhile) {
- if (!predicate(e)) {
- send(e)
- break
- }
- }
- for (e in this@dropWhile) {
- send(e)
- }
- }
-
-/**
- * Returns a channel containing only elements matching the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- for (e in this@filter) {
- if (predicate(e)) send(e)
- }
- }
-
-/**
- * Returns a channel containing only elements matching the given [predicate].
- * @param [predicate] function that takes the index of an element and the element itself
- * and returns the result of predicate evaluation on the element.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- var index = 0
- for (e in this@filterIndexed) {
- if (predicate(index++, e)) send(e)
- }
- }
-
-/**
- * Appends all elements matching the given [predicate] to the given [destination].
- * @param [predicate] function that takes the index of an element and the element itself
- * and returns the result of predicate evaluation on the element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
- consumeEachIndexed { (index, element) ->
- if (predicate(index, element)) destination.add(element)
- }
- return destination
-}
-
-/**
- * Appends all elements matching the given [predicate] to the given [destination].
- * @param [predicate] function that takes the index of an element and the element itself
- * and returns the result of predicate evaluation on the element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C {
- consumeEachIndexed { (index, element) ->
- if (predicate(index, element)) destination.send(element)
- }
- return destination
-}
-
-/**
- * Returns a channel containing all elements not matching the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- filter(context) { !predicate(it) }
-
-/**
- * Returns a channel containing all elements that are not `null`.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-@Suppress("UNCHECKED_CAST")
-public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
- filter { it != null } as ReceiveChannel<E>
-
-/**
- * Appends all elements that are not `null` to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
- consumeEach {
- if (it != null) destination.add(it)
- }
- return destination
-}
-
-/**
- * Appends all elements that are not `null` to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
- consumeEach {
- if (it != null) destination.send(it)
- }
- return destination
-}
-
-/**
- * Appends all elements not matching the given [predicate] to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
- consumeEach {
- if (!predicate(it)) destination.add(it)
- }
- return destination
-}
-
-/**
- * Appends all elements not matching the given [predicate] to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C {
- consumeEach {
- if (!predicate(it)) destination.send(it)
- }
- return destination
-}
-
-/**
- * Appends all elements matching the given [predicate] to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
- consumeEach {
- if (predicate(it)) destination.add(it)
- }
- return destination
-}
-
-/**
- * Appends all elements matching the given [predicate] to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C {
- consumeEach {
- if (predicate(it)) destination.send(it)
- }
- return destination
-}
-
-/**
- * Returns a channel containing first [n] elements.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- if (n == 0) return@produce
- require(n >= 0) { "Requested element count $n is less than zero." }
- var remaining: Int = n
- for (e in this@take) {
- send(e)
- remaining--
- if (remaining == 0)
- return@produce
- }
- }
-
-/**
- * Returns a channel containing first elements satisfying the given [predicate].
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- for (e in this@takeWhile) {
- if (!predicate(e)) return@produce
- send(e)
- }
- }
-
-/**
- * Returns a [Map] containing key-value pairs provided by [transform] function
- * applied to elements of the given channel.
- *
- * If any of two pairs would have the same key the last one gets added to the map.
- *
- * The returned map preserves the entry iteration order of the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> =
- associateTo(LinkedHashMap(), transform)
-
-/**
- * Returns a [Map] containing the elements from the given channel indexed by the key
- * returned from [keySelector] function applied to each element.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The returned map preserves the entry iteration order of the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> =
- associateByTo(LinkedHashMap(), keySelector)
-
-/**
- * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The returned map preserves the entry iteration order of the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> =
- associateByTo(LinkedHashMap(), keySelector, valueTransform)
-
-/**
- * Populates and returns the [destination] mutable map with key-value pairs,
- * where key is provided by the [keySelector] function applied to each element of the given channel
- * and value is the element itself.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M {
- consumeEach {
- destination.put(keySelector(it), it)
- }
- return destination
-}
-
-/**
- * Populates and returns the [destination] mutable map with key-value pairs,
- * where key is provided by the [keySelector] function and
- * and value is provided by the [valueTransform] function applied to elements of the given channel.
- *
- * If any two elements would have the same key returned by [keySelector] the last one gets added to the map.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
- consumeEach {
- destination.put(keySelector(it), valueTransform(it))
- }
- return destination
-}
-
-/**
- * Populates and returns the [destination] mutable map with key-value pairs
- * provided by [transform] function applied to each element of the given channel.
- *
- * If any of two pairs would have the same key the last one gets added to the map.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M {
- consumeEach {
- destination += transform(it)
- }
- return destination
-}
-
-/**
- * Send each element of the original channel
- * and appends the results to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
- consumeEach {
- destination.send(it)
- }
- return destination
-}
-
-/**
- * Appends all elements to the given [destination] collection.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
- consumeEach {
- destination.add(it)
- }
- return destination
-}
-
-/**
- * Returns a [Map] filled with all elements of this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
- toMap(LinkedHashMap())
-
-/**
- * Returns a [MutableMap] filled with all elements of this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
- consumeEach {
- destination += it
- }
- return destination
-}
-
-/**
- * Returns a [MutableList] filled with all elements of this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
- toCollection(ArrayList())
-
-/**
- * Returns a [Set] of all elements.
- *
- * The returned set preserves the element iteration order of the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
- this.toMutableSet()
-
-/**
- * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- for (e in this@flatMap) {
- transform(e).toChannel(this)
- }
- }
-
-/**
- * Groups elements of the original channel by the key returned by the given [keySelector] function
- * applied to each element and returns a map where each group key is associated with a list of corresponding elements.
- *
- * The returned map preserves the entry iteration order of the keys produced from the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> =
- groupByTo(LinkedHashMap(), keySelector)
-
-/**
- * Groups values returned by the [valueTransform] function applied to each element of the original channel
- * by the key returned by the given [keySelector] function applied to the element
- * and returns a map where each group key is associated with a list of corresponding values.
- *
- * The returned map preserves the entry iteration order of the keys produced from the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> =
- groupByTo(LinkedHashMap(), keySelector, valueTransform)
-
-/**
- * Groups elements of the original channel by the key returned by the given [keySelector] function
- * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements.
- *
- * @return The [destination] map.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M {
- consumeEach {
- val key = keySelector(it)
- val list = destination.getOrPut(key) { ArrayList() }
- list.add(it)
- }
- return destination
-}
-
-/**
- * Groups values returned by the [valueTransform] function applied to each element of the original channel
- * by the key returned by the given [keySelector] function applied to the element
- * and puts to the [destination] map each group key associated with a list of corresponding values.
- *
- * @return The [destination] map.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M {
- consumeEach {
- val key = keySelector(it)
- val list = destination.getOrPut(key) { ArrayList() }
- list.add(valueTransform(it))
- }
- return destination
-}
-
-/**
- * Returns a channel containing the results of applying the given [transform] function
- * to each element in the original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- consumeEach {
- send(transform(it))
- }
- }
-
-/**
- * Returns a channel containing the results of applying the given [transform] function
- * to each element and its index in the original channel.
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- var index = 0
- for (e in this@mapIndexed) {
- send(transform(index++, e))
- }
- }
-
-/**
- * Returns a channel containing only the non-null results of applying the given [transform] function
- * to each element and its index in the original channel.
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> =
- mapIndexed(context, transform).filterNotNull()
-
-/**
- * Applies the given [transform] function to each element and its index in the original channel
- * and appends only the non-null results to the given [destination].
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
- consumeEachIndexed { (index, element) ->
- transform(index, element)?.let { destination.add(it) }
- }
- return destination
-}
-
-/**
- * Applies the given [transform] function to each element and its index in the original channel
- * and appends only the non-null results to the given [destination].
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C {
- consumeEachIndexed { (index, element) ->
- transform(index, element)?.let { destination.send(it) }
- }
- return destination
-}
-
-/**
- * Applies the given [transform] function to each element and its index in the original channel
- * and appends the results to the given [destination].
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
- var index = 0
- consumeEach {
- destination.add(transform(index++, it))
- }
- return destination
-}
-
-/**
- * Applies the given [transform] function to each element and its index in the original channel
- * and appends the results to the given [destination].
- * @param [transform] function that takes the index of an element and the element itself
- * and returns the result of the transform applied to the element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C {
- var index = 0
- consumeEach {
- destination.send(transform(index++, it))
- }
- return destination
-}
-
-/**
- * Returns a channel containing only the non-null results of applying the given [transform] function
- * to each element in the original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> =
- map(context, transform).filterNotNull()
-
-/**
- * Applies the given [transform] function to each element in the original channel
- * and appends only the non-null results to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
- consumeEach {
- transform(it)?.let { destination.add(it) }
- }
- return destination
-}
-
-/**
- * Applies the given [transform] function to each element in the original channel
- * and appends only the non-null results to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C {
- consumeEach {
- transform(it)?.let { destination.send(it) }
- }
- return destination
-}
-
-/**
- * Applies the given [transform] function to each element of the original channel
- * and appends the results to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
- consumeEach {
- destination.add(transform(it))
- }
- return destination
-}
-
-/**
- * Applies the given [transform] function to each element of the original channel
- * and appends the results to the given [destination].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C {
- consumeEach {
- destination.send(transform(it))
- }
- return destination
-}
-
-/**
- * Returns a channel of [IndexedValue] for each element of the original channel.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- var index = 0
- for (e in this@withIndex) {
- send(IndexedValue(index++, e))
- }
- }
-
-/**
- * Returns a channel containing only distinct elements from the given channel.
- *
- * The elements in the resulting channel are in the same order as they were in the source channel.
- *
- * The operation is _intermediate_ and _stateful_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
- this.distinctBy { it }
-
-/**
- * Returns a channel containing only elements from the given channel
- * having distinct keys returned by the given [selector] function.
- *
- * The elements in the resulting channel are in the same order as they were in the source channel.
- *
- * The operation is _intermediate_ and _stateful_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> =
- GlobalScope.produce(context, onCompletion = consumes()) {
- val keys = HashSet<K>()
- for (e in this@distinctBy) {
- val k = selector(e)
- if (k !in keys) {
- send(e)
- keys += k
- }
- }
- }
-
-/**
- * Returns a mutable set containing all distinct elements from the given channel.
- *
- * The returned set preserves the element iteration order of the original channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
- toCollection(LinkedHashSet())
-
-/**
- * Returns `true` if all elements match the given [predicate].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean {
- consumeEach {
- if (!predicate(it)) return false
- }
- return true
-}
-
-/**
- * Returns `true` if channel has at least one element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
- consume {
- return iterator().hasNext()
- }
-
-/**
- * Returns `true` if at least one element matches the given [predicate].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean {
- consumeEach {
- if (predicate(it)) return true
- }
- return false
-}
-
-/**
- * Returns the number of elements in this channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.count(): Int {
- var count = 0
- consumeEach { count++ }
- return count
-}
-
-/**
- * Returns the number of elements matching the given [predicate].
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int {
- var count = 0
- consumeEach {
- if (predicate(it)) count++
- }
- return count
-}
-
-/**
- * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R {
- var accumulator = initial
- consumeEach {
- accumulator = operation(accumulator, it)
- }
- return accumulator
-}
-
-/**
- * Accumulates value starting with [initial] value and applying [operation] from left to right
- * to current accumulator value and each element with its index in the original channel.
- * @param [operation] function that takes the index of an element, current accumulator value
- * and the element itself, and calculates the next accumulator value.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R {
- var index = 0
- var accumulator = initial
- consumeEach {
- accumulator = operation(index++, accumulator, it)
- }
- return accumulator
-}
-
-/**
- * Returns the first element yielding the largest value of the given function or `null` if there are no elements.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext()) return null
- var maxElem = iterator.next()
- var maxValue = selector(maxElem)
- while (iterator.hasNext()) {
- val e = iterator.next()
- val v = selector(e)
- if (maxValue < v) {
- maxElem = e
- maxValue = v
- }
- }
- return maxElem
- }
-
-/**
- * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext()) return null
- var max = iterator.next()
- while (iterator.hasNext()) {
- val e = iterator.next()
- if (comparator.compare(max, e) < 0) max = e
- }
- return max
- }
-
-/**
- * Returns the first element yielding the smallest value of the given function or `null` if there are no elements.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext()) return null
- var minElem = iterator.next()
- var minValue = selector(minElem)
- while (iterator.hasNext()) {
- val e = iterator.next()
- val v = selector(e)
- if (minValue > v) {
- minElem = e
- minValue = v
- }
- }
- return minElem
- }
-
-/**
- * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
- consume {
- val iterator = iterator()
- if (!iterator.hasNext()) return null
- var min = iterator.next()
- while (iterator.hasNext()) {
- val e = iterator.next()
- if (comparator.compare(min, e) > 0) min = e
- }
- return min
- }
-
-/**
- * Returns `true` if the channel has no elements.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
- consume {
- return !iterator().hasNext()
- }
-
-/**
- * Returns `true` if no elements match the given [predicate].
+ * Returns a [List] containing all elements.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
*/
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean {
+@OptIn(ExperimentalStdlibApi::class)
+public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
- if (predicate(it)) return false
+ add(it)
}
- return true
}
/**
- * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S =
- consume {
- val iterator = this.iterator()
- if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
- var accumulator: S = iterator.next()
- while (iterator.hasNext()) {
- accumulator = operation(accumulator, iterator.next())
- }
- return accumulator
- }
-
-/**
- * Accumulates value starting with the first element and applying [operation] from left to right
- * to current accumulator value and each element with its index in the original channel.
- * @param [operation] function that takes the index of an element, current accumulator value
- * and the element itself and calculates the next accumulator value.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
+ * Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
*
* **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
* See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
*/
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S =
+@ObsoleteCoroutinesApi
+public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
- val iterator = this.iterator()
- if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.")
- var index = 1
- var accumulator: S = iterator.next()
- while (iterator.hasNext()) {
- accumulator = operation(index++, accumulator, iterator.next())
- }
- return accumulator
- }
-
-/**
- * Returns the sum of all values produced by [selector] function applied to each element in the channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int {
- var sum = 0
- consumeEach {
- sum += selector(it)
- }
- return sum
-}
-
-/**
- * Returns the sum of all values produced by [selector] function applied to each element in the channel.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double {
- var sum = 0.0
- consumeEach {
- sum += selector(it)
+ for (element in this) action(element)
}
- return sum
-}
-/**
- * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
- map { it ?: throw IllegalArgumentException("null element found in $this.") }
-/**
- * Splits the original channel into pair of lists,
- * where *first* list contains elements for which [predicate] yielded `true`,
- * while *second* list contains elements for which [predicate] yielded `false`.
- *
- * The operation is _terminal_.
- * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> {
- val first = ArrayList<E>()
- val second = ArrayList<E>()
- consumeEach {
- if (predicate(it)) {
- first.add(it)
- } else {
- second.add(it)
- }
- }
- return Pair(first, second)
+@PublishedApi
+internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
+ cancel(cause?.let {
+ it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it)
+ })
}
-/**
- * Returns a channel of pairs built from elements of both channels with same indexes.
- * Resulting channel has length of shortest input channel.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one.
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
- zip(other) { t1, t2 -> t1 to t2 }
-
-/**
- * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels.
- *
- * The operation is _intermediate_ and _stateless_.
- * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one.
- *
- * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.**
- * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254).
- */
-@Deprecated(
- message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x",
- level = DeprecationLevel.ERROR
-)
-public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
- GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
- val otherIterator = other.iterator()
- this@zip.consumeEach { element1 ->
- if (!otherIterator.hasNext()) return@consumeEach
- val element2 = otherIterator.next()
- send(transform(element1, element2))
- }
- }
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
index f1d092e3..b768d7c3 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
@@ -17,7 +17,7 @@ import kotlin.jvm.*
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
- * Sender to this broadcast channel never suspends and [offer] always returns `true`.
+ * Sender to this broadcast channel never suspends and [trySend] always succeeds.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
@@ -26,10 +26,10 @@ import kotlin.jvm.*
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
* number of subscribers.
*
- * **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow]
- * when it becomes stable.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow].
*/
-@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
+@ObsoleteCoroutinesApi
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Creates an instance of this class that already holds a value.
@@ -94,7 +94,6 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
}
public override val isClosedForSend: Boolean get() = _state.value is Closed
- public override val isFull: Boolean get() = false
@Suppress("UNCHECKED_CAST")
public override fun openSubscription(): ReceiveChannel<E> {
@@ -229,12 +228,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
- * future subscribers. This implementation always returns `true`.
- * It throws exception if the channel [isClosedForSend] (see [close] for details).
+ * future subscribers. This implementation always returns either successful result
+ * or closed with an exception.
*/
- public override fun offer(element: E): Boolean {
- offerInternal(element)?.let { throw it.sendException }
- return true
+ public override fun trySend(element: E): ChannelResult<Unit> {
+ offerInternal(element)?.let { return ChannelResult.closed(it.sendException) }
+ return ChannelResult.success(Unit)
}
@Suppress("UNCHECKED_CAST")
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
index 0e686447..f7f60cf9 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
@@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
/**
- * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
+ * Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the most recently sent element.
* Back-to-send sent elements are _conflated_ -- only the most recently sent element is received,
* while previously sent elements **are lost**.
- * Sender to this channel never suspends and [offer] always returns `true`.
+ * Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
*/
@@ -123,6 +123,7 @@ internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredEleme
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
}
+ @Suppress("UNCHECKED_CAST")
private fun updateValueLocked(element: Any?): UndeliveredElementException? {
val old = value
val undeliveredElementException = if (old === EMPTY) null else
diff --git a/kotlinx-coroutines-core/common/src/channels/Deprecated.kt b/kotlinx-coroutines-core/common/src/channels/Deprecated.kt
new file mode 100644
index 00000000..2b9ed42d
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/channels/Deprecated.kt
@@ -0,0 +1,478 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:JvmMultifileClass
+@file:JvmName("ChannelsKt")
+@file:Suppress("unused")
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+import kotlin.coroutines.*
+import kotlin.jvm.*
+
+/** @suppress **/
+@PublishedApi // Binary compatibility
+internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
+ { cause: Throwable? ->
+ var exception: Throwable? = null
+ for (channel in channels)
+ try {
+ channel.cancelConsumed(cause)
+ } catch (e: Throwable) {
+ if (exception == null) {
+ exception = e
+ } else {
+ exception.addSuppressedThrowable(e)
+ }
+ }
+ exception?.let { throw it }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume {
+ if (index < 0)
+ throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
+ var count = 0
+ for (element in this) {
+ @Suppress("UNUSED_CHANGED_VALUE") // KT-47628
+ if (index == count++)
+ return element
+ }
+ throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.")
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? =
+ consume {
+ if (index < 0)
+ return null
+ var count = 0
+ for (element in this) {
+ if (index == count++)
+ return element
+ }
+ return null
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.first(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ return iterator.next()
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ return iterator.next()
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int {
+ var index = 0
+ consumeEach {
+ if (element == it)
+ return index
+ index++
+ }
+ return -1
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.last(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ var last = iterator.next()
+ while (iterator.hasNext())
+ last = iterator.next()
+ return last
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int {
+ var lastIndex = -1
+ var index = 0
+ consumeEach {
+ if (element == it)
+ lastIndex = index
+ index++
+ }
+ return lastIndex
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ var last = iterator.next()
+ while (iterator.hasNext())
+ last = iterator.next()
+ return last
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.single(): E =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ throw NoSuchElementException("ReceiveChannel is empty.")
+ val single = iterator.next()
+ if (iterator.hasNext())
+ throw IllegalArgumentException("ReceiveChannel has more than one element.")
+ return single
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext())
+ return null
+ val single = iterator.next()
+ if (iterator.hasNext())
+ return null
+ return single
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ require(n >= 0) { "Requested element count $n is less than zero." }
+ var remaining: Int = n
+ if (remaining > 0)
+ for (e in this@drop) {
+ remaining--
+ if (remaining == 0)
+ break
+ }
+ for (e in this@drop) {
+ send(e)
+ }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.dropWhile(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ predicate: suspend (E) -> Boolean
+): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@dropWhile) {
+ if (!predicate(e)) {
+ send(e)
+ break
+ }
+ }
+ for (e in this@dropWhile) {
+ send(e)
+ }
+ }
+
+@PublishedApi
+internal fun <E> ReceiveChannel<E>.filter(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ predicate: suspend (E) -> Boolean
+): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@filter) {
+ if (predicate(e)) send(e)
+ }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.filterIndexed(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ predicate: suspend (index: Int, E) -> Boolean
+): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@filterIndexed) {
+ if (predicate(index++, e)) send(e)
+ }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.filterNot(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ predicate: suspend (E) -> Boolean
+): ReceiveChannel<E> =
+ filter(context) { !predicate(it) }
+
+@PublishedApi
+@Suppress("UNCHECKED_CAST")
+internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> =
+ filter { it != null } as ReceiveChannel<E>
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+ consumeEach {
+ if (it != null) destination.add(it)
+ }
+ return destination
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C {
+ consumeEach {
+ if (it != null) destination.send(it)
+ }
+ return destination
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ if (n == 0) return@produce
+ require(n >= 0) { "Requested element count $n is less than zero." }
+ var remaining: Int = n
+ for (e in this@take) {
+ send(e)
+ remaining--
+ if (remaining == 0)
+ return@produce
+ }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.takeWhile(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ predicate: suspend (E) -> Boolean
+): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@takeWhile) {
+ if (!predicate(e)) return@produce
+ send(e)
+ }
+ }
+
+@PublishedApi
+internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C {
+ consumeEach {
+ destination.send(it)
+ }
+ return destination
+}
+
+@PublishedApi
+internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C {
+ consumeEach {
+ destination.add(it)
+ }
+ return destination
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> =
+ toMap(LinkedHashMap())
+
+@PublishedApi
+internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M {
+ consumeEach {
+ destination += it
+ }
+ return destination
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> =
+ toCollection(ArrayList())
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> =
+ this.toMutableSet()
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E, R> ReceiveChannel<E>.flatMap(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ transform: suspend (E) -> ReceiveChannel<R>
+): ReceiveChannel<R> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ for (e in this@flatMap) {
+ transform(e).toChannel(this)
+ }
+ }
+
+@PublishedApi
+internal fun <E, R> ReceiveChannel<E>.map(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ transform: suspend (E) -> R
+): ReceiveChannel<R> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ consumeEach {
+ send(transform(it))
+ }
+ }
+
+@PublishedApi
+internal fun <E, R> ReceiveChannel<E>.mapIndexed(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ transform: suspend (index: Int, E) -> R
+): ReceiveChannel<R> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@mapIndexed) {
+ send(transform(index++, e))
+ }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ transform: suspend (index: Int, E) -> R?
+): ReceiveChannel<R> =
+ mapIndexed(context, transform).filterNotNull()
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ transform: suspend (E) -> R?
+): ReceiveChannel<R> =
+ map(context, transform).filterNotNull()
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ var index = 0
+ for (e in this@withIndex) {
+ send(IndexedValue(index++, e))
+ }
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> =
+ this.distinctBy { it }
+
+@PublishedApi
+internal fun <E, K> ReceiveChannel<E>.distinctBy(
+ context: CoroutineContext = Dispatchers.Unconfined,
+ selector: suspend (E) -> K
+): ReceiveChannel<E> =
+ GlobalScope.produce(context, onCompletion = consumes()) {
+ val keys = HashSet<K>()
+ for (e in this@distinctBy) {
+ val k = selector(e)
+ if (k !in keys) {
+ send(e)
+ keys += k
+ }
+ }
+ }
+
+@PublishedApi
+internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> =
+ toCollection(LinkedHashSet())
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.any(): Boolean =
+ consume {
+ return iterator().hasNext()
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.count(): Int {
+ var count = 0
+ consumeEach { count++ }
+ return count
+}
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var max = iterator.next()
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ if (comparator.compare(max, e) < 0) max = e
+ }
+ return max
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? =
+ consume {
+ val iterator = iterator()
+ if (!iterator.hasNext()) return null
+ var min = iterator.next()
+ while (iterator.hasNext()) {
+ val e = iterator.next()
+ if (comparator.compare(min, e) > 0) min = e
+ }
+ return min
+ }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public suspend fun <E> ReceiveChannel<E>.none(): Boolean =
+ consume {
+ return !iterator().hasNext()
+ }
+
+/** @suppress **/
+@Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN)
+public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> =
+ map { it ?: throw IllegalArgumentException("null element found in $this.") }
+
+/** @suppress **/
+@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
+public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> =
+ zip(other) { t1, t2 -> t1 to t2 }
+
+@PublishedApi // Binary compatibility
+internal fun <E, R, V> ReceiveChannel<E>.zip(
+ other: ReceiveChannel<R>,
+ context: CoroutineContext = Dispatchers.Unconfined,
+ transform: (a: E, b: R) -> V
+): ReceiveChannel<V> =
+ GlobalScope.produce(context, onCompletion = consumesAll(this, other)) {
+ val otherIterator = other.iterator()
+ this@zip.consumeEach { element1 ->
+ if (!otherIterator.hasNext()) return@consumeEach
+ val element2 = otherIterator.next()
+ send(transform(element1, element2))
+ }
+ }
+
+@PublishedApi // Binary compatibility
+internal fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? ->
+ cancelConsumed(cause)
+}
diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
index 83175273..b5f607b2 100644
--- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
@@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.*
/**
* Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
- * Sender to this channel never suspends and [offer] always returns `true`.
+ * Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
*
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index 3c183587..3342fb6e 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce(
internal open class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
-) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
+) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
override val isActive: Boolean
get() = super.isActive