diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/channels')
12 files changed, 881 insertions, 2344 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 9721583e..bcf19215 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -127,8 +127,7 @@ internal abstract class AbstractSendChannel<E>( // ------ SendChannel ------ public final override val isClosedForSend: Boolean get() = closedForSend != null - public override val isFull: Boolean get() = isFullImpl - protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull + private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull public final override suspend fun send(element: E) { // fast path -- try offer non-blocking @@ -137,23 +136,43 @@ internal abstract class AbstractSendChannel<E>( return sendSuspend(element) } - public final override fun offer(element: E): Boolean { + override fun offer(element: E): Boolean { + // Temporary migration for offer users who rely on onUndeliveredElement + try { + return super.offer(element) + } catch (e: Throwable) { + onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let { + // If it crashes, add send exception as suppressed for better diagnostics + it.addSuppressed(e) + throw it + } + throw e + } + } + + public final override fun trySend(element: E): ChannelResult<Unit> { val result = offerInternal(element) return when { - result === OFFER_SUCCESS -> true + result === OFFER_SUCCESS -> ChannelResult.success(Unit) result === OFFER_FAILED -> { - // We should check for closed token on offer as well, otherwise offer won't be linearizable + // We should check for closed token on trySend as well, otherwise trySend won't be linearizable // in the face of concurrent close() // See https://github.com/Kotlin/kotlinx.coroutines/issues/359 - throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false)) + val closedForSend = closedForSend ?: return ChannelResult.failure() + ChannelResult.closed(helpCloseAndGetSendException(closedForSend)) } result is Closed<*> -> { - throw recoverStackTrace(helpCloseAndGetSendException(element, result)) + ChannelResult.closed(helpCloseAndGetSendException(result)) } - else -> error("offerInternal returned $result") + else -> error("trySend returned $result") } } + private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable { + helpClose(closed) + return closed.sendException + } + private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable { // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419 @@ -604,26 +623,8 @@ internal abstract class AbstractChannel<E>( if (result) onReceiveEnqueued() } - public final override suspend fun receiveOrNull(): E? { - // fast path -- try poll non-blocking - val result = pollInternal() - @Suppress("UNCHECKED_CAST") - if (result !== POLL_FAILED && result !is Closed<*>) return result as E - // slow-path does suspend - return receiveSuspend(RECEIVE_NULL_ON_CLOSE) - } - @Suppress("UNCHECKED_CAST") - private fun receiveOrNullResult(result: Any?): E? { - if (result is Closed<*>) { - if (result.closeCause != null) throw recoverStackTrace(result.closeCause) - return null - } - return result as E - } - - @Suppress("UNCHECKED_CAST") - public final override suspend fun receiveOrClosed(): ValueOrClosed<E> { + public final override suspend fun receiveCatching(): ChannelResult<E> { // fast path -- try poll non-blocking val result = pollInternal() if (result !== POLL_FAILED) return result.toResult() @@ -632,9 +633,11 @@ internal abstract class AbstractChannel<E>( } @Suppress("UNCHECKED_CAST") - public final override fun poll(): E? { + public final override fun tryReceive(): ChannelResult<E> { val result = pollInternal() - return if (result === POLL_FAILED) null else receiveOrNullResult(result) + if (result === POLL_FAILED) return ChannelResult.failure() + if (result is Closed<*>) return ChannelResult.closed(result.closeCause) + return ChannelResult.success(result as E) } @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") @@ -734,18 +737,10 @@ internal abstract class AbstractChannel<E>( } } - final override val onReceiveOrNull: SelectClause1<E?> - get() = object : SelectClause1<E?> { + final override val onReceiveCatching: SelectClause1<ChannelResult<E>> + get() = object : SelectClause1<ChannelResult<E>> { @Suppress("UNCHECKED_CAST") - override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) { - registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R) - } - } - - final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>> - get() = object : SelectClause1<ValueOrClosed<E>> { - @Suppress("UNCHECKED_CAST") - override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) { + override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) { registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R) } } @@ -776,15 +771,7 @@ internal abstract class AbstractChannel<E>( } RECEIVE_RESULT -> { if (!select.trySelect()) return - startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion) - } - RECEIVE_NULL_ON_CLOSE -> { - if (value.closeCause == null) { - if (!select.trySelect()) return - startCoroutineUnintercepted(null, select.completion) - } else { - throw recoverStackTrace(value.receiveException) - } + startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion) } } } @@ -905,7 +892,7 @@ internal abstract class AbstractChannel<E>( @JvmField val receiveMode: Int ) : Receive<E>() { fun resumeValue(value: E): Any? = when (receiveMode) { - RECEIVE_RESULT -> ValueOrClosed.value(value) + RECEIVE_RESULT -> ChannelResult.success(value) else -> value } @@ -921,7 +908,6 @@ internal abstract class AbstractChannel<E>( override fun resumeReceiveClosed(closed: Closed<*>) { when { - receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>()) else -> cont.resumeWithException(closed.receiveException) } @@ -990,7 +976,7 @@ internal abstract class AbstractChannel<E>( @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(value: E) { block.startCoroutineCancellable( - if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, + if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value, select.completion, resumeOnCancellationFun(value) ) @@ -1000,12 +986,7 @@ internal abstract class AbstractChannel<E>( if (!select.trySelect()) return when (receiveMode) { RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException) - RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion) - RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) { - block.startCoroutineCancellable(null, select.completion) - } else { - select.resumeSelectWithException(closed.receiveException) - } + RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion) } } @@ -1023,8 +1004,7 @@ internal abstract class AbstractChannel<E>( // receiveMode values internal const val RECEIVE_THROWS_ON_CLOSE = 0 -internal const val RECEIVE_NULL_ON_CLOSE = 1 -internal const val RECEIVE_RESULT = 2 +internal const val RECEIVE_RESULT = 1 @JvmField @SharedImmutable @@ -1128,9 +1108,9 @@ internal class Closed<in E>( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed@$hexAddress[$closeCause]" @@ -1143,8 +1123,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose } @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") -private inline fun <E> Any?.toResult(): ValueOrClosed<E> = - if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E) +private inline fun <E> Any?.toResult(): ChannelResult<E> = + if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E) @Suppress("NOTHING_TO_INLINE") -private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause) +private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause) diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 4569ec72..7e6c0e68 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -49,7 +49,6 @@ internal open class ArrayChannel<E>( protected final override val isBufferAlwaysFull: Boolean get() = false protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND - override val isFull: Boolean get() = lock.withLock { isFullImpl } override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl } override val isClosedForReceive: Boolean get() = lock.withLock { super.isClosedForReceive } diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index 07e75976..b1c24b45 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -35,11 +35,13 @@ import kotlin.coroutines.intrinsics.* * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with * the broadcasting coroutine in hard-to-specify ways. * - * **Note: This API is obsolete.** It will be deprecated and replaced with the - * [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] + * operator. * * @param start coroutine start option. The default value is [CoroutineStart.LAZY]. */ +@ObsoleteCoroutinesApi public fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY @@ -95,10 +97,12 @@ public fun <E> ReceiveChannel<E>.broadcast( * * ### Future replacement * + * This API is obsolete since 1.5.0. * This function has an inappropriate result type of [BroadcastChannel] which provides * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with - * the broadcasting coroutine in hard-to-specify ways. It will be replaced with - * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future. + * the broadcasting coroutine in hard-to-specify ways. It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] + * operator. * * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. * @param capacity capacity of the channel's buffer (1 by default). @@ -106,6 +110,7 @@ public fun <E> ReceiveChannel<E>.broadcast( * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]). * @param block the coroutine code. */ +@ObsoleteCoroutinesApi public fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, @@ -127,7 +132,13 @@ private open class BroadcastCoroutine<E>( parentContext: CoroutineContext, protected val _channel: BroadcastChannel<E>, active: Boolean -) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel { +) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active), + ProducerScope<E>, BroadcastChannel<E> by _channel { + + init { + initParentJob(parentContext[Job]) + } + override val isActive: Boolean get() = super.isActive override val channel: SendChannel<E> diff --git a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt index 6cd79373..c82b8dbd 100644 --- a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt @@ -20,10 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED * See `BroadcastChannel()` factory function for the description of available * broadcast channel implementations. * - * **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow] - * when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. */ -@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it +@ObsoleteCoroutinesApi public interface BroadcastChannel<E> : SendChannel<E> { /** * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it. @@ -60,9 +60,11 @@ public interface BroadcastChannel<E> : SendChannel<E> { * * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity. * * otherwise -- throws [IllegalArgumentException]. * - * **Note: This is an experimental api.** It may be changed in the future updates. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow] + * and [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. */ -@ExperimentalCoroutinesApi +@ObsoleteCoroutinesApi public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> = when (capacity) { 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel") diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index b8b81aac..b15c4262 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* +import kotlin.contracts.* import kotlin.internal.* import kotlin.jvm.* @@ -23,7 +24,7 @@ import kotlin.jvm.* public interface SendChannel<in E> { /** * Returns `true` if this channel was closed by an invocation of [close]. This means that - * calling [send] or [offer] will result in an exception. + * calling [send] will result in an exception. * * **Note: This is an experimental api.** This property may change its semantics and/or name in the future. */ @@ -31,16 +32,6 @@ public interface SendChannel<in E> { public val isClosedForSend: Boolean /** - * Returns `true` if the channel is full (out of capacity), which means that an attempt to [send] will suspend. - * This function returns `false` if the channel [is closed for `send`][isClosedForSend]. - * - * @suppress **Will be removed in next releases, no replacement.** - */ - @ExperimentalCoroutinesApi - @Deprecated(level = DeprecationLevel.ERROR, message = "Will be removed in next releases without replacement") - public val isFull: Boolean - - /** * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). * @@ -60,7 +51,7 @@ public interface SendChannel<in E> { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onSend] clause. - * Use [offer] to try sending to this channel without waiting. + * Use [trySend] to try sending to this channel without waiting. */ public suspend fun send(element: E) @@ -73,19 +64,17 @@ public interface SendChannel<in E> { */ public val onSend: SelectClause2<E, SendChannel<E>> + /** * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, - * and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off - * in situations when `send` suspends. - * - * Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). + * and returns the successful result. Otherwise, returns failed or closed result. + * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws. * - * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it - * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed, - * then it calls `onUndeliveredElement` before throwing an exception. + * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and + * it does not call `onUndeliveredElement` that was installed for this channel. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. */ - public fun offer(element: E): Boolean + public fun trySend(element: E): ChannelResult<Unit> /** * Closes this channel. @@ -97,7 +86,7 @@ public interface SendChannel<in E> { * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements * are received. * - * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer] + * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive]. * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or * receive on a failed channel throw the specified [cause] exception. @@ -116,10 +105,11 @@ public interface SendChannel<in E> { * * the cause of `close` or `cancel` otherwise. * * Example of usage (exception handling is omitted): + * * ``` * val events = Channel(UNLIMITED) * callbackBasedApi.registerCallback { event -> - * events.offer(event) + * events.trySend(event) * } * * val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) { @@ -128,7 +118,6 @@ public interface SendChannel<in E> { * } * * events.invokeOnClose { callbackBasedApi.stop() } - * * ``` * * **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future. @@ -140,6 +129,44 @@ public interface SendChannel<in E> { */ @ExperimentalCoroutinesApi public fun invokeOnClose(handler: (cause: Throwable?) -> Unit) + + /** + * **Deprecated** offer method. + * + * This method was deprecated in the favour of [trySend]. + * It has proven itself as the most error-prone method in Channel API: + * + * * `Boolean` return type creates the false sense of security, implying that `false` + * is returned instead of throwing an exception. + * * It was used mostly from non-suspending APIs where CancellationException triggered + * internal failures in the application (the most common source of bugs). + * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to + * oversee such error during code review. + * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. + * + * **NB** Automatic migration provides best-effort for the user experience, but requires removal + * or adjusting of the code that relied on the exception handling. + * The complete replacement has a more verbose form: + * ``` + * channel.trySend(element) + * .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") } + * .isSuccess + * ``` + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. + * + * @suppress **Deprecated**. + */ + @Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySend' method", + replaceWith = ReplaceWith("trySend(element).isSuccess") + ) // Warning since 1.5.0 + public fun offer(element: E): Boolean { + val result = trySend(element) + if (result.isSuccess) return true + throw recoverStackTrace(result.exceptionOrNull() ?: return false) + } } /** @@ -182,7 +209,7 @@ public interface ReceiveChannel<out E> { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onReceive] clause. - * Use [poll] to try receiving from this channel without waiting. + * Use [tryReceive] to try receiving from this channel without waiting. */ public suspend fun receive(): E @@ -195,94 +222,39 @@ public interface ReceiveChannel<out E> { public val onReceive: SelectClause1<E> /** - * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty, - * or returns `null` if the channel is [closed for `receive`][isClosedForReceive] without cause, - * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this - * function is suspended, this function immediately resumes with a [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receiveOrNull` call can retrieve the element from the channel, - * but then throw [CancellationException], thus failing to deliver the element. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. - * - * Note that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. - * - * This function can be used in [select] invocations with the [onReceiveOrNull] clause. - * Use [poll] to try receiving from this channel without waiting. - * - * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension. - */ - @ObsoleteCoroutinesApi - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - @LowPriorityInOverloadResolution - @Deprecated( - message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull") - ) - public suspend fun receiveOrNull(): E? - - /** - * Clause for the [select] expression of the [receiveOrNull] suspending function that selects with the element - * received from the channel or `null` if the channel is - * [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with - * the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension. - */ - @ObsoleteCoroutinesApi - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - @LowPriorityInOverloadResolution - @Deprecated( - message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull") - ) - public val onReceiveOrNull: SelectClause1<E?> - - /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. - * This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel - * or the close cause if the channel was closed. + * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel + * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally. + * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed]. * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with a [CancellationException]. * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel, + * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel, * but then throw [CancellationException], thus failing to deliver the element. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * This function can be used in [select] invocations with the [onReceiveOrClosed] clause. - * Use [poll] to try receiving from this channel without waiting. - * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + * This function can be used in [select] invocations with the [onReceiveCatching] clause. + * Use [tryReceive] to try receiving from this channel without waiting. */ - @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public suspend fun receiveOrClosed(): ValueOrClosed<E> + public suspend fun receiveCatching(): ChannelResult<E> /** - * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value + * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value * that is received from the channel or with a close cause if the channel * [is closed for `receive`][isClosedForReceive]. - * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. */ - @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>> + public val onReceiveCatching: SelectClause1<ChannelResult<E>> /** - * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty - * or is [is closed for `receive`][isClosedForReceive] without a cause. - * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. + * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success] + * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed] + * result if the channel is closed. */ - public fun poll(): E? + public fun tryReceive(): ChannelResult<E> /** * Returns a new iterator to receive elements from this channel using a `for` loop. @@ -318,107 +290,262 @@ public interface ReceiveChannel<out E> { */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(cause: Throwable? = null): Boolean + + /** + * **Deprecated** poll method. + * + * This method was deprecated in the favour of [tryReceive]. + * It has proven itself as error-prone method in Channel API: + * + * * Nullable return type creates the false sense of security, implying that `null` + * is returned instead of throwing an exception. + * * It was used mostly from non-suspending APIs where CancellationException triggered + * internal failures in the application (the most common source of bugs). + * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. + * + * ### Replacement note + * + * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and + * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception. + * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()` + * + * @suppress **Deprecated**. + */ + @Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'tryReceive'. " + + "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " + + "for the precise replacement please refer to the 'poll' documentation", + replaceWith = ReplaceWith("tryReceive().getOrNull()") + ) // Warning since 1.5.0 + public fun poll(): E? { + val result = tryReceive() + if (result.isSuccess) return result.getOrThrow() + throw recoverStackTrace(result.exceptionOrNull() ?: return null) + } + + /** + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * + * It had the following pitfalls: + * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value" + * - Was throwing if the channel has failed even though its signature may suggest it returns 'null' + * - It didn't really belong to core channel API and can be exposed as an extension instead. + * + * ### Replacement note + * + * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and + * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception. + * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`. + * + * @suppress **Deprecated** + */ + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + @LowPriorityInOverloadResolution + @Deprecated( + message = "Deprecated in favor of 'receiveCatching'. " + + "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " + + "for the detailed replacement please refer to the 'receiveOrNull' documentation", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("receiveCatching().getOrNull()") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0 + public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull() + + /** + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * See [receiveOrNull] documentation. + * + * @suppress **Deprecated**: in favor of onReceiveCatching extension. + */ + @Deprecated( + message = "Deprecated in favor of onReceiveCatching extension", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("onReceiveCatching") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0 + public val onReceiveOrNull: SelectClause1<E?> + get() { + return object : SelectClause1<E?> { + @InternalCoroutinesApi + override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) { + onReceiveCatching.registerSelectClause1(select) { + it.exceptionOrNull()?.let { throw it } + block(it.getOrNull()) + } + } + } + } } /** - * A discriminated union of [ReceiveChannel.receiveOrClosed] result - * that encapsulates either an element of type [T] successfully received from the channel or a close cause. + * A discriminated union of channel operation result. + * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with + * an optional cause. * - * :todo: Do not make it public before resolving todos in the code of this class. + * The successful result represents a successful operation with a value of type [T], for example, + * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend]. * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + * The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed. + * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state. + * + * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed. + * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend] + * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving. */ -@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING") -@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed -public inline class ValueOrClosed<out T> -internal constructor(private val holder: Any?) { +@JvmInline +public value class ChannelResult<out T> +@PublishedApi internal constructor(@PublishedApi internal val holder: Any?) { /** - * Returns `true` if this instance represents a received element. - * In this case [isClosed] returns `false`. - * todo: it is commented for now, because it is not used + * Returns `true` if this instance represents a successful + * operation outcome. + * + * In this case [isFailure] and [isClosed] return `false`. */ - //public val isValue: Boolean get() = holder !is Closed + public val isSuccess: Boolean get() = holder !is Failed /** - * Returns `true` if this instance represents a close cause. - * In this case [isValue] returns `false`. + * Returns `true` if this instance represents unsuccessful operation. + * + * In this case [isSuccess] returns false, but it does not imply + * that the channel is failed or closed. + * + * Example of a failed operation without an exception and channel being closed + * is [Channel.trySend] attempt to a channel that is full. */ - public val isClosed: Boolean get() = holder is Closed + public val isFailure: Boolean get() = holder is Failed /** - * Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise. + * Returns `true` if this instance represents unsuccessful operation + * to a closed or cancelled channel. * - * :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]: + * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply + * that [exceptionOrNull] returns non-null value. * - * So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`. - * On the other hand, the channel has the following `receive` variants: - * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh? - * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull` - * * `receiveOrClosed` - * For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`. + * It can happen if the channel was [closed][Channel.close] normally without an exception. */ - @Suppress("UNCHECKED_CAST") - public val value: T - get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T + public val isClosed: Boolean get() = holder is Closed /** - * Returns the received value if this element represents a received value, or `null` otherwise. - * :todo: Decide if it shall be made into extension that is available only for non-null T. - * Note: it might become inconsistent with kotlin.Result + * Returns the encapsulated value if this instance represents success or `null` if it represents failed result. */ @Suppress("UNCHECKED_CAST") - public val valueOrNull: T? - get() = if (holder is Closed) null else holder as T + public fun getOrNull(): T? = if (holder !is Failed) holder as T else null /** - * :todo: Decide, if it is needed, how it shall be named with relation to [value]. - * Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site, - * so if one is sure that `ValueOrClosed` always holds a value, this very property should be used. - * Otherwise, it could be very hard to locate the source of the exception. - * todo: it is commented for now, because it is not used + * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed. */ - //@Suppress("UNCHECKED_CAST") - //public val valueOrThrow: T - // get() = if (holder is Closed) throw holder.exception else holder as T + public fun getOrThrow(): T { + @Suppress("UNCHECKED_CAST") + if (holder !is Failed) return holder as T + if (holder is Closed && holder.cause != null) throw holder.cause + error("Trying to call 'getOrThrow' on a failed channel result: $holder") + } /** - * Returns the close cause of the channel if this instance represents a close cause, or throws - * an [IllegalStateException] otherwise. + * Returns the encapsulated exception if this instance represents failure or `null` if it is success + * or unsuccessful operation to closed channel. */ - @Suppress("UNCHECKED_CAST") - public val closeCause: Throwable? get() = - if (holder is Closed) holder.cause else error("Channel was not closed") + public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause + + internal open class Failed { + override fun toString(): String = "Failed" + } + + internal class Closed(@JvmField val cause: Throwable?): Failed() { + override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause + override fun hashCode(): Int = cause.hashCode() + override fun toString(): String = "Closed($cause)" + } + + @Suppress("NOTHING_TO_INLINE") + @InternalCoroutinesApi + public companion object { + private val failed = Failed() + + @InternalCoroutinesApi + public fun <E> success(value: E): ChannelResult<E> = + ChannelResult(value) + + @InternalCoroutinesApi + public fun <E> failure(): ChannelResult<E> = + ChannelResult(failed) + + @InternalCoroutinesApi + public fun <E> closed(cause: Throwable?): ChannelResult<E> = + ChannelResult(Closed(cause)) + } - /** - * @suppress - */ public override fun toString(): String = when (holder) { is Closed -> holder.toString() else -> "Value($holder)" + } +} + +/** + * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the + * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed + * result. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T { + contract { + callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE) } + @Suppress("UNCHECKED_CAST") + return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T +} - internal class Closed(@JvmField val cause: Throwable?) { - // todo: it is commented for now, because it is not used - //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE) - override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause - override fun hashCode(): Int = cause.hashCode() - override fun toString(): String = "Closed($cause)" +/** + * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess]. + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) } + @Suppress("UNCHECKED_CAST") + if (holder !is ChannelResult.Failed) action(holder as T) + return this +} - /** - * todo: consider making value/closed constructors public in the future. - */ - internal companion object { - @Suppress("NOTHING_TO_INLINE") - internal inline fun <E> value(value: E): ValueOrClosed<E> = - ValueOrClosed(value) - - @Suppress("NOTHING_TO_INLINE") - internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> = - ValueOrClosed(Closed(cause)) +/** + * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]. + * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) } + @Suppress("UNCHECKED_CAST") + if (holder is ChannelResult.Failed) action(exceptionOrNull()) + return this +} + +/** + * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure] + * due to channel being [closed][Channel.close]. + * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend] + * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation. + * + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) + } + @Suppress("UNCHECKED_CAST") + if (holder is ChannelResult.Closed) action(exceptionOrNull()) + return this } /** @@ -493,14 +620,14 @@ public interface ChannelIterator<out E> { * * * When `capacity` is [Channel.UNLIMITED] — it creates a channel with effectively unlimited buffer. * This channel has a linked-list buffer of unlimited capacity (limited only by available memory). - * [Sending][send] to this channel never suspends, and [offer] always returns `true`. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * * When `capacity` is [Channel.CONFLATED] — it creates a _conflated_ channel - * This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations, + * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the last element sent. * Back-to-back sent elements are conflated — only the last sent element is received, * while previously sent elements **are lost**. - * [Sending][send] to this channel never suspends, and [offer] always returns `true`. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * * When `capacity` is positive but less than [UNLIMITED] — it creates an array-based channel with the specified capacity. * This channel has an array buffer of a fixed `capacity`. @@ -547,8 +674,6 @@ public interface ChannelIterator<out E> { * * * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually * send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. - * * When [offer][SendChannel.offer] operation throws an exception when - * the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. * * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext] * operation throws an exception when it had retrieved the element from the * channel but was cancelled before the code following the receive call resumed. diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index b2b257de..57b2797d 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -11,8 +11,10 @@ import kotlin.coroutines.* internal open class ChannelCoroutine<E>( parentContext: CoroutineContext, protected val _channel: Channel<E>, + initParentJob: Boolean, active: Boolean -) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel { +) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel { + val channel: Channel<E> get() = this override fun cancel() { diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index e3567e31..e0b4f9d2 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -37,129 +37,40 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() } /** - * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty] - * or returns `null` if the channel is [closed][Channel.isClosedForReceive]. + * This function is deprecated in the favour of [ReceiveChannel.receiveCatching]. * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this - * function is suspended, this function immediately resumes with [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. If the `receiveOrNull` call threw [CancellationException] there is no way - * to tell if some element was already received from the channel or not. See [Channel] documentation for details. + * This function is considered error-prone for the following reasons; + * * Is throwing if the channel has failed even though its signature may suggest it returns 'null' + * * It is easy to forget that exception handling still have to be explicit + * * During code reviews and code reading, intentions of the code are frequently unclear: + * are potential exceptions ignored deliberately or not? * - * Note, that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. - * - * This extension is defined only for channels on non-null types, so that generic functions defined using - * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard - * to find bugs. + * @suppress doc */ +@Deprecated( + "Deprecated in the favour of 'receiveCatching'", + ReplaceWith("receiveCatching().getOrNull()"), + DeprecationLevel.WARNING +) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 @Suppress("EXTENSION_SHADOWED_BY_MEMBER") -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel<E?>).receiveOrNull() } /** - * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that - * is received from the channel or selects with `null` if the channel - * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with - * the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * This extension is defined only for channels on non-null types, so that generic functions defined using - * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard - * to find bugs. - **/ -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x + * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching] + */ +@Deprecated( + "Deprecated in the favour of 'onReceiveCatching'", + level = DeprecationLevel.WARNING +) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel<E?>).onReceiveOrNull } /** - * Subscribes to this [BroadcastChannel] and performs the specified action for each received element. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@ObsoleteCoroutinesApi -public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit = - consume { - for (element in this) action(element) - } - -// -------- Operations on ReceiveChannel -------- - -/** - * Returns a [List] containing all elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - */ -@OptIn(ExperimentalStdlibApi::class) -public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList { - consumeEach { - add(it) - } -} - -/** - * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel] - * with the corresponding cause. See also [ReceiveChannel.consume]. - * - * **WARNING**: It is planned that in the future a second invocation of this method - * on an channel that is already being consumed is going to fail fast, that it - * immediately throws an [IllegalStateException]. - * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167) - * for details. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? -> - cancelConsumed(cause) -} - -@PublishedApi -internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { - cancel(cause?.let { - it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) - }) -} - -/** - * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the - * specified [ReceiveChannel] instances with the corresponding cause. - * See also [ReceiveChannel.consumes()] for a version on one channel. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = - { cause: Throwable? -> - var exception: Throwable? = null - for (channel in channels) - try { - channel.cancelConsumed(cause) - } catch (e: Throwable) { - if (exception == null) { - exception = e - } else { - exception.addSuppressedThrowable(e) - } - } - exception?.let { throw it } - } - -/** * Makes sure that the given [block] consumes all elements from the given channel * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block. * @@ -194,2006 +105,35 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) } /** - * Performs the given [action] for each received element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) { - var index = 0 - consumeEach { - action(IndexedValue(index++, it)) - } -} - -/** - * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = - elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") } - -/** - * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E = - consume { - if (index < 0) - return defaultValue(index) - var count = 0 - for (element in this) { - if (index == count++) - return element - } - return defaultValue(index) - } - -/** - * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? = - consume { - if (index < 0) - return null - var count = 0 - for (element in this) { - if (index == count++) - return element - } - return null - } - -/** - * Returns the first element matching the given [predicate], or `null` if no such element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? = - firstOrNull(predicate) - -/** - * Returns the last element matching the given [predicate], or `null` if no such element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? = - lastOrNull(predicate) - -/** - * Returns first element. - * @throws [NoSuchElementException] if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.first(): E = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - throw NoSuchElementException("ReceiveChannel is empty.") - return iterator.next() - } - -/** - * Returns the first element matching the given [predicate]. - * @throws [NoSuchElementException] if no such element is found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E { - consumeEach { - if (predicate(it)) return it - } - throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") -} - -/** - * Returns the first element, or `null` if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - return null - return iterator.next() - } - -/** - * Returns the first element matching the given [predicate], or `null` if element was not found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? { - consumeEach { - if (predicate(it)) return it - } - return null -} - -/** - * Returns first index of [element], or -1 if the channel does not contain element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int { - var index = 0 - consumeEach { - if (element == it) - return index - index++ - } - return -1 -} - -/** - * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int { - var index = 0 - consumeEach { - if (predicate(it)) - return index - index++ - } - return -1 -} - -/** - * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int { - var lastIndex = -1 - var index = 0 - consumeEach { - if (predicate(it)) - lastIndex = index - index++ - } - return lastIndex -} - -/** - * Returns the last element. - * @throws [NoSuchElementException] if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.last(): E = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - throw NoSuchElementException("ReceiveChannel is empty.") - var last = iterator.next() - while (iterator.hasNext()) - last = iterator.next() - return last - } - -/** - * Returns the last element matching the given [predicate]. - * @throws [NoSuchElementException] if no such element is found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E { - var last: E? = null - var found = false - consumeEach { - if (predicate(it)) { - last = it - found = true - } - } - if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") - @Suppress("UNCHECKED_CAST") - return last as E -} - -/** - * Returns last index of [element], or -1 if the channel does not contain element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int { - var lastIndex = -1 - var index = 0 - consumeEach { - if (element == it) - lastIndex = index - index++ - } - return lastIndex -} - -/** - * Returns the last element, or `null` if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - return null - var last = iterator.next() - while (iterator.hasNext()) - last = iterator.next() - return last - } - -/** - * Returns the last element matching the given [predicate], or `null` if no such element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? { - var last: E? = null - consumeEach { - if (predicate(it)) { - last = it - } - } - return last -} - -/** - * Returns the single element, or throws an exception if the channel is empty or has more than one element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.single(): E = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - throw NoSuchElementException("ReceiveChannel is empty.") - val single = iterator.next() - if (iterator.hasNext()) - throw IllegalArgumentException("ReceiveChannel has more than one element.") - return single - } - -/** - * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E { - var single: E? = null - var found = false - consumeEach { - if (predicate(it)) { - if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.") - single = it - found = true - } - } - if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") - @Suppress("UNCHECKED_CAST") - return single as E -} - -/** - * Returns single element, or `null` if the channel is empty or has more than one element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - return null - val single = iterator.next() - if (iterator.hasNext()) - return null - return single - } - -/** - * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? { - var single: E? = null - var found = false - consumeEach { - if (predicate(it)) { - if (found) return null - single = it - found = true - } - } - if (!found) return null - return single -} - -/** - * Returns a channel containing all elements except first [n] elements. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - require(n >= 0) { "Requested element count $n is less than zero." } - var remaining: Int = n - if (remaining > 0) - for (e in this@drop) { - remaining-- - if (remaining == 0) - break - } - for (e in this@drop) { - send(e) - } - } - -/** - * Returns a channel containing all elements except first elements that satisfy the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@dropWhile) { - if (!predicate(e)) { - send(e) - break - } - } - for (e in this@dropWhile) { - send(e) - } - } - -/** - * Returns a channel containing only elements matching the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@filter) { - if (predicate(e)) send(e) - } - } - -/** - * Returns a channel containing only elements matching the given [predicate]. - * @param [predicate] function that takes the index of an element and the element itself - * and returns the result of predicate evaluation on the element. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - var index = 0 - for (e in this@filterIndexed) { - if (predicate(index++, e)) send(e) - } - } - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * @param [predicate] function that takes the index of an element and the element itself - * and returns the result of predicate evaluation on the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C { - consumeEachIndexed { (index, element) -> - if (predicate(index, element)) destination.add(element) - } - return destination -} - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * @param [predicate] function that takes the index of an element and the element itself - * and returns the result of predicate evaluation on the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C { - consumeEachIndexed { (index, element) -> - if (predicate(index, element)) destination.send(element) - } - return destination -} - -/** - * Returns a channel containing all elements not matching the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - filter(context) { !predicate(it) } - -/** - * Returns a channel containing all elements that are not `null`. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -@Suppress("UNCHECKED_CAST") -public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> = - filter { it != null } as ReceiveChannel<E> - -/** - * Appends all elements that are not `null` to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { - consumeEach { - if (it != null) destination.add(it) - } - return destination -} - -/** - * Appends all elements that are not `null` to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { - consumeEach { - if (it != null) destination.send(it) - } - return destination -} - -/** - * Appends all elements not matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (!predicate(it)) destination.add(it) - } - return destination -} - -/** - * Appends all elements not matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (!predicate(it)) destination.send(it) - } - return destination -} - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (predicate(it)) destination.add(it) - } - return destination -} - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (predicate(it)) destination.send(it) - } - return destination -} - -/** - * Returns a channel containing first [n] elements. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - if (n == 0) return@produce - require(n >= 0) { "Requested element count $n is less than zero." } - var remaining: Int = n - for (e in this@take) { - send(e) - remaining-- - if (remaining == 0) - return@produce - } - } - -/** - * Returns a channel containing first elements satisfying the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@takeWhile) { - if (!predicate(e)) return@produce - send(e) - } - } - -/** - * Returns a [Map] containing key-value pairs provided by [transform] function - * applied to elements of the given channel. - * - * If any of two pairs would have the same key the last one gets added to the map. - * - * The returned map preserves the entry iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> = - associateTo(LinkedHashMap(), transform) - -/** - * Returns a [Map] containing the elements from the given channel indexed by the key - * returned from [keySelector] function applied to each element. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The returned map preserves the entry iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> = - associateByTo(LinkedHashMap(), keySelector) - -/** - * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The returned map preserves the entry iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> = - associateByTo(LinkedHashMap(), keySelector, valueTransform) - -/** - * Populates and returns the [destination] mutable map with key-value pairs, - * where key is provided by the [keySelector] function applied to each element of the given channel - * and value is the element itself. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M { - consumeEach { - destination.put(keySelector(it), it) - } - return destination -} - -/** - * Populates and returns the [destination] mutable map with key-value pairs, - * where key is provided by the [keySelector] function and - * and value is provided by the [valueTransform] function applied to elements of the given channel. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M { - consumeEach { - destination.put(keySelector(it), valueTransform(it)) - } - return destination -} - -/** - * Populates and returns the [destination] mutable map with key-value pairs - * provided by [transform] function applied to each element of the given channel. - * - * If any of two pairs would have the same key the last one gets added to the map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M { - consumeEach { - destination += transform(it) - } - return destination -} - -/** - * Send each element of the original channel - * and appends the results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C { - consumeEach { - destination.send(it) - } - return destination -} - -/** - * Appends all elements to the given [destination] collection. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C { - consumeEach { - destination.add(it) - } - return destination -} - -/** - * Returns a [Map] filled with all elements of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> = - toMap(LinkedHashMap()) - -/** - * Returns a [MutableMap] filled with all elements of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M { - consumeEach { - destination += it - } - return destination -} - -/** - * Returns a [MutableList] filled with all elements of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> = - toCollection(ArrayList()) - -/** - * Returns a [Set] of all elements. - * - * The returned set preserves the element iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> = - this.toMutableSet() - -/** - * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@flatMap) { - transform(e).toChannel(this) - } - } - -/** - * Groups elements of the original channel by the key returned by the given [keySelector] function - * applied to each element and returns a map where each group key is associated with a list of corresponding elements. - * - * The returned map preserves the entry iteration order of the keys produced from the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> = - groupByTo(LinkedHashMap(), keySelector) - -/** - * Groups values returned by the [valueTransform] function applied to each element of the original channel - * by the key returned by the given [keySelector] function applied to the element - * and returns a map where each group key is associated with a list of corresponding values. - * - * The returned map preserves the entry iteration order of the keys produced from the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> = - groupByTo(LinkedHashMap(), keySelector, valueTransform) - -/** - * Groups elements of the original channel by the key returned by the given [keySelector] function - * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements. - * - * @return The [destination] map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M { - consumeEach { - val key = keySelector(it) - val list = destination.getOrPut(key) { ArrayList() } - list.add(it) - } - return destination -} - -/** - * Groups values returned by the [valueTransform] function applied to each element of the original channel - * by the key returned by the given [keySelector] function applied to the element - * and puts to the [destination] map each group key associated with a list of corresponding values. - * - * @return The [destination] map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M { - consumeEach { - val key = keySelector(it) - val list = destination.getOrPut(key) { ArrayList() } - list.add(valueTransform(it)) - } - return destination -} - -/** - * Returns a channel containing the results of applying the given [transform] function - * to each element in the original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> = - GlobalScope.produce(context, onCompletion = consumes()) { - consumeEach { - send(transform(it)) - } - } - -/** - * Returns a channel containing the results of applying the given [transform] function - * to each element and its index in the original channel. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> = - GlobalScope.produce(context, onCompletion = consumes()) { - var index = 0 - for (e in this@mapIndexed) { - send(transform(index++, e)) - } - } - -/** - * Returns a channel containing only the non-null results of applying the given [transform] function - * to each element and its index in the original channel. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> = - mapIndexed(context, transform).filterNotNull() - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends only the non-null results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C { - consumeEachIndexed { (index, element) -> - transform(index, element)?.let { destination.add(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends only the non-null results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C { - consumeEachIndexed { (index, element) -> - transform(index, element)?.let { destination.send(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends the results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C { - var index = 0 - consumeEach { - destination.add(transform(index++, it)) - } - return destination -} - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends the results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C { - var index = 0 - consumeEach { - destination.send(transform(index++, it)) - } - return destination -} - -/** - * Returns a channel containing only the non-null results of applying the given [transform] function - * to each element in the original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> = - map(context, transform).filterNotNull() - -/** - * Applies the given [transform] function to each element in the original channel - * and appends only the non-null results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C { - consumeEach { - transform(it)?.let { destination.add(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element in the original channel - * and appends only the non-null results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C { - consumeEach { - transform(it)?.let { destination.send(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element of the original channel - * and appends the results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C { - consumeEach { - destination.add(transform(it)) - } - return destination -} - -/** - * Applies the given [transform] function to each element of the original channel - * and appends the results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C { - consumeEach { - destination.send(transform(it)) - } - return destination -} - -/** - * Returns a channel of [IndexedValue] for each element of the original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> = - GlobalScope.produce(context, onCompletion = consumes()) { - var index = 0 - for (e in this@withIndex) { - send(IndexedValue(index++, e)) - } - } - -/** - * Returns a channel containing only distinct elements from the given channel. - * - * The elements in the resulting channel are in the same order as they were in the source channel. - * - * The operation is _intermediate_ and _stateful_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> = - this.distinctBy { it } - -/** - * Returns a channel containing only elements from the given channel - * having distinct keys returned by the given [selector] function. - * - * The elements in the resulting channel are in the same order as they were in the source channel. - * - * The operation is _intermediate_ and _stateful_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - val keys = HashSet<K>() - for (e in this@distinctBy) { - val k = selector(e) - if (k !in keys) { - send(e) - keys += k - } - } - } - -/** - * Returns a mutable set containing all distinct elements from the given channel. - * - * The returned set preserves the element iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> = - toCollection(LinkedHashSet()) - -/** - * Returns `true` if all elements match the given [predicate]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean { - consumeEach { - if (!predicate(it)) return false - } - return true -} - -/** - * Returns `true` if channel has at least one element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.any(): Boolean = - consume { - return iterator().hasNext() - } - -/** - * Returns `true` if at least one element matches the given [predicate]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean { - consumeEach { - if (predicate(it)) return true - } - return false -} - -/** - * Returns the number of elements in this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.count(): Int { - var count = 0 - consumeEach { count++ } - return count -} - -/** - * Returns the number of elements matching the given [predicate]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int { - var count = 0 - consumeEach { - if (predicate(it)) count++ - } - return count -} - -/** - * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R { - var accumulator = initial - consumeEach { - accumulator = operation(accumulator, it) - } - return accumulator -} - -/** - * Accumulates value starting with [initial] value and applying [operation] from left to right - * to current accumulator value and each element with its index in the original channel. - * @param [operation] function that takes the index of an element, current accumulator value - * and the element itself, and calculates the next accumulator value. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R { - var index = 0 - var accumulator = initial - consumeEach { - accumulator = operation(index++, accumulator, it) - } - return accumulator -} - -/** - * Returns the first element yielding the largest value of the given function or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var maxElem = iterator.next() - var maxValue = selector(maxElem) - while (iterator.hasNext()) { - val e = iterator.next() - val v = selector(e) - if (maxValue < v) { - maxElem = e - maxValue = v - } - } - return maxElem - } - -/** - * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var max = iterator.next() - while (iterator.hasNext()) { - val e = iterator.next() - if (comparator.compare(max, e) < 0) max = e - } - return max - } - -/** - * Returns the first element yielding the smallest value of the given function or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var minElem = iterator.next() - var minValue = selector(minElem) - while (iterator.hasNext()) { - val e = iterator.next() - val v = selector(e) - if (minValue > v) { - minElem = e - minValue = v - } - } - return minElem - } - -/** - * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var min = iterator.next() - while (iterator.hasNext()) { - val e = iterator.next() - if (comparator.compare(min, e) > 0) min = e - } - return min - } - -/** - * Returns `true` if the channel has no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.none(): Boolean = - consume { - return !iterator().hasNext() - } - -/** - * Returns `true` if no elements match the given [predicate]. + * Returns a [List] containing all elements. * * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean { +@OptIn(ExperimentalStdlibApi::class) +public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList { consumeEach { - if (predicate(it)) return false + add(it) } - return true } /** - * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S = - consume { - val iterator = this.iterator() - if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.") - var accumulator: S = iterator.next() - while (iterator.hasNext()) { - accumulator = operation(accumulator, iterator.next()) - } - return accumulator - } - -/** - * Accumulates value starting with the first element and applying [operation] from left to right - * to current accumulator value and each element with its index in the original channel. - * @param [operation] function that takes the index of an element, current accumulator value - * and the element itself and calculates the next accumulator value. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * Subscribes to this [BroadcastChannel] and performs the specified action for each received element. * * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S = +@ObsoleteCoroutinesApi +public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit = consume { - val iterator = this.iterator() - if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.") - var index = 1 - var accumulator: S = iterator.next() - while (iterator.hasNext()) { - accumulator = operation(index++, accumulator, iterator.next()) - } - return accumulator - } - -/** - * Returns the sum of all values produced by [selector] function applied to each element in the channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int { - var sum = 0 - consumeEach { - sum += selector(it) - } - return sum -} - -/** - * Returns the sum of all values produced by [selector] function applied to each element in the channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double { - var sum = 0.0 - consumeEach { - sum += selector(it) + for (element in this) action(element) } - return sum -} -/** - * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> = - map { it ?: throw IllegalArgumentException("null element found in $this.") } -/** - * Splits the original channel into pair of lists, - * where *first* list contains elements for which [predicate] yielded `true`, - * while *second* list contains elements for which [predicate] yielded `false`. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> { - val first = ArrayList<E>() - val second = ArrayList<E>() - consumeEach { - if (predicate(it)) { - first.add(it) - } else { - second.add(it) - } - } - return Pair(first, second) +@PublishedApi +internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { + cancel(cause?.let { + it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) + }) } -/** - * Returns a channel of pairs built from elements of both channels with same indexes. - * Resulting channel has length of shortest input channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> = - zip(other) { t1, t2 -> t1 to t2 } - -/** - * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> = - GlobalScope.produce(context, onCompletion = consumesAll(this, other)) { - val otherIterator = other.iterator() - this@zip.consumeEach { element1 -> - if (!otherIterator.hasNext()) return@consumeEach - val element2 = otherIterator.next() - send(transform(element1, element2)) - } - } diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt index f1d092e3..b768d7c3 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt @@ -17,7 +17,7 @@ import kotlin.jvm.* * Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received, * while previously sent elements **are lost**. * Every subscriber immediately receives the most recently sent element. - * Sender to this broadcast channel never suspends and [offer] always returns `true`. + * Sender to this broadcast channel never suspends and [trySend] always succeeds. * * A secondary constructor can be used to create an instance of this class that already holds a value. * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation. @@ -26,10 +26,10 @@ import kotlin.jvm.* * [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the * number of subscribers. * - * **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow] - * when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow]. */ -@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it +@ObsoleteCoroutinesApi public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { /** * Creates an instance of this class that already holds a value. @@ -94,7 +94,6 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { } public override val isClosedForSend: Boolean get() = _state.value is Closed - public override val isFull: Boolean get() = false @Suppress("UNCHECKED_CAST") public override fun openSubscription(): ReceiveChannel<E> { @@ -229,12 +228,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { /** * Sends the value to all subscribed receives and stores this value as the most recent state for - * future subscribers. This implementation always returns `true`. - * It throws exception if the channel [isClosedForSend] (see [close] for details). + * future subscribers. This implementation always returns either successful result + * or closed with an exception. */ - public override fun offer(element: E): Boolean { - offerInternal(element)?.let { throw it.sendException } - return true + public override fun trySend(element: E): ChannelResult<Unit> { + offerInternal(element)?.let { return ChannelResult.closed(it.sendException) } + return ChannelResult.success(Unit) } @Suppress("UNCHECKED_CAST") diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 0e686447..f7f60cf9 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* /** - * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations, + * Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the most recently sent element. * Back-to-send sent elements are _conflated_ -- only the most recently sent element is received, * while previously sent elements **are lost**. - * Sender to this channel never suspends and [offer] always returns `true`. + * Sender to this channel never suspends and [trySend] always succeeds. * * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation. */ @@ -123,6 +123,7 @@ internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredEleme undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one } + @Suppress("UNCHECKED_CAST") private fun updateValueLocked(element: Any?): UndeliveredElementException? { val old = value val undeliveredElementException = if (old === EMPTY) null else diff --git a/kotlinx-coroutines-core/common/src/channels/Deprecated.kt b/kotlinx-coroutines-core/common/src/channels/Deprecated.kt new file mode 100644 index 00000000..2b9ed42d --- /dev/null +++ b/kotlinx-coroutines-core/common/src/channels/Deprecated.kt @@ -0,0 +1,478 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:JvmMultifileClass +@file:JvmName("ChannelsKt") +@file:Suppress("unused") + +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.jvm.* + +/** @suppress **/ +@PublishedApi // Binary compatibility +internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = + { cause: Throwable? -> + var exception: Throwable? = null + for (channel in channels) + try { + channel.cancelConsumed(cause) + } catch (e: Throwable) { + if (exception == null) { + exception = e + } else { + exception.addSuppressedThrowable(e) + } + } + exception?.let { throw it } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume { + if (index < 0) + throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") + var count = 0 + for (element in this) { + @Suppress("UNUSED_CHANGED_VALUE") // KT-47628 + if (index == count++) + return element + } + throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? = + consume { + if (index < 0) + return null + var count = 0 + for (element in this) { + if (index == count++) + return element + } + return null + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.first(): E = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + return iterator.next() + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + return null + return iterator.next() + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int { + var index = 0 + consumeEach { + if (element == it) + return index + index++ + } + return -1 +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.last(): E = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + var last = iterator.next() + while (iterator.hasNext()) + last = iterator.next() + return last + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int { + var lastIndex = -1 + var index = 0 + consumeEach { + if (element == it) + lastIndex = index + index++ + } + return lastIndex +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + return null + var last = iterator.next() + while (iterator.hasNext()) + last = iterator.next() + return last + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.single(): E = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + val single = iterator.next() + if (iterator.hasNext()) + throw IllegalArgumentException("ReceiveChannel has more than one element.") + return single + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + return null + val single = iterator.next() + if (iterator.hasNext()) + return null + return single + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + require(n >= 0) { "Requested element count $n is less than zero." } + var remaining: Int = n + if (remaining > 0) + for (e in this@drop) { + remaining-- + if (remaining == 0) + break + } + for (e in this@drop) { + send(e) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.dropWhile( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@dropWhile) { + if (!predicate(e)) { + send(e) + break + } + } + for (e in this@dropWhile) { + send(e) + } + } + +@PublishedApi +internal fun <E> ReceiveChannel<E>.filter( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@filter) { + if (predicate(e)) send(e) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.filterIndexed( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (index: Int, E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + var index = 0 + for (e in this@filterIndexed) { + if (predicate(index++, e)) send(e) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.filterNot( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + filter(context) { !predicate(it) } + +@PublishedApi +@Suppress("UNCHECKED_CAST") +internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> = + filter { it != null } as ReceiveChannel<E> + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { + consumeEach { + if (it != null) destination.add(it) + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { + consumeEach { + if (it != null) destination.send(it) + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + if (n == 0) return@produce + require(n >= 0) { "Requested element count $n is less than zero." } + var remaining: Int = n + for (e in this@take) { + send(e) + remaining-- + if (remaining == 0) + return@produce + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.takeWhile( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@takeWhile) { + if (!predicate(e)) return@produce + send(e) + } + } + +@PublishedApi +internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C { + consumeEach { + destination.send(it) + } + return destination +} + +@PublishedApi +internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C { + consumeEach { + destination.add(it) + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> = + toMap(LinkedHashMap()) + +@PublishedApi +internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M { + consumeEach { + destination += it + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> = + toCollection(ArrayList()) + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> = + this.toMutableSet() + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E, R> ReceiveChannel<E>.flatMap( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (E) -> ReceiveChannel<R> +): ReceiveChannel<R> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@flatMap) { + transform(e).toChannel(this) + } + } + +@PublishedApi +internal fun <E, R> ReceiveChannel<E>.map( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (E) -> R +): ReceiveChannel<R> = + GlobalScope.produce(context, onCompletion = consumes()) { + consumeEach { + send(transform(it)) + } + } + +@PublishedApi +internal fun <E, R> ReceiveChannel<E>.mapIndexed( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (index: Int, E) -> R +): ReceiveChannel<R> = + GlobalScope.produce(context, onCompletion = consumes()) { + var index = 0 + for (e in this@mapIndexed) { + send(transform(index++, e)) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (index: Int, E) -> R? +): ReceiveChannel<R> = + mapIndexed(context, transform).filterNotNull() + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E, R : Any> ReceiveChannel<E>.mapNotNull( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (E) -> R? +): ReceiveChannel<R> = + map(context, transform).filterNotNull() + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> = + GlobalScope.produce(context, onCompletion = consumes()) { + var index = 0 + for (e in this@withIndex) { + send(IndexedValue(index++, e)) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> = + this.distinctBy { it } + +@PublishedApi +internal fun <E, K> ReceiveChannel<E>.distinctBy( + context: CoroutineContext = Dispatchers.Unconfined, + selector: suspend (E) -> K +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + val keys = HashSet<K>() + for (e in this@distinctBy) { + val k = selector(e) + if (k !in keys) { + send(e) + keys += k + } + } + } + +@PublishedApi +internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> = + toCollection(LinkedHashSet()) + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.any(): Boolean = + consume { + return iterator().hasNext() + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.count(): Int { + var count = 0 + consumeEach { count++ } + return count +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) return null + var max = iterator.next() + while (iterator.hasNext()) { + val e = iterator.next() + if (comparator.compare(max, e) < 0) max = e + } + return max + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) return null + var min = iterator.next() + while (iterator.hasNext()) { + val e = iterator.next() + if (comparator.compare(min, e) > 0) min = e + } + return min + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.none(): Boolean = + consume { + return !iterator().hasNext() + } + +/** @suppress **/ +@Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> = + map { it ?: throw IllegalArgumentException("null element found in $this.") } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> = + zip(other) { t1, t2 -> t1 to t2 } + +@PublishedApi // Binary compatibility +internal fun <E, R, V> ReceiveChannel<E>.zip( + other: ReceiveChannel<R>, + context: CoroutineContext = Dispatchers.Unconfined, + transform: (a: E, b: R) -> V +): ReceiveChannel<V> = + GlobalScope.produce(context, onCompletion = consumesAll(this, other)) { + val otherIterator = other.iterator() + this@zip.consumeEach { element1 -> + if (!otherIterator.hasNext()) return@consumeEach + val element2 = otherIterator.next() + send(transform(element1, element2)) + } + } + +@PublishedApi // Binary compatibility +internal fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? -> + cancelConsumed(cause) +} diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt index 83175273..b5f607b2 100644 --- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.* /** * Channel with linked-list buffer of a unlimited capacity (limited only by available memory). - * Sender to this channel never suspends and [offer] always returns `true`. + * Sender to this channel never suspends and [trySend] always succeeds. * * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation. * diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 3c183587..3342fb6e 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce( internal open class ProducerCoroutine<E>( parentContext: CoroutineContext, channel: Channel<E> -) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> { +) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> { override val isActive: Boolean get() = super.isActive |