diff options
Diffstat (limited to 'kotlinx-coroutines-core/common')
80 files changed, 1974 insertions, 3236 deletions
diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md index 6712648a..fcfe334c 100644 --- a/kotlinx-coroutines-core/common/README.md +++ b/kotlinx-coroutines-core/common/README.md @@ -19,7 +19,7 @@ Coroutine dispatchers implementing [CoroutineDispatcher]: | [Dispatchers.Unconfined] | Does not confine coroutine execution in any way | [newSingleThreadContext] | Creates a single-threaded coroutine context | [newFixedThreadPoolContext] | Creates a thread pool of a fixed size -| [Executor.asCoroutineDispatcher][java.util.concurrent.Executor.asCoroutineDispatcher] | Extension to convert any executor +| [Executor.asCoroutineDispatcher][asCoroutineDispatcher] | Extension to convert any executor More context elements: @@ -57,9 +57,9 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | ---------------- | --------------------------------------------- | ------------------------------------------------ | -------------------------- | [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted] | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] -| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] +| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] | [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none @@ -108,8 +108,8 @@ Low-level primitives for finer-grained control of coroutines. [Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html [newSingleThreadContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-single-thread-context.html [newFixedThreadPoolContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-fixed-thread-pool-context.html -[java.util.concurrent.Executor.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/java.util.concurrent.-executor/as-coroutine-dispatcher.html -[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-non-cancellable.html +[asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/as-coroutine-dispatcher.html +[NonCancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-non-cancellable/index.html [CoroutineExceptionHandler]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-exception-handler/index.html [delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html [yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html @@ -146,16 +146,16 @@ Low-level primitives for finer-grained control of coroutines. [kotlinx.coroutines.channels.SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html [kotlinx.coroutines.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html [kotlinx.coroutines.channels.SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html -[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html +[kotlinx.coroutines.channels.SendChannel.trySend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/try-send.html [kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html -[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html -[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html -[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html +[kotlinx.coroutines.channels.ReceiveChannel.tryReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/try-receive.html +[kotlinx.coroutines.channels.ReceiveChannel.receiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-catching.html +[kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html <!--- INDEX kotlinx.coroutines.selects --> [kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html -[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html +[kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/on-timeout.html <!--- INDEX kotlinx.coroutines.test --> diff --git a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt index af392b63..439a9ac7 100644 --- a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt @@ -8,7 +8,6 @@ package kotlinx.coroutines import kotlinx.coroutines.CoroutineStart.* import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* -import kotlin.jvm.* /** * Abstract base class for implementation of coroutines in coroutine builders. @@ -26,6 +25,9 @@ import kotlin.jvm.* * * [onCancelled] in invoked when the coroutine completes with an exception (cancelled). * * @param parentContext the context of the parent coroutine. + * @param initParentJob specifies whether the parent-child relationship should be instantiated directly + * in `AbstractCoroutine` constructor. If set to `false`, it's the responsibility of the child class + * to invoke [initParentJob] manually. * @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state. * See [Job] for details. * @@ -33,13 +35,22 @@ import kotlin.jvm.* */ @InternalCoroutinesApi public abstract class AbstractCoroutine<in T>( - /** - * The context of the parent coroutine. - */ - @JvmField - protected val parentContext: CoroutineContext, - active: Boolean = true + parentContext: CoroutineContext, + initParentJob: Boolean, + active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { + + init { + /* + * Setup parent-child relationship between the parent in the context and the current coroutine. + * It may cause this coroutine to become _cancelling_ if the parent is already cancelled. + * It is dangerous to install parent-child relationship here if the coroutine class + * operates its state from within onCancelled or onCancelling + * (with exceptions for rx integrations that can't have any parent) + */ + if (initParentJob) initParentJob(parentContext[Job]) + } + /** * The context of this coroutine that includes this coroutine as a [Job]. */ @@ -54,28 +65,6 @@ public abstract class AbstractCoroutine<in T>( override val isActive: Boolean get() = super.isActive /** - * Initializes the parent job from the `parentContext` of this coroutine that was passed to it during construction. - * It shall be invoked at most once after construction after all other initialization. - * - * Invocation of this function may cause this coroutine to become cancelled if the parent is already cancelled, - * in which case it synchronously invokes all the corresponding handlers. - * @suppress **This is unstable API and it is subject to change.** - */ - internal fun initParentJob() { - initParentJobInternal(parentContext[Job]) - } - - /** - * This function is invoked once when a non-active coroutine (constructed with `active` set to `false) - * is [started][start]. - */ - protected open fun onStart() {} - - internal final override fun onStartInternal() { - onStart() - } - - /** * This function is invoked once when the job was completed normally with the specified [value], * right before all the waiters for the coroutine's completion are notified. */ @@ -127,26 +116,6 @@ public abstract class AbstractCoroutine<in T>( /** * Starts this coroutine with the given code [block] and [start] strategy. * This function shall be invoked at most once on this coroutine. - * - * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it - * during construction. Second, it starts the coroutine based on [start] parameter: - * - * * [DEFAULT] uses [startCoroutineCancellable]. - * * [ATOMIC] uses [startCoroutine]. - * * [UNDISPATCHED] uses [startCoroutineUndispatched]. - * * [LAZY] does nothing. - */ - public fun start(start: CoroutineStart, block: suspend () -> T) { - initParentJob() - start(block, this) - } - - /** - * Starts this coroutine with the given code [block] and [start] strategy. - * This function shall be invoked at most once on this coroutine. - * - * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it - * during construction. Second, it starts the coroutine based on [start] parameter: * * * [DEFAULT] uses [startCoroutineCancellable]. * * [ATOMIC] uses [startCoroutine]. @@ -154,7 +123,6 @@ public abstract class AbstractCoroutine<in T>( * * [LAZY] does nothing. */ public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { - initParentJob() start(block, receiver, this) } } diff --git a/kotlinx-coroutines-core/common/src/Annotations.kt b/kotlinx-coroutines-core/common/src/Annotations.kt index 70adad9b..724cc8cb 100644 --- a/kotlinx-coroutines-core/common/src/Annotations.kt +++ b/kotlinx-coroutines-core/common/src/Annotations.kt @@ -7,6 +7,22 @@ package kotlinx.coroutines import kotlinx.coroutines.flow.* /** + * Marks declarations in the coroutines that are **delicate** — + * they have limited use-case and shall be used with care in general code. + * Any use of a delicate declaration has to be carefully reviewed to make sure it is + * properly used and does not create problems like memory and resource leaks. + * Carefully read documentation of any declaration marked as `DelicateCoroutinesApi`. + */ +@MustBeDocumented +@Retention(value = AnnotationRetention.BINARY) +@RequiresOptIn( + level = RequiresOptIn.Level.WARNING, + message = "This is a delicate API and its use requires care." + + " Make sure you fully read and understand documentation of the declaration that is marked as a delicate API." +) +public annotation class DelicateCoroutinesApi + +/** * Marks declarations that are still **experimental** in coroutines API, which means that the design of the * corresponding declarations has open issues which may (or may not) lead to their changes in the future. * Roughly speaking, there is a chance that those declarations will be deprecated in the near future or @@ -59,7 +75,7 @@ public annotation class ObsoleteCoroutinesApi @Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.TYPEALIAS, AnnotationTarget.PROPERTY) @RequiresOptIn( level = RequiresOptIn.Level.ERROR, message = "This is an internal kotlinx.coroutines API that " + - "should not be used from outside of kotlinx.coroutines. No compatibility guarantees are provided." + + "should not be used from outside of kotlinx.coroutines. No compatibility guarantees are provided. " + "It is recommended to report your use-case of internal API to kotlinx.coroutines issue tracker, " + "so stable API could be provided instead" ) diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index 93b3ee48..a11ffe9e 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -96,7 +96,7 @@ public fun <T> CoroutineScope.async( private open class DeferredCoroutine<T>( parentContext: CoroutineContext, active: Boolean -) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> { +) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> { override fun getCompleted(): T = getCompletedInternal() as T override suspend fun await(): T = awaitInternal() as T override val onAwait: SelectClause1<T> get() = this @@ -150,7 +150,7 @@ public suspend fun <T> withContext( val oldContext = uCont.context val newContext = oldContext + context // always check for cancellation of new context - newContext.checkCompletion() + newContext.ensureActive() // FAST PATH #1 -- new context is the same as the old one if (newContext === oldContext) { val coroutine = ScopeCoroutine(newContext, uCont) @@ -167,7 +167,6 @@ public suspend fun <T> withContext( } // SLOW PATH -- use new dispatcher val coroutine = DispatchedCoroutine(newContext, uCont) - coroutine.initParentJob() block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } @@ -188,7 +187,7 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke( private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean -) : AbstractCoroutine<Unit>(parentContext, active) { +) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 8f589912..b133b793 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> { public fun completeResume(token: Any) /** - * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0. - * This function does nothing and is left only for binary compatibility with old compiled code. + * Internal function that setups cancellation behavior in [suspendCancellableCoroutine]. + * It's illegal to call this function in any non-`kotlinx.coroutines` code and + * such calls lead to undefined behaviour. + * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body. * * @suppress **This is unstable API and it is subject to change.** */ @@ -332,7 +334,7 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable( internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> { // If used outside of our dispatcher if (delegate !is DispatchedContinuation<T>) { - return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE) + return CancellableContinuationImpl(delegate, MODE_CANCELLABLE) } /* * Attempt to claim reusable instance. diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1a8f3566..1a0169b6 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>( */ private val _state = atomic<Any?>(Active) - private val _parentHandle = atomic<DisposableHandle?>(null) - private var parentHandle: DisposableHandle? - get() = _parentHandle.value - set(value) { _parentHandle.value = value } + private var parentHandle: DisposableHandle? = null internal val state: Any? get() = _state.value @@ -93,10 +90,24 @@ internal open class CancellableContinuationImpl<in T>( } public override fun initCancellability() { - setupCancellation() + /* + * Invariant: at the moment of invocation, `this` has not yet + * leaked to user code and no one is able to invoke `resume` or `cancel` + * on it yet. Also, this function is not invoked for reusable continuations. + */ + val handle = installParentHandle() + ?: return // fast path -- don't do anything without parent + // now check our state _after_ registering, could have completed while we were registering, + // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, + // so we are helping it and cleaning the node ourselves + if (isCompleted) { + // Can be invoked concurrently in 'parentCancelled', no problems here + handle.dispose() + parentHandle = NonDisposableHandle + } } - private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) + private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable() /** * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work. @@ -104,7 +115,7 @@ internal open class CancellableContinuationImpl<in T>( */ @JvmName("resetStateReusable") // Prettier stack traces internal fun resetStateReusable(): Boolean { - assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl + assert { resumeMode == MODE_CANCELLABLE_REUSABLE } assert { parentHandle !== NonDisposableHandle } val state = _state.value assert { state !is NotCompleted } @@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl<in T>( return true } - /** - * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. - * It is only invoked from an internal [getResult] function for reusable continuations - * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere. - */ - private fun setupCancellation() { - if (checkCompleted()) return - if (parentHandle !== null) return // fast path 2 -- was already initialized - val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent - val handle = parent.invokeOnCompletion( - onCancelling = true, - handler = ChildContinuation(this).asHandler - ) - parentHandle = handle - // now check our state _after_ registering (could have completed while we were registering) - // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us - if (isCompleted && !isReusable()) { - handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle - parentHandle = NonDisposableHandle // release it just in case, to aid GC - } - } - - private fun checkCompleted(): Boolean { - val completed = isCompleted - if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations - val dispatched = delegate as? DispatchedContinuation<*> ?: return completed - val cause = dispatched.checkPostponedCancellation(this) ?: return completed - if (!completed) { - // Note: this cancel may fail if one more concurrent cancel is currently being invoked - cancel(cause) - } - return true - } - public override val callerFrame: CoroutineStackFrame? get() = delegate as? CoroutineStackFrame @@ -187,8 +164,9 @@ internal open class CancellableContinuationImpl<in T>( * Attempt to postpone cancellation for reusable cancellable continuation */ private fun cancelLater(cause: Throwable): Boolean { - if (!resumeMode.isReusableMode) return false - val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false + // Ensure that we are postponing cancellation to the right reusable instance + if (!isReusable()) return false + val dispatched = delegate as DispatchedContinuation<*> return dispatched.postponeCancellation(cause) } @@ -216,7 +194,7 @@ internal open class CancellableContinuationImpl<in T>( private inline fun callCancelHandlerSafely(block: () -> Unit) { try { - block() + block() } catch (ex: Throwable) { // Handler should never fail, if it does -- it is an unhandled exception handleCoroutineException( @@ -276,9 +254,37 @@ internal open class CancellableContinuationImpl<in T>( @PublishedApi internal fun getResult(): Any? { - setupCancellation() - if (trySuspend()) return COROUTINE_SUSPENDED + val isReusable = isReusable() + // trySuspend may fail either if 'block' has resumed/cancelled a continuation + // or we got async cancellation from parent. + if (trySuspend()) { + /* + * Invariant: parentHandle is `null` *only* for reusable continuations. + * We were neither resumed nor cancelled, time to suspend. + * But first we have to install parent cancellation handle (if we didn't yet), + * so CC could be properly resumed on parent cancellation. + * + * This read has benign data-race with write of 'NonDisposableHandle' + * in 'detachChildIfNotReusable'. + */ + if (parentHandle == null) { + installParentHandle() + } + /* + * Release the continuation after installing the handle (if needed). + * If we were successful, then do nothing, it's ok to reuse the instance now. + * Otherwise, dispose the handle by ourselves. + */ + if (isReusable) { + releaseClaimedReusableContinuation() + } + return COROUTINE_SUSPENDED + } // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state + if (isReusable) { + // release claimed reusable continuation for the future reuse + releaseClaimedReusableContinuation() + } val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) // if the parent job was already cancelled, then throw the corresponding cancellation exception @@ -296,6 +302,28 @@ internal open class CancellableContinuationImpl<in T>( return getSuccessfulResult(state) } + private fun installParentHandle(): DisposableHandle? { + val parent = context[Job] ?: return null // don't do anything without a parent + // Install the handle + val handle = parent.invokeOnCompletion( + onCancelling = true, + handler = ChildContinuation(this).asHandler + ) + parentHandle = handle + return handle + } + + /** + * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, + * in which case it detaches from the parent and cancels this continuation. + */ + private fun releaseClaimedReusableContinuation() { + // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it + val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return + detachChild() + cancel(cancellationCause) + } + override fun resumeWith(result: Result<T>) = resumeImpl(result.toState(this), resumeMode) @@ -462,11 +490,10 @@ internal open class CancellableContinuationImpl<in T>( /** * Detaches from the parent. - * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` */ internal fun detachChild() { - val handle = parentHandle - handle?.dispose() + val handle = parentHandle ?: return + handle.dispose() parentHandle = NonDisposableHandle } diff --git a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt index c8073796..5e76593d 100644 --- a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt +++ b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt @@ -80,7 +80,7 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab private class CompletableDeferredImpl<T>( parent: Job? ) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> { - init { initParentJobInternal(parent) } + init { initParentJob(parent) } override val onCancelComplete get() = true override fun getCompleted(): T = getCompletedInternal() as T override suspend fun await(): T = awaitInternal() as T diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index b2b88798..d5613d41 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -99,9 +99,13 @@ public abstract class CoroutineDispatcher : public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) - @InternalCoroutinesApi - public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { - (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() + public final override fun releaseInterceptedContinuation(continuation: Continuation<*>) { + /* + * Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`, + * any ClassCastException can only indicate compiler bug + */ + val dispatched = continuation as DispatchedContinuation<*> + dispatched.release() } /** diff --git a/kotlinx-coroutines-core/common/src/CoroutineScope.kt b/kotlinx-coroutines-core/common/src/CoroutineScope.kt index e7c243a4..627318f6 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineScope.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineScope.kt @@ -16,7 +16,10 @@ import kotlin.coroutines.intrinsics.* * is an extension on [CoroutineScope] and inherits its [coroutineContext][CoroutineScope.coroutineContext] * to automatically propagate all its elements and cancellation. * - * The best ways to obtain a standalone instance of the scope are [CoroutineScope()] and [MainScope()] factory functions. + * The best ways to obtain a standalone instance of the scope are [CoroutineScope()] and [MainScope()] factory functions, + * taking care to cancel these coroutine scopes when they are no longer needed (see section on custom usage below for + * explanation and example). + * * Additional context elements can be appended to the scope using the [plus][CoroutineScope.plus] operator. * * ### Convention for structured concurrency @@ -38,12 +41,23 @@ import kotlin.coroutines.intrinsics.* * * ### Custom usage * - * [CoroutineScope] should be implemented or declared as a property on entities with a well-defined lifecycle that are - * responsible for launching children coroutines, for example: + * `CoroutineScope` should be declared as a property on entities with a well-defined lifecycle that are + * responsible for launching children coroutines. The corresponding instance of `CoroutineScope` shall be created + * with either `CoroutineScope()` or `MainScope()` functions. The difference between them is only in the + * [CoroutineDispatcher]: + * + * * `CoroutineScope()` uses [Dispatchers.Default] for its coroutines. + * * `MainScope()` uses [Dispatchers.Main] for its coroutines. + * + * **The key part of custom usage of `CustomScope` is cancelling it and the end of the lifecycle.** + * The [CoroutineScope.cancel] extension function shall be used when the entity that was launching coroutines + * is no longer needed. It cancels all the coroutines that might still be running on behalf of it. + * + * For example: * * ``` * class MyUIClass { - * val scope = MainScope() // the scope of MyUIClass + * val scope = MainScope() // the scope of MyUIClass, uses Dispatchers.Main * * fun destroy() { // destroys an instance of MyUIClass * scope.cancel() // cancels all coroutines launched in this scope @@ -124,25 +138,81 @@ public val CoroutineScope.isActive: Boolean /** * A global [CoroutineScope] not bound to any job. - * * Global scope is used to launch top-level coroutines which are operating on the whole application lifetime * and are not cancelled prematurely. - * Another use of the global scope is operators running in [Dispatchers.Unconfined], which don't have any job associated with them. * - * Application code usually should use an application-defined [CoroutineScope]. Using - * [async][CoroutineScope.async] or [launch][CoroutineScope.launch] - * on the instance of [GlobalScope] is highly discouraged. + * Active coroutines launched in `GlobalScope` do not keep the process alive. They are like daemon threads. + * + * This is a **delicate** API. It is easy to accidentally create resource or memory leaks when + * `GlobalScope` is used. A coroutine launched in `GlobalScope` is not subject to the principle of structured + * concurrency, so if it hangs or gets delayed due to a problem (e.g. due to a slow network), it will stay working + * and consuming resources. For example, consider the following code: + * + * ``` + * fun loadConfiguration() { + * GlobalScope.launch { + * val config = fetchConfigFromServer() // network request + * updateConfiguration(config) + * } + * } + * ``` + * + * A call to `loadConfiguration` creates a coroutine in the `GlobalScope` that works in background without any + * provision to cancel it or to wait for its completion. If a network is slow, it keeps waiting in background, + * consuming resources. Repeated calls to `loadConfiguration` will consume more and more resources. + * + * ### Possible replacements + * + * In many cases uses of `GlobalScope` should be removed, marking the containing operation with `suspend`, for example: + * + * ``` + * suspend fun loadConfiguration() { + * val config = fetchConfigFromServer() // network request + * updateConfiguration(config) + * } + * ``` + * + * In cases when `GlobalScope.launch` was used to launch multiple concurrent operations, the corresponding + * operations shall be grouped with [coroutineScope] instead: + * + * ``` + * // concurrently load configuration and data + * suspend fun loadConfigurationAndData() { + * coroutinesScope { + * launch { loadConfiguration() } + * launch { loadData() } + * } + * } + * ``` + * + * In top-level code, when launching a concurrent operation operation from a non-suspending context, an appropriately + * confined instance of [CoroutineScope] shall be used instead of a `GlobalScope`. See docs on [CoroutineScope] for + * details. + * + * ### GlobalScope vs custom scope + * + * Do not replace `GlobalScope.launch { ... }` with `CoroutineScope().launch { ... }` constructor function call. + * The latter has the same pitfalls as `GlobalScope`. See [CoroutineScope] documentation on the intended usage of + * `CoroutineScope()` constructor function. + * + * ### Legitimate use-cases * - * Usage of this interface may look like this: + * There are limited circumstances under which `GlobalScope` can be legitimately and safely used, such as top-level background + * processes that must stay active for the whole duration of the application's lifetime. Because of that, any use + * of `GlobalScope` requires an explicit opt-in with `@OptIn(DelicateCoroutinesApi::class)`, like this: * * ``` - * fun ReceiveChannel<Int>.sqrt(): ReceiveChannel<Double> = GlobalScope.produce(Dispatchers.Unconfined) { - * for (number in this) { - * send(Math.sqrt(number)) + * // A global coroutine to log statistics every second, must be always active + * @OptIn(DelicateCoroutinesApi::class) + * val globalScopeReporter = GlobalScope.launch { + * while (true) { + * delay(1000) + * logStatistics() * } * } * ``` */ +@DelicateCoroutinesApi public object GlobalScope : CoroutineScope { /** * Returns [EmptyCoroutineContext]. diff --git a/kotlinx-coroutines-core/common/src/Debug.common.kt b/kotlinx-coroutines-core/common/src/Debug.common.kt index 1381ecd8..185ad295 100644 --- a/kotlinx-coroutines-core/common/src/Debug.common.kt +++ b/kotlinx-coroutines-core/common/src/Debug.common.kt @@ -32,10 +32,15 @@ public interface CopyableThrowable<T> where T : Throwable, T : CopyableThrowable /** * Creates a copy of the current instance. + * * For better debuggability, it is recommended to use original exception as [cause][Throwable.cause] of the resulting one. * Stacktrace of copied exception will be overwritten by stacktrace recovery machinery by [Throwable.setStackTrace] call. * An exception can opt-out of copying by returning `null` from this function. * Suppressed exceptions of the original exception should not be copied in order to avoid circular exceptions. + * + * This function is allowed to create a copy with a modified [message][Throwable.message], but it should be noted + * that the copy can be later recovered as well and message modification code should handle this situation correctly + * (e.g. by also storing the original message and checking it) to produce a human-readable result. */ public fun createCopy(): T? } diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 53dadf97..4543c5dd 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -150,4 +150,4 @@ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) */ @ExperimentalTime internal fun Duration.toDelayMillis(): Long = - if (this > Duration.ZERO) toLongMilliseconds().coerceAtLeast(1) else 0 + if (this > Duration.ZERO) inWholeMilliseconds.coerceAtLeast(1) else 0 diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index 31e2ef22..be582132 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -113,16 +113,7 @@ public interface Job : CoroutineContext.Element { /** * Key for [Job] instance in the coroutine context. */ - public companion object Key : CoroutineContext.Key<Job> { - init { - /* - * Here we make sure that CoroutineExceptionHandler is always initialized in advance, so - * that if a coroutine fails due to StackOverflowError we don't fail to report this error - * trying to initialize CoroutineExceptionHandler - */ - CoroutineExceptionHandler - } - } + public companion object Key : CoroutineContext.Key<Job> // ------------ state query ------------ @@ -217,11 +208,10 @@ public interface Job : CoroutineContext.Element { * immediately cancels all its children. * * Parent cannot complete until all its children are complete. Parent waits for all its children to * complete in _completing_ or _cancelling_ state. - * * Uncaught exception in a child, by default, cancels parent. In particular, this applies to - * children created with [launch][CoroutineScope.launch] coroutine builder. Note that - * [async][CoroutineScope.async] and other future-like - * coroutine builders do not have uncaught exceptions by definition, since all their exceptions are - * caught and are encapsulated in their result. + * * Uncaught exception in a child, by default, cancels parent. This applies even to + * children created with [async][CoroutineScope.async] and other future-like + * coroutine builders, even though their exceptions are caught and are encapsulated in their result. + * This default behavior can be overridden with [SupervisorJob]. */ public val children: Sequence<Job> @@ -262,9 +252,9 @@ public interface Job : CoroutineContext.Element { * suspending function is invoked or while it is suspended, this function * throws [CancellationException]. * - * In particular, it means that a parent coroutine invoking `join` on a child coroutine that was started using - * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child - * had crashed, unless a non-standard [CoroutineExceptionHandler] is installed in the context. + * In particular, it means that a parent coroutine invoking `join` on a child coroutine throws + * [CancellationException] if the child had failed, since a failure of a child coroutine cancels parent by default, + * unless the child was launched from within [supervisorScope]. * * This function can be used in [select] invocation with [onJoin] clause. * Use [isCompleted] to check for a completion of this job without waiting. @@ -467,6 +457,14 @@ public interface ParentJob : Job { @InternalCoroutinesApi @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases") public interface ChildHandle : DisposableHandle { + + /** + * Returns the parent of the current parent-child relationship. + * @suppress **This is unstable API and it is subject to change.** + */ + @InternalCoroutinesApi + public val parent: Job? + /** * Child is cancelling its parent by invoking this method. * This method is invoked by the child twice. The first time child report its root cause as soon as possible, @@ -500,9 +498,9 @@ internal fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle * suspending function is invoked or while it is suspended, this function * throws [CancellationException]. * - * In particular, it means that a parent coroutine invoking `cancelAndJoin` on a child coroutine that was started using - * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child - * had crashed, unless a non-standard [CoroutineExceptionHandler] is installed in the context. + * In particular, it means that a parent coroutine invoking `cancelAndJoin` on a child coroutine throws + * [CancellationException] if the child had failed, since a failure of a child coroutine cancels parent by default, + * unless the child was launched from within [supervisorScope]. * * This is a shortcut for the invocation of [cancel][Job.cancel] followed by [join][Job.join]. */ @@ -660,6 +658,9 @@ private fun Throwable?.orCancellation(job: Job): Throwable = this ?: JobCancella */ @InternalCoroutinesApi public object NonDisposableHandle : DisposableHandle, ChildHandle { + + override val parent: Job? get() = null + /** * Does not do anything. * @suppress diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5b516ae2..0a3dd234 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -96,7 +96,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren ~ waits for start >> start / join / await invoked ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList - + onStartInternal / onStart (lazy coroutine is started) + + onStart (lazy coroutine is started) ~ active coroutine is working (or scheduled to execution) >> childCancelled / cancelImpl invoked ## CANCELLING: state is Finishing, state.rootCause != null @@ -139,7 +139,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * Initializes parent job. * It shall be invoked at most once after construction after all other initialization. */ - internal fun initParentJobInternal(parent: Job?) { + protected fun initParentJob(parent: Job?) { assert { parentHandle == null } if (parent == null) { parentHandle = NonDisposableHandle @@ -393,12 +393,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) return FALSE // already active if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY - onStartInternal() + onStart() return TRUE } is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers if (!_state.compareAndSet(state, state.list)) return RETRY - onStartInternal() + onStart() return TRUE } else -> return FALSE // not a new state @@ -409,7 +409,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * Override to provide the actual [start] action. * This function is invoked exactly once when non-active coroutine is [started][start]. */ - internal open fun onStartInternal() {} + protected open fun onStart() {} public final override fun getCancellationException(): CancellationException = when (val state = this.state) { @@ -541,7 +541,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren public final override suspend fun join() { if (!joinInternal()) { // fast-path no wait - coroutineContext.checkCompletion() + coroutineContext.ensureActive() return // do not suspend } return joinSuspend() // slow-path wait @@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) + // we are mimicking suspendCancellableCoroutine here and call initCancellability, too. + cont.initCancellability() cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler)) cont.getResult() } @@ -1311,7 +1313,7 @@ private class Empty(override val isActive: Boolean) : Incomplete { } internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob { - init { initParentJobInternal(parent) } + init { initParentJob(parent) } override val onCancelComplete get() = true /* * Check whether parent is able to handle exceptions as well. @@ -1459,6 +1461,7 @@ private class InvokeOnCancelling( internal class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobCancellingNode(), ChildHandle { + override val parent: Job get() = job override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) } diff --git a/kotlinx-coroutines-core/common/src/NonCancellable.kt b/kotlinx-coroutines-core/common/src/NonCancellable.kt index a9c68f09..c2781092 100644 --- a/kotlinx-coroutines-core/common/src/NonCancellable.kt +++ b/kotlinx-coroutines-core/common/src/NonCancellable.kt @@ -18,41 +18,51 @@ import kotlin.coroutines.* * // this code will not be cancelled * } * ``` + * + * **WARNING**: This object is not designed to be used with [launch], [async], and other coroutine builders. + * if you write `launch(NonCancellable) { ... }` then not only the newly launched job will not be cancelled + * when the parent is cancelled, the whole parent-child relation between parent and child is severed. + * The parent will not wait for the child's completion, nor will be cancelled when the child crashed. */ +@Suppress("DeprecatedCallableAddReplaceWith") public object NonCancellable : AbstractCoroutineContextElement(Job), Job { + + private const val message = "NonCancellable can be used only as an argument for 'withContext', direct usages of its API are prohibited" + /** * Always returns `true`. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi - override val isActive: Boolean get() = true + @Deprecated(level = DeprecationLevel.WARNING, message = message) + override val isActive: Boolean + get() = true /** * Always returns `false`. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override val isCompleted: Boolean get() = false /** * Always returns `false`. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override val isCancelled: Boolean get() = false /** * Always returns `false`. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override fun start(): Boolean = false /** * Always throws [UnsupportedOperationException]. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override suspend fun join() { throw UnsupportedOperationException("This job is always active") } @@ -61,6 +71,7 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { * Always throws [UnsupportedOperationException]. * @suppress **This an internal API and should not be used from general code.** */ + @Deprecated(level = DeprecationLevel.WARNING, message = message) override val onJoin: SelectClause0 get() = throw UnsupportedOperationException("This job is always active") @@ -68,14 +79,13 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { * Always throws [IllegalStateException]. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override fun getCancellationException(): CancellationException = throw IllegalStateException("This job is always active") /** * @suppress **This an internal API and should not be used from general code.** */ - @Suppress("OverridingDeprecatedMember") - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = NonDisposableHandle @@ -83,7 +93,7 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { * Always returns no-op handle. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler): DisposableHandle = NonDisposableHandle @@ -91,7 +101,7 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { * Does nothing. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override fun cancel(cause: CancellationException?) {} /** @@ -105,7 +115,7 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { * Always returns [emptySequence]. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override val children: Sequence<Job> get() = emptySequence() @@ -113,7 +123,7 @@ public object NonCancellable : AbstractCoroutineContextElement(Job), Job { * Always returns [NonDisposableHandle] and does not do anything. * @suppress **This an internal API and should not be used from general code.** */ - @InternalCoroutinesApi + @Deprecated(level = DeprecationLevel.WARNING, message = message) override fun attachChild(child: ChildJob): ChildHandle = NonDisposableHandle /** @suppress */ diff --git a/kotlinx-coroutines-core/common/src/Supervisor.kt b/kotlinx-coroutines-core/common/src/Supervisor.kt index 01a8e705..8411c5c6 100644 --- a/kotlinx-coroutines-core/common/src/Supervisor.kt +++ b/kotlinx-coroutines-core/common/src/Supervisor.kt @@ -42,11 +42,15 @@ public fun SupervisorJob0(parent: Job? = null) : Job = SupervisorJob(parent) * Creates a [CoroutineScope] with [SupervisorJob] and calls the specified suspend block with this scope. * The provided scope inherits its [coroutineContext][CoroutineScope.coroutineContext] from the outer scope, but overrides * context's [Job] with [SupervisorJob]. + * This function returns as soon as the given block and all its child coroutines are completed. * - * A failure of a child does not cause this scope to fail and does not affect its other children, - * so a custom policy for handling failures of its children can be implemented. See [SupervisorJob] for details. - * A failure of the scope itself (exception thrown in the [block] or cancellation) fails the scope with all its children, + * Unlike [coroutineScope], a failure of a child does not cause this scope to fail and does not affect its other children, + * so a custom policy for handling failures of its children can be implemented. See [SupervisorJob] for additional details. + * A failure of the scope itself (exception thrown in the [block] or external cancellation) fails the scope with all its children, * but does not cancel parent job. + * + * The method may throw a [CancellationException] if the current job was cancelled externally, + * or rethrow an exception thrown by the given [block]. */ public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R { contract { diff --git a/kotlinx-coroutines-core/common/src/Yield.kt b/kotlinx-coroutines-core/common/src/Yield.kt index 975a4893..98e21041 100644 --- a/kotlinx-coroutines-core/common/src/Yield.kt +++ b/kotlinx-coroutines-core/common/src/Yield.kt @@ -30,7 +30,7 @@ import kotlin.coroutines.intrinsics.* */ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val context = uCont.context - context.checkCompletion() + context.ensureActive() val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit if (cont.dispatcher.isDispatchNeeded(context)) { // this is a regular dispatcher -- do simple dispatchYield @@ -50,8 +50,3 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u } COROUTINE_SUSPENDED } - -internal fun CoroutineContext.checkCompletion() { - val job = get(Job) - if (job != null && !job.isActive) throw job.getCancellationException() -} diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 9721583e..bcf19215 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -127,8 +127,7 @@ internal abstract class AbstractSendChannel<E>( // ------ SendChannel ------ public final override val isClosedForSend: Boolean get() = closedForSend != null - public override val isFull: Boolean get() = isFullImpl - protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull + private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull public final override suspend fun send(element: E) { // fast path -- try offer non-blocking @@ -137,23 +136,43 @@ internal abstract class AbstractSendChannel<E>( return sendSuspend(element) } - public final override fun offer(element: E): Boolean { + override fun offer(element: E): Boolean { + // Temporary migration for offer users who rely on onUndeliveredElement + try { + return super.offer(element) + } catch (e: Throwable) { + onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let { + // If it crashes, add send exception as suppressed for better diagnostics + it.addSuppressed(e) + throw it + } + throw e + } + } + + public final override fun trySend(element: E): ChannelResult<Unit> { val result = offerInternal(element) return when { - result === OFFER_SUCCESS -> true + result === OFFER_SUCCESS -> ChannelResult.success(Unit) result === OFFER_FAILED -> { - // We should check for closed token on offer as well, otherwise offer won't be linearizable + // We should check for closed token on trySend as well, otherwise trySend won't be linearizable // in the face of concurrent close() // See https://github.com/Kotlin/kotlinx.coroutines/issues/359 - throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false)) + val closedForSend = closedForSend ?: return ChannelResult.failure() + ChannelResult.closed(helpCloseAndGetSendException(closedForSend)) } result is Closed<*> -> { - throw recoverStackTrace(helpCloseAndGetSendException(element, result)) + ChannelResult.closed(helpCloseAndGetSendException(result)) } - else -> error("offerInternal returned $result") + else -> error("trySend returned $result") } } + private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable { + helpClose(closed) + return closed.sendException + } + private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable { // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419 @@ -604,26 +623,8 @@ internal abstract class AbstractChannel<E>( if (result) onReceiveEnqueued() } - public final override suspend fun receiveOrNull(): E? { - // fast path -- try poll non-blocking - val result = pollInternal() - @Suppress("UNCHECKED_CAST") - if (result !== POLL_FAILED && result !is Closed<*>) return result as E - // slow-path does suspend - return receiveSuspend(RECEIVE_NULL_ON_CLOSE) - } - @Suppress("UNCHECKED_CAST") - private fun receiveOrNullResult(result: Any?): E? { - if (result is Closed<*>) { - if (result.closeCause != null) throw recoverStackTrace(result.closeCause) - return null - } - return result as E - } - - @Suppress("UNCHECKED_CAST") - public final override suspend fun receiveOrClosed(): ValueOrClosed<E> { + public final override suspend fun receiveCatching(): ChannelResult<E> { // fast path -- try poll non-blocking val result = pollInternal() if (result !== POLL_FAILED) return result.toResult() @@ -632,9 +633,11 @@ internal abstract class AbstractChannel<E>( } @Suppress("UNCHECKED_CAST") - public final override fun poll(): E? { + public final override fun tryReceive(): ChannelResult<E> { val result = pollInternal() - return if (result === POLL_FAILED) null else receiveOrNullResult(result) + if (result === POLL_FAILED) return ChannelResult.failure() + if (result is Closed<*>) return ChannelResult.closed(result.closeCause) + return ChannelResult.success(result as E) } @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") @@ -734,18 +737,10 @@ internal abstract class AbstractChannel<E>( } } - final override val onReceiveOrNull: SelectClause1<E?> - get() = object : SelectClause1<E?> { + final override val onReceiveCatching: SelectClause1<ChannelResult<E>> + get() = object : SelectClause1<ChannelResult<E>> { @Suppress("UNCHECKED_CAST") - override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) { - registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R) - } - } - - final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>> - get() = object : SelectClause1<ValueOrClosed<E>> { - @Suppress("UNCHECKED_CAST") - override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) { + override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) { registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R) } } @@ -776,15 +771,7 @@ internal abstract class AbstractChannel<E>( } RECEIVE_RESULT -> { if (!select.trySelect()) return - startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion) - } - RECEIVE_NULL_ON_CLOSE -> { - if (value.closeCause == null) { - if (!select.trySelect()) return - startCoroutineUnintercepted(null, select.completion) - } else { - throw recoverStackTrace(value.receiveException) - } + startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion) } } } @@ -905,7 +892,7 @@ internal abstract class AbstractChannel<E>( @JvmField val receiveMode: Int ) : Receive<E>() { fun resumeValue(value: E): Any? = when (receiveMode) { - RECEIVE_RESULT -> ValueOrClosed.value(value) + RECEIVE_RESULT -> ChannelResult.success(value) else -> value } @@ -921,7 +908,6 @@ internal abstract class AbstractChannel<E>( override fun resumeReceiveClosed(closed: Closed<*>) { when { - receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>()) else -> cont.resumeWithException(closed.receiveException) } @@ -990,7 +976,7 @@ internal abstract class AbstractChannel<E>( @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(value: E) { block.startCoroutineCancellable( - if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, + if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value, select.completion, resumeOnCancellationFun(value) ) @@ -1000,12 +986,7 @@ internal abstract class AbstractChannel<E>( if (!select.trySelect()) return when (receiveMode) { RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException) - RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion) - RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) { - block.startCoroutineCancellable(null, select.completion) - } else { - select.resumeSelectWithException(closed.receiveException) - } + RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion) } } @@ -1023,8 +1004,7 @@ internal abstract class AbstractChannel<E>( // receiveMode values internal const val RECEIVE_THROWS_ON_CLOSE = 0 -internal const val RECEIVE_NULL_ON_CLOSE = 1 -internal const val RECEIVE_RESULT = 2 +internal const val RECEIVE_RESULT = 1 @JvmField @SharedImmutable @@ -1128,9 +1108,9 @@ internal class Closed<in E>( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed@$hexAddress[$closeCause]" @@ -1143,8 +1123,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose } @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") -private inline fun <E> Any?.toResult(): ValueOrClosed<E> = - if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E) +private inline fun <E> Any?.toResult(): ChannelResult<E> = + if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E) @Suppress("NOTHING_TO_INLINE") -private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause) +private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause) diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 4569ec72..7e6c0e68 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -49,7 +49,6 @@ internal open class ArrayChannel<E>( protected final override val isBufferAlwaysFull: Boolean get() = false protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND - override val isFull: Boolean get() = lock.withLock { isFullImpl } override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl } override val isClosedForReceive: Boolean get() = lock.withLock { super.isClosedForReceive } diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index 07e75976..b1c24b45 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -35,11 +35,13 @@ import kotlin.coroutines.intrinsics.* * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with * the broadcasting coroutine in hard-to-specify ways. * - * **Note: This API is obsolete.** It will be deprecated and replaced with the - * [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] + * operator. * * @param start coroutine start option. The default value is [CoroutineStart.LAZY]. */ +@ObsoleteCoroutinesApi public fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY @@ -95,10 +97,12 @@ public fun <E> ReceiveChannel<E>.broadcast( * * ### Future replacement * + * This API is obsolete since 1.5.0. * This function has an inappropriate result type of [BroadcastChannel] which provides * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with - * the broadcasting coroutine in hard-to-specify ways. It will be replaced with - * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future. + * the broadcasting coroutine in hard-to-specify ways. It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] + * operator. * * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. * @param capacity capacity of the channel's buffer (1 by default). @@ -106,6 +110,7 @@ public fun <E> ReceiveChannel<E>.broadcast( * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]). * @param block the coroutine code. */ +@ObsoleteCoroutinesApi public fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, @@ -127,7 +132,13 @@ private open class BroadcastCoroutine<E>( parentContext: CoroutineContext, protected val _channel: BroadcastChannel<E>, active: Boolean -) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel { +) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active), + ProducerScope<E>, BroadcastChannel<E> by _channel { + + init { + initParentJob(parentContext[Job]) + } + override val isActive: Boolean get() = super.isActive override val channel: SendChannel<E> diff --git a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt index 6cd79373..c82b8dbd 100644 --- a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt @@ -20,10 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED * See `BroadcastChannel()` factory function for the description of available * broadcast channel implementations. * - * **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow] - * when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. */ -@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it +@ObsoleteCoroutinesApi public interface BroadcastChannel<E> : SendChannel<E> { /** * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it. @@ -60,9 +60,11 @@ public interface BroadcastChannel<E> : SendChannel<E> { * * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity. * * otherwise -- throws [IllegalArgumentException]. * - * **Note: This is an experimental api.** It may be changed in the future updates. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow] + * and [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. */ -@ExperimentalCoroutinesApi +@ObsoleteCoroutinesApi public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> = when (capacity) { 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel") diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index b8b81aac..b15c4262 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* +import kotlin.contracts.* import kotlin.internal.* import kotlin.jvm.* @@ -23,7 +24,7 @@ import kotlin.jvm.* public interface SendChannel<in E> { /** * Returns `true` if this channel was closed by an invocation of [close]. This means that - * calling [send] or [offer] will result in an exception. + * calling [send] will result in an exception. * * **Note: This is an experimental api.** This property may change its semantics and/or name in the future. */ @@ -31,16 +32,6 @@ public interface SendChannel<in E> { public val isClosedForSend: Boolean /** - * Returns `true` if the channel is full (out of capacity), which means that an attempt to [send] will suspend. - * This function returns `false` if the channel [is closed for `send`][isClosedForSend]. - * - * @suppress **Will be removed in next releases, no replacement.** - */ - @ExperimentalCoroutinesApi - @Deprecated(level = DeprecationLevel.ERROR, message = "Will be removed in next releases without replacement") - public val isFull: Boolean - - /** * Sends the specified [element] to this channel, suspending the caller while the buffer of this channel is full * or if it does not exist, or throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). * @@ -60,7 +51,7 @@ public interface SendChannel<in E> { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onSend] clause. - * Use [offer] to try sending to this channel without waiting. + * Use [trySend] to try sending to this channel without waiting. */ public suspend fun send(element: E) @@ -73,19 +64,17 @@ public interface SendChannel<in E> { */ public val onSend: SelectClause2<E, SendChannel<E>> + /** * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, - * and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off - * in situations when `send` suspends. - * - * Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details). + * and returns the successful result. Otherwise, returns failed or closed result. + * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws. * - * When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it - * it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed, - * then it calls `onUndeliveredElement` before throwing an exception. + * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and + * it does not call `onUndeliveredElement` that was installed for this channel. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. */ - public fun offer(element: E): Boolean + public fun trySend(element: E): ChannelResult<Unit> /** * Closes this channel. @@ -97,7 +86,7 @@ public interface SendChannel<in E> { * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements * are received. * - * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer] + * A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] * and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive]. * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or * receive on a failed channel throw the specified [cause] exception. @@ -116,10 +105,11 @@ public interface SendChannel<in E> { * * the cause of `close` or `cancel` otherwise. * * Example of usage (exception handling is omitted): + * * ``` * val events = Channel(UNLIMITED) * callbackBasedApi.registerCallback { event -> - * events.offer(event) + * events.trySend(event) * } * * val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) { @@ -128,7 +118,6 @@ public interface SendChannel<in E> { * } * * events.invokeOnClose { callbackBasedApi.stop() } - * * ``` * * **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future. @@ -140,6 +129,44 @@ public interface SendChannel<in E> { */ @ExperimentalCoroutinesApi public fun invokeOnClose(handler: (cause: Throwable?) -> Unit) + + /** + * **Deprecated** offer method. + * + * This method was deprecated in the favour of [trySend]. + * It has proven itself as the most error-prone method in Channel API: + * + * * `Boolean` return type creates the false sense of security, implying that `false` + * is returned instead of throwing an exception. + * * It was used mostly from non-suspending APIs where CancellationException triggered + * internal failures in the application (the most common source of bugs). + * * Due to signature and explicit `if (ch.offer(...))` checks it was easy to + * oversee such error during code review. + * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. + * + * **NB** Automatic migration provides best-effort for the user experience, but requires removal + * or adjusting of the code that relied on the exception handling. + * The complete replacement has a more verbose form: + * ``` + * channel.trySend(element) + * .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") } + * .isSuccess + * ``` + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. + * + * @suppress **Deprecated**. + */ + @Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySend' method", + replaceWith = ReplaceWith("trySend(element).isSuccess") + ) // Warning since 1.5.0 + public fun offer(element: E): Boolean { + val result = trySend(element) + if (result.isSuccess) return true + throw recoverStackTrace(result.exceptionOrNull() ?: return false) + } } /** @@ -182,7 +209,7 @@ public interface ReceiveChannel<out E> { * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * * This function can be used in [select] invocations with the [onReceive] clause. - * Use [poll] to try receiving from this channel without waiting. + * Use [tryReceive] to try receiving from this channel without waiting. */ public suspend fun receive(): E @@ -195,94 +222,39 @@ public interface ReceiveChannel<out E> { public val onReceive: SelectClause1<E> /** - * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty, - * or returns `null` if the channel is [closed for `receive`][isClosedForReceive] without cause, - * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this - * function is suspended, this function immediately resumes with a [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receiveOrNull` call can retrieve the element from the channel, - * but then throw [CancellationException], thus failing to deliver the element. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. - * - * Note that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. - * - * This function can be used in [select] invocations with the [onReceiveOrNull] clause. - * Use [poll] to try receiving from this channel without waiting. - * - * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension. - */ - @ObsoleteCoroutinesApi - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - @LowPriorityInOverloadResolution - @Deprecated( - message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull") - ) - public suspend fun receiveOrNull(): E? - - /** - * Clause for the [select] expression of the [receiveOrNull] suspending function that selects with the element - * received from the channel or `null` if the channel is - * [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with - * the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension. - */ - @ObsoleteCoroutinesApi - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - @LowPriorityInOverloadResolution - @Deprecated( - message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull") - ) - public val onReceiveOrNull: SelectClause1<E?> - - /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. - * This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel - * or the close cause if the channel was closed. + * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel + * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally. + * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed]. * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with a [CancellationException]. * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel, + * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel, * but then throw [CancellationException], thus failing to deliver the element. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * This function can be used in [select] invocations with the [onReceiveOrClosed] clause. - * Use [poll] to try receiving from this channel without waiting. - * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + * This function can be used in [select] invocations with the [onReceiveCatching] clause. + * Use [tryReceive] to try receiving from this channel without waiting. */ - @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public suspend fun receiveOrClosed(): ValueOrClosed<E> + public suspend fun receiveCatching(): ChannelResult<E> /** - * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value + * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value * that is received from the channel or with a close cause if the channel * [is closed for `receive`][isClosedForReceive]. - * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. */ - @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>> + public val onReceiveCatching: SelectClause1<ChannelResult<E>> /** - * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty - * or is [is closed for `receive`][isClosedForReceive] without a cause. - * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. + * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success] + * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed] + * result if the channel is closed. */ - public fun poll(): E? + public fun tryReceive(): ChannelResult<E> /** * Returns a new iterator to receive elements from this channel using a `for` loop. @@ -318,107 +290,262 @@ public interface ReceiveChannel<out E> { */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(cause: Throwable? = null): Boolean + + /** + * **Deprecated** poll method. + * + * This method was deprecated in the favour of [tryReceive]. + * It has proven itself as error-prone method in Channel API: + * + * * Nullable return type creates the false sense of security, implying that `null` + * is returned instead of throwing an exception. + * * It was used mostly from non-suspending APIs where CancellationException triggered + * internal failures in the application (the most common source of bugs). + * * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead. + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context. + * + * ### Replacement note + * + * The replacement `tryReceive().getOrNull()` is a default that ignores all close exceptions and + * proceeds with `null`, while `poll` throws an exception if the channel was closed with an exception. + * Replacement with the very same 'poll' semantics is `tryReceive().onClosed { if (it != null) throw it }.getOrNull()` + * + * @suppress **Deprecated**. + */ + @Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'tryReceive'. " + + "Please note that the provided replacement does not rethrow channel's close cause as 'poll' did, " + + "for the precise replacement please refer to the 'poll' documentation", + replaceWith = ReplaceWith("tryReceive().getOrNull()") + ) // Warning since 1.5.0 + public fun poll(): E? { + val result = tryReceive() + if (result.isSuccess) return result.getOrThrow() + throw recoverStackTrace(result.exceptionOrNull() ?: return null) + } + + /** + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * + * It had the following pitfalls: + * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value" + * - Was throwing if the channel has failed even though its signature may suggest it returns 'null' + * - It didn't really belong to core channel API and can be exposed as an extension instead. + * + * ### Replacement note + * + * The replacement `receiveCatching().getOrNull()` is a safe default that ignores all close exceptions and + * proceeds with `null`, while `receiveOrNull` throws an exception if the channel was closed with an exception. + * Replacement with the very same `receiveOrNull` semantics is `receiveCatching().onClosed { if (it != null) throw it }.getOrNull()`. + * + * @suppress **Deprecated** + */ + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + @LowPriorityInOverloadResolution + @Deprecated( + message = "Deprecated in favor of 'receiveCatching'. " + + "Please note that the provided replacement does not rethrow channel's close cause as 'receiveOrNull' did, " + + "for the detailed replacement please refer to the 'receiveOrNull' documentation", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("receiveCatching().getOrNull()") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0 + public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull() + + /** + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * See [receiveOrNull] documentation. + * + * @suppress **Deprecated**: in favor of onReceiveCatching extension. + */ + @Deprecated( + message = "Deprecated in favor of onReceiveCatching extension", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("onReceiveCatching") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0 + public val onReceiveOrNull: SelectClause1<E?> + get() { + return object : SelectClause1<E?> { + @InternalCoroutinesApi + override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) { + onReceiveCatching.registerSelectClause1(select) { + it.exceptionOrNull()?.let { throw it } + block(it.getOrNull()) + } + } + } + } } /** - * A discriminated union of [ReceiveChannel.receiveOrClosed] result - * that encapsulates either an element of type [T] successfully received from the channel or a close cause. + * A discriminated union of channel operation result. + * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with + * an optional cause. * - * :todo: Do not make it public before resolving todos in the code of this class. + * The successful result represents a successful operation with a value of type [T], for example, + * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend]. * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + * The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed. + * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state. + * + * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed. + * It is guaranteed that if the result is _closed_, then the target channel is either [closed for send][Channel.isClosedForSend] + * or is [closed for receive][Channel.isClosedForReceive] depending on whether the failed operation was sending or receiving. */ -@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING") -@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed -public inline class ValueOrClosed<out T> -internal constructor(private val holder: Any?) { +@JvmInline +public value class ChannelResult<out T> +@PublishedApi internal constructor(@PublishedApi internal val holder: Any?) { /** - * Returns `true` if this instance represents a received element. - * In this case [isClosed] returns `false`. - * todo: it is commented for now, because it is not used + * Returns `true` if this instance represents a successful + * operation outcome. + * + * In this case [isFailure] and [isClosed] return `false`. */ - //public val isValue: Boolean get() = holder !is Closed + public val isSuccess: Boolean get() = holder !is Failed /** - * Returns `true` if this instance represents a close cause. - * In this case [isValue] returns `false`. + * Returns `true` if this instance represents unsuccessful operation. + * + * In this case [isSuccess] returns false, but it does not imply + * that the channel is failed or closed. + * + * Example of a failed operation without an exception and channel being closed + * is [Channel.trySend] attempt to a channel that is full. */ - public val isClosed: Boolean get() = holder is Closed + public val isFailure: Boolean get() = holder is Failed /** - * Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise. + * Returns `true` if this instance represents unsuccessful operation + * to a closed or cancelled channel. * - * :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]: + * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply + * that [exceptionOrNull] returns non-null value. * - * So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`. - * On the other hand, the channel has the following `receive` variants: - * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh? - * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull` - * * `receiveOrClosed` - * For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`. + * It can happen if the channel was [closed][Channel.close] normally without an exception. */ - @Suppress("UNCHECKED_CAST") - public val value: T - get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T + public val isClosed: Boolean get() = holder is Closed /** - * Returns the received value if this element represents a received value, or `null` otherwise. - * :todo: Decide if it shall be made into extension that is available only for non-null T. - * Note: it might become inconsistent with kotlin.Result + * Returns the encapsulated value if this instance represents success or `null` if it represents failed result. */ @Suppress("UNCHECKED_CAST") - public val valueOrNull: T? - get() = if (holder is Closed) null else holder as T + public fun getOrNull(): T? = if (holder !is Failed) holder as T else null /** - * :todo: Decide, if it is needed, how it shall be named with relation to [value]. - * Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site, - * so if one is sure that `ValueOrClosed` always holds a value, this very property should be used. - * Otherwise, it could be very hard to locate the source of the exception. - * todo: it is commented for now, because it is not used + * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed. */ - //@Suppress("UNCHECKED_CAST") - //public val valueOrThrow: T - // get() = if (holder is Closed) throw holder.exception else holder as T + public fun getOrThrow(): T { + @Suppress("UNCHECKED_CAST") + if (holder !is Failed) return holder as T + if (holder is Closed && holder.cause != null) throw holder.cause + error("Trying to call 'getOrThrow' on a failed channel result: $holder") + } /** - * Returns the close cause of the channel if this instance represents a close cause, or throws - * an [IllegalStateException] otherwise. + * Returns the encapsulated exception if this instance represents failure or `null` if it is success + * or unsuccessful operation to closed channel. */ - @Suppress("UNCHECKED_CAST") - public val closeCause: Throwable? get() = - if (holder is Closed) holder.cause else error("Channel was not closed") + public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause + + internal open class Failed { + override fun toString(): String = "Failed" + } + + internal class Closed(@JvmField val cause: Throwable?): Failed() { + override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause + override fun hashCode(): Int = cause.hashCode() + override fun toString(): String = "Closed($cause)" + } + + @Suppress("NOTHING_TO_INLINE") + @InternalCoroutinesApi + public companion object { + private val failed = Failed() + + @InternalCoroutinesApi + public fun <E> success(value: E): ChannelResult<E> = + ChannelResult(value) + + @InternalCoroutinesApi + public fun <E> failure(): ChannelResult<E> = + ChannelResult(failed) + + @InternalCoroutinesApi + public fun <E> closed(cause: Throwable?): ChannelResult<E> = + ChannelResult(Closed(cause)) + } - /** - * @suppress - */ public override fun toString(): String = when (holder) { is Closed -> holder.toString() else -> "Value($holder)" + } +} + +/** + * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the + * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed + * result. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.getOrElse(onFailure: (exception: Throwable?) -> T): T { + contract { + callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE) } + @Suppress("UNCHECKED_CAST") + return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T +} - internal class Closed(@JvmField val cause: Throwable?) { - // todo: it is commented for now, because it is not used - //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE) - override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause - override fun hashCode(): Int = cause.hashCode() - override fun toString(): String = "Closed($cause)" +/** + * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess]. + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.onSuccess(action: (value: T) -> Unit): ChannelResult<T> { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) } + @Suppress("UNCHECKED_CAST") + if (holder !is ChannelResult.Failed) action(holder as T) + return this +} - /** - * todo: consider making value/closed constructors public in the future. - */ - internal companion object { - @Suppress("NOTHING_TO_INLINE") - internal inline fun <E> value(value: E): ValueOrClosed<E> = - ValueOrClosed(value) - - @Suppress("NOTHING_TO_INLINE") - internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> = - ValueOrClosed(Closed(cause)) +/** + * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]. + * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult<T> { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) } + @Suppress("UNCHECKED_CAST") + if (holder is ChannelResult.Failed) action(exceptionOrNull()) + return this +} + +/** + * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure] + * due to channel being [closed][Channel.close]. + * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * It is guaranteed that if action is invoked, then the channel is either [closed for send][Channel.isClosedForSend] + * or is [closed for receive][Channel.isClosedForReceive] depending on the failed operation. + * + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T> { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) + } + @Suppress("UNCHECKED_CAST") + if (holder is ChannelResult.Closed) action(exceptionOrNull()) + return this } /** @@ -493,14 +620,14 @@ public interface ChannelIterator<out E> { * * * When `capacity` is [Channel.UNLIMITED] — it creates a channel with effectively unlimited buffer. * This channel has a linked-list buffer of unlimited capacity (limited only by available memory). - * [Sending][send] to this channel never suspends, and [offer] always returns `true`. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * * When `capacity` is [Channel.CONFLATED] — it creates a _conflated_ channel - * This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations, + * This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the last element sent. * Back-to-back sent elements are conflated — only the last sent element is received, * while previously sent elements **are lost**. - * [Sending][send] to this channel never suspends, and [offer] always returns `true`. + * [Sending][send] to this channel never suspends, and [trySend] always succeeds. * * * When `capacity` is positive but less than [UNLIMITED] — it creates an array-based channel with the specified capacity. * This channel has an array buffer of a fixed `capacity`. @@ -547,8 +674,6 @@ public interface ChannelIterator<out E> { * * * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually * send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. - * * When [offer][SendChannel.offer] operation throws an exception when - * the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]. * * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext] * operation throws an exception when it had retrieved the element from the * channel but was cancelled before the code following the receive call resumed. diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index b2b257de..57b2797d 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -11,8 +11,10 @@ import kotlin.coroutines.* internal open class ChannelCoroutine<E>( parentContext: CoroutineContext, protected val _channel: Channel<E>, + initParentJob: Boolean, active: Boolean -) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel { +) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel { + val channel: Channel<E> get() = this override fun cancel() { diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index e3567e31..e0b4f9d2 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -37,129 +37,40 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() } /** - * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty] - * or returns `null` if the channel is [closed][Channel.isClosedForReceive]. + * This function is deprecated in the favour of [ReceiveChannel.receiveCatching]. * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this - * function is suspended, this function immediately resumes with [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. If the `receiveOrNull` call threw [CancellationException] there is no way - * to tell if some element was already received from the channel or not. See [Channel] documentation for details. + * This function is considered error-prone for the following reasons; + * * Is throwing if the channel has failed even though its signature may suggest it returns 'null' + * * It is easy to forget that exception handling still have to be explicit + * * During code reviews and code reading, intentions of the code are frequently unclear: + * are potential exceptions ignored deliberately or not? * - * Note, that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. - * - * This extension is defined only for channels on non-null types, so that generic functions defined using - * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard - * to find bugs. + * @suppress doc */ +@Deprecated( + "Deprecated in the favour of 'receiveCatching'", + ReplaceWith("receiveCatching().getOrNull()"), + DeprecationLevel.WARNING +) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 @Suppress("EXTENSION_SHADOWED_BY_MEMBER") -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel<E?>).receiveOrNull() } /** - * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that - * is received from the channel or selects with `null` if the channel - * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with - * the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * This extension is defined only for channels on non-null types, so that generic functions defined using - * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard - * to find bugs. - **/ -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x + * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching] + */ +@Deprecated( + "Deprecated in the favour of 'onReceiveCatching'", + level = DeprecationLevel.WARNING +) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel<E?>).onReceiveOrNull } /** - * Subscribes to this [BroadcastChannel] and performs the specified action for each received element. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@ObsoleteCoroutinesApi -public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit = - consume { - for (element in this) action(element) - } - -// -------- Operations on ReceiveChannel -------- - -/** - * Returns a [List] containing all elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - */ -@OptIn(ExperimentalStdlibApi::class) -public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList { - consumeEach { - add(it) - } -} - -/** - * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on the [ReceiveChannel] - * with the corresponding cause. See also [ReceiveChannel.consume]. - * - * **WARNING**: It is planned that in the future a second invocation of this method - * on an channel that is already being consumed is going to fail fast, that it - * immediately throws an [IllegalStateException]. - * See [this issue](https://github.com/Kotlin/kotlinx.coroutines/issues/167) - * for details. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? -> - cancelConsumed(cause) -} - -@PublishedApi -internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { - cancel(cause?.let { - it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) - }) -} - -/** - * Returns a [CompletionHandler] that invokes [cancel][ReceiveChannel.cancel] on all the - * specified [ReceiveChannel] instances with the corresponding cause. - * See also [ReceiveChannel.consumes()] for a version on one channel. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = - { cause: Throwable? -> - var exception: Throwable? = null - for (channel in channels) - try { - channel.cancelConsumed(cause) - } catch (e: Throwable) { - if (exception == null) { - exception = e - } else { - exception.addSuppressedThrowable(e) - } - } - exception?.let { throw it } - } - -/** * Makes sure that the given [block] consumes all elements from the given channel * by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block. * @@ -194,2006 +105,35 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) } /** - * Performs the given [action] for each received element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.consumeEachIndexed(action: (IndexedValue<E>) -> Unit) { - var index = 0 - consumeEach { - action(IndexedValue(index++, it)) - } -} - -/** - * Returns an element at the given [index] or throws an [IndexOutOfBoundsException] if the [index] is out of bounds of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = - elementAtOrElse(index) { throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") } - -/** - * Returns an element at the given [index] or the result of calling the [defaultValue] function if the [index] is out of bounds of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.elementAtOrElse(index: Int, defaultValue: (Int) -> E): E = - consume { - if (index < 0) - return defaultValue(index) - var count = 0 - for (element in this) { - if (index == count++) - return element - } - return defaultValue(index) - } - -/** - * Returns an element at the given [index] or `null` if the [index] is out of bounds of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? = - consume { - if (index < 0) - return null - var count = 0 - for (element in this) { - if (index == count++) - return element - } - return null - } - -/** - * Returns the first element matching the given [predicate], or `null` if no such element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.find(predicate: (E) -> Boolean): E? = - firstOrNull(predicate) - -/** - * Returns the last element matching the given [predicate], or `null` if no such element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.findLast(predicate: (E) -> Boolean): E? = - lastOrNull(predicate) - -/** - * Returns first element. - * @throws [NoSuchElementException] if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.first(): E = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - throw NoSuchElementException("ReceiveChannel is empty.") - return iterator.next() - } - -/** - * Returns the first element matching the given [predicate]. - * @throws [NoSuchElementException] if no such element is found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.first(predicate: (E) -> Boolean): E { - consumeEach { - if (predicate(it)) return it - } - throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") -} - -/** - * Returns the first element, or `null` if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - return null - return iterator.next() - } - -/** - * Returns the first element matching the given [predicate], or `null` if element was not found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.firstOrNull(predicate: (E) -> Boolean): E? { - consumeEach { - if (predicate(it)) return it - } - return null -} - -/** - * Returns first index of [element], or -1 if the channel does not contain element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int { - var index = 0 - consumeEach { - if (element == it) - return index - index++ - } - return -1 -} - -/** - * Returns index of the first element matching the given [predicate], or -1 if the channel does not contain such element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.indexOfFirst(predicate: (E) -> Boolean): Int { - var index = 0 - consumeEach { - if (predicate(it)) - return index - index++ - } - return -1 -} - -/** - * Returns index of the last element matching the given [predicate], or -1 if the channel does not contain such element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.indexOfLast(predicate: (E) -> Boolean): Int { - var lastIndex = -1 - var index = 0 - consumeEach { - if (predicate(it)) - lastIndex = index - index++ - } - return lastIndex -} - -/** - * Returns the last element. - * @throws [NoSuchElementException] if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.last(): E = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - throw NoSuchElementException("ReceiveChannel is empty.") - var last = iterator.next() - while (iterator.hasNext()) - last = iterator.next() - return last - } - -/** - * Returns the last element matching the given [predicate]. - * @throws [NoSuchElementException] if no such element is found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.last(predicate: (E) -> Boolean): E { - var last: E? = null - var found = false - consumeEach { - if (predicate(it)) { - last = it - found = true - } - } - if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") - @Suppress("UNCHECKED_CAST") - return last as E -} - -/** - * Returns last index of [element], or -1 if the channel does not contain element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int { - var lastIndex = -1 - var index = 0 - consumeEach { - if (element == it) - lastIndex = index - index++ - } - return lastIndex -} - -/** - * Returns the last element, or `null` if the channel is empty. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - return null - var last = iterator.next() - while (iterator.hasNext()) - last = iterator.next() - return last - } - -/** - * Returns the last element matching the given [predicate], or `null` if no such element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.lastOrNull(predicate: (E) -> Boolean): E? { - var last: E? = null - consumeEach { - if (predicate(it)) { - last = it - } - } - return last -} - -/** - * Returns the single element, or throws an exception if the channel is empty or has more than one element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.single(): E = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - throw NoSuchElementException("ReceiveChannel is empty.") - val single = iterator.next() - if (iterator.hasNext()) - throw IllegalArgumentException("ReceiveChannel has more than one element.") - return single - } - -/** - * Returns the single element matching the given [predicate], or throws exception if there is no or more than one matching element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.single(predicate: (E) -> Boolean): E { - var single: E? = null - var found = false - consumeEach { - if (predicate(it)) { - if (found) throw IllegalArgumentException("ReceiveChannel contains more than one matching element.") - single = it - found = true - } - } - if (!found) throw NoSuchElementException("ReceiveChannel contains no element matching the predicate.") - @Suppress("UNCHECKED_CAST") - return single as E -} - -/** - * Returns single element, or `null` if the channel is empty or has more than one element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) - return null - val single = iterator.next() - if (iterator.hasNext()) - return null - return single - } - -/** - * Returns the single element matching the given [predicate], or `null` if element was not found or more than one element was found. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.singleOrNull(predicate: (E) -> Boolean): E? { - var single: E? = null - var found = false - consumeEach { - if (predicate(it)) { - if (found) return null - single = it - found = true - } - } - if (!found) return null - return single -} - -/** - * Returns a channel containing all elements except first [n] elements. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - require(n >= 0) { "Requested element count $n is less than zero." } - var remaining: Int = n - if (remaining > 0) - for (e in this@drop) { - remaining-- - if (remaining == 0) - break - } - for (e in this@drop) { - send(e) - } - } - -/** - * Returns a channel containing all elements except first elements that satisfy the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.dropWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@dropWhile) { - if (!predicate(e)) { - send(e) - break - } - } - for (e in this@dropWhile) { - send(e) - } - } - -/** - * Returns a channel containing only elements matching the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@filter) { - if (predicate(e)) send(e) - } - } - -/** - * Returns a channel containing only elements matching the given [predicate]. - * @param [predicate] function that takes the index of an element and the element itself - * and returns the result of predicate evaluation on the element. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.filterIndexed(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (index: Int, E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - var index = 0 - for (e in this@filterIndexed) { - if (predicate(index++, e)) send(e) - } - } - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * @param [predicate] function that takes the index of an element and the element itself - * and returns the result of predicate evaluation on the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C { - consumeEachIndexed { (index, element) -> - if (predicate(index, element)) destination.add(element) - } - return destination -} - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * @param [predicate] function that takes the index of an element and the element itself - * and returns the result of predicate evaluation on the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexedTo(destination: C, predicate: (index: Int, E) -> Boolean): C { - consumeEachIndexed { (index, element) -> - if (predicate(index, element)) destination.send(element) - } - return destination -} - -/** - * Returns a channel containing all elements not matching the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - filter(context) { !predicate(it) } - -/** - * Returns a channel containing all elements that are not `null`. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -@Suppress("UNCHECKED_CAST") -public fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> = - filter { it != null } as ReceiveChannel<E> - -/** - * Appends all elements that are not `null` to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { - consumeEach { - if (it != null) destination.add(it) - } - return destination -} - -/** - * Appends all elements that are not `null` to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { - consumeEach { - if (it != null) destination.send(it) - } - return destination -} - -/** - * Appends all elements not matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (!predicate(it)) destination.add(it) - } - return destination -} - -/** - * Appends all elements not matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterNotTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (!predicate(it)) destination.send(it) - } - return destination -} - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (predicate(it)) destination.add(it) - } - return destination -} - -/** - * Appends all elements matching the given [predicate] to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterTo(destination: C, predicate: (E) -> Boolean): C { - consumeEach { - if (predicate(it)) destination.send(it) - } - return destination -} - -/** - * Returns a channel containing first [n] elements. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - if (n == 0) return@produce - require(n >= 0) { "Requested element count $n is less than zero." } - var remaining: Int = n - for (e in this@take) { - send(e) - remaining-- - if (remaining == 0) - return@produce - } - } - -/** - * Returns a channel containing first elements satisfying the given [predicate]. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.takeWhile(context: CoroutineContext = Dispatchers.Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@takeWhile) { - if (!predicate(e)) return@produce - send(e) - } - } - -/** - * Returns a [Map] containing key-value pairs provided by [transform] function - * applied to elements of the given channel. - * - * If any of two pairs would have the same key the last one gets added to the map. - * - * The returned map preserves the entry iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V> ReceiveChannel<E>.associate(transform: (E) -> Pair<K, V>): Map<K, V> = - associateTo(LinkedHashMap(), transform) - -/** - * Returns a [Map] containing the elements from the given channel indexed by the key - * returned from [keySelector] function applied to each element. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The returned map preserves the entry iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K> ReceiveChannel<E>.associateBy(keySelector: (E) -> K): Map<K, E> = - associateByTo(LinkedHashMap(), keySelector) - -/** - * Returns a [Map] containing the values provided by [valueTransform] and indexed by [keySelector] functions applied to elements of the given channel. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The returned map preserves the entry iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V> ReceiveChannel<E>.associateBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, V> = - associateByTo(LinkedHashMap(), keySelector, valueTransform) - -/** - * Populates and returns the [destination] mutable map with key-value pairs, - * where key is provided by the [keySelector] function applied to each element of the given channel - * and value is the element itself. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, M : MutableMap<in K, in E>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K): M { - consumeEach { - destination.put(keySelector(it), it) - } - return destination -} - -/** - * Populates and returns the [destination] mutable map with key-value pairs, - * where key is provided by the [keySelector] function and - * and value is provided by the [valueTransform] function applied to elements of the given channel. - * - * If any two elements would have the same key returned by [keySelector] the last one gets added to the map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M { - consumeEach { - destination.put(keySelector(it), valueTransform(it)) - } - return destination -} - -/** - * Populates and returns the [destination] mutable map with key-value pairs - * provided by [transform] function applied to each element of the given channel. - * - * If any of two pairs would have the same key the last one gets added to the map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V, M : MutableMap<in K, in V>> ReceiveChannel<E>.associateTo(destination: M, transform: (E) -> Pair<K, V>): M { - consumeEach { - destination += transform(it) - } - return destination -} - -/** - * Send each element of the original channel - * and appends the results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C { - consumeEach { - destination.send(it) - } - return destination -} - -/** - * Appends all elements to the given [destination] collection. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C { - consumeEach { - destination.add(it) - } - return destination -} - -/** - * Returns a [Map] filled with all elements of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> = - toMap(LinkedHashMap()) - -/** - * Returns a [MutableMap] filled with all elements of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M { - consumeEach { - destination += it - } - return destination -} - -/** - * Returns a [MutableList] filled with all elements of this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> = - toCollection(ArrayList()) - -/** - * Returns a [Set] of all elements. - * - * The returned set preserves the element iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> = - this.toMutableSet() - -/** - * Returns a single channel of all elements from results of [transform] function being invoked on each element of original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R> ReceiveChannel<E>.flatMap(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> = - GlobalScope.produce(context, onCompletion = consumes()) { - for (e in this@flatMap) { - transform(e).toChannel(this) - } - } - -/** - * Groups elements of the original channel by the key returned by the given [keySelector] function - * applied to each element and returns a map where each group key is associated with a list of corresponding elements. - * - * The returned map preserves the entry iteration order of the keys produced from the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K> ReceiveChannel<E>.groupBy(keySelector: (E) -> K): Map<K, List<E>> = - groupByTo(LinkedHashMap(), keySelector) - -/** - * Groups values returned by the [valueTransform] function applied to each element of the original channel - * by the key returned by the given [keySelector] function applied to the element - * and returns a map where each group key is associated with a list of corresponding values. - * - * The returned map preserves the entry iteration order of the keys produced from the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V> ReceiveChannel<E>.groupBy(keySelector: (E) -> K, valueTransform: (E) -> V): Map<K, List<V>> = - groupByTo(LinkedHashMap(), keySelector, valueTransform) - -/** - * Groups elements of the original channel by the key returned by the given [keySelector] function - * applied to each element and puts to the [destination] map each group key associated with a list of corresponding elements. - * - * @return The [destination] map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, M : MutableMap<in K, MutableList<E>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K): M { - consumeEach { - val key = keySelector(it) - val list = destination.getOrPut(key) { ArrayList() } - list.add(it) - } - return destination -} - -/** - * Groups values returned by the [valueTransform] function applied to each element of the original channel - * by the key returned by the given [keySelector] function applied to the element - * and puts to the [destination] map each group key associated with a list of corresponding values. - * - * @return The [destination] map. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, K, V, M : MutableMap<in K, MutableList<V>>> ReceiveChannel<E>.groupByTo(destination: M, keySelector: (E) -> K, valueTransform: (E) -> V): M { - consumeEach { - val key = keySelector(it) - val list = destination.getOrPut(key) { ArrayList() } - list.add(valueTransform(it)) - } - return destination -} - -/** - * Returns a channel containing the results of applying the given [transform] function - * to each element in the original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> = - GlobalScope.produce(context, onCompletion = consumes()) { - consumeEach { - send(transform(it)) - } - } - -/** - * Returns a channel containing the results of applying the given [transform] function - * to each element and its index in the original channel. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R> ReceiveChannel<E>.mapIndexed(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R): ReceiveChannel<R> = - GlobalScope.produce(context, onCompletion = consumes()) { - var index = 0 - for (e in this@mapIndexed) { - send(transform(index++, e)) - } - } - -/** - * Returns a channel containing only the non-null results of applying the given [transform] function - * to each element and its index in the original channel. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (index: Int, E) -> R?): ReceiveChannel<R> = - mapIndexed(context, transform).filterNotNull() - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends only the non-null results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C { - consumeEachIndexed { (index, element) -> - transform(index, element)?.let { destination.add(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends only the non-null results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedNotNullTo(destination: C, transform: (index: Int, E) -> R?): C { - consumeEachIndexed { (index, element) -> - transform(index, element)?.let { destination.send(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends the results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C { - var index = 0 - consumeEach { - destination.add(transform(index++, it)) - } - return destination -} - -/** - * Applies the given [transform] function to each element and its index in the original channel - * and appends the results to the given [destination]. - * @param [transform] function that takes the index of an element and the element itself - * and returns the result of the transform applied to the element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapIndexedTo(destination: C, transform: (index: Int, E) -> R): C { - var index = 0 - consumeEach { - destination.send(transform(index++, it)) - } - return destination -} - -/** - * Returns a channel containing only the non-null results of applying the given [transform] function - * to each element in the original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R : Any> ReceiveChannel<E>.mapNotNull(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R?): ReceiveChannel<R> = - map(context, transform).filterNotNull() - -/** - * Applies the given [transform] function to each element in the original channel - * and appends only the non-null results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : MutableCollection<in R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C { - consumeEach { - transform(it)?.let { destination.add(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element in the original channel - * and appends only the non-null results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Any, C : SendChannel<R>> ReceiveChannel<E>.mapNotNullTo(destination: C, transform: (E) -> R?): C { - consumeEach { - transform(it)?.let { destination.send(it) } - } - return destination -} - -/** - * Applies the given [transform] function to each element of the original channel - * and appends the results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : MutableCollection<in R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C { - consumeEach { - destination.add(transform(it)) - } - return destination -} - -/** - * Applies the given [transform] function to each element of the original channel - * and appends the results to the given [destination]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R, C : SendChannel<R>> ReceiveChannel<E>.mapTo(destination: C, transform: (E) -> R): C { - consumeEach { - destination.send(transform(it)) - } - return destination -} - -/** - * Returns a channel of [IndexedValue] for each element of the original channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> = - GlobalScope.produce(context, onCompletion = consumes()) { - var index = 0 - for (e in this@withIndex) { - send(IndexedValue(index++, e)) - } - } - -/** - * Returns a channel containing only distinct elements from the given channel. - * - * The elements in the resulting channel are in the same order as they were in the source channel. - * - * The operation is _intermediate_ and _stateful_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> = - this.distinctBy { it } - -/** - * Returns a channel containing only elements from the given channel - * having distinct keys returned by the given [selector] function. - * - * The elements in the resulting channel are in the same order as they were in the source channel. - * - * The operation is _intermediate_ and _stateful_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, K> ReceiveChannel<E>.distinctBy(context: CoroutineContext = Dispatchers.Unconfined, selector: suspend (E) -> K): ReceiveChannel<E> = - GlobalScope.produce(context, onCompletion = consumes()) { - val keys = HashSet<K>() - for (e in this@distinctBy) { - val k = selector(e) - if (k !in keys) { - send(e) - keys += k - } - } - } - -/** - * Returns a mutable set containing all distinct elements from the given channel. - * - * The returned set preserves the element iteration order of the original channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> = - toCollection(LinkedHashSet()) - -/** - * Returns `true` if all elements match the given [predicate]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.all(predicate: (E) -> Boolean): Boolean { - consumeEach { - if (!predicate(it)) return false - } - return true -} - -/** - * Returns `true` if channel has at least one element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.any(): Boolean = - consume { - return iterator().hasNext() - } - -/** - * Returns `true` if at least one element matches the given [predicate]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.any(predicate: (E) -> Boolean): Boolean { - consumeEach { - if (predicate(it)) return true - } - return false -} - -/** - * Returns the number of elements in this channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.count(): Int { - var count = 0 - consumeEach { count++ } - return count -} - -/** - * Returns the number of elements matching the given [predicate]. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.count(predicate: (E) -> Boolean): Int { - var count = 0 - consumeEach { - if (predicate(it)) count++ - } - return count -} - -/** - * Accumulates value starting with [initial] value and applying [operation] from left to right to current accumulator value and each element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R> ReceiveChannel<E>.fold(initial: R, operation: (acc: R, E) -> R): R { - var accumulator = initial - consumeEach { - accumulator = operation(accumulator, it) - } - return accumulator -} - -/** - * Accumulates value starting with [initial] value and applying [operation] from left to right - * to current accumulator value and each element with its index in the original channel. - * @param [operation] function that takes the index of an element, current accumulator value - * and the element itself, and calculates the next accumulator value. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R> ReceiveChannel<E>.foldIndexed(initial: R, operation: (index: Int, acc: R, E) -> R): R { - var index = 0 - var accumulator = initial - consumeEach { - accumulator = operation(index++, accumulator, it) - } - return accumulator -} - -/** - * Returns the first element yielding the largest value of the given function or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.maxBy(selector: (E) -> R): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var maxElem = iterator.next() - var maxValue = selector(maxElem) - while (iterator.hasNext()) { - val e = iterator.next() - val v = selector(e) - if (maxValue < v) { - maxElem = e - maxValue = v - } - } - return maxElem - } - -/** - * Returns the first element having the largest value according to the provided [comparator] or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var max = iterator.next() - while (iterator.hasNext()) { - val e = iterator.next() - if (comparator.compare(max, e) < 0) max = e - } - return max - } - -/** - * Returns the first element yielding the smallest value of the given function or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E, R : Comparable<R>> ReceiveChannel<E>.minBy(selector: (E) -> R): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var minElem = iterator.next() - var minValue = selector(minElem) - while (iterator.hasNext()) { - val e = iterator.next() - val v = selector(e) - if (minValue > v) { - minElem = e - minValue = v - } - } - return minElem - } - -/** - * Returns the first element having the smallest value according to the provided [comparator] or `null` if there are no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? = - consume { - val iterator = iterator() - if (!iterator.hasNext()) return null - var min = iterator.next() - while (iterator.hasNext()) { - val e = iterator.next() - if (comparator.compare(min, e) > 0) min = e - } - return min - } - -/** - * Returns `true` if the channel has no elements. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend fun <E> ReceiveChannel<E>.none(): Boolean = - consume { - return !iterator().hasNext() - } - -/** - * Returns `true` if no elements match the given [predicate]. + * Returns a [List] containing all elements. * * The operation is _terminal_. * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.none(predicate: (E) -> Boolean): Boolean { +@OptIn(ExperimentalStdlibApi::class) +public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList { consumeEach { - if (predicate(it)) return false + add(it) } - return true } /** - * Accumulates value starting with the first element and applying [operation] from left to right to current accumulator value and each element. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <S, E : S> ReceiveChannel<E>.reduce(operation: (acc: S, E) -> S): S = - consume { - val iterator = this.iterator() - if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.") - var accumulator: S = iterator.next() - while (iterator.hasNext()) { - accumulator = operation(accumulator, iterator.next()) - } - return accumulator - } - -/** - * Accumulates value starting with the first element and applying [operation] from left to right - * to current accumulator value and each element with its index in the original channel. - * @param [operation] function that takes the index of an element, current accumulator value - * and the element itself and calculates the next accumulator value. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. + * Subscribes to this [BroadcastChannel] and performs the specified action for each received element. * * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <S, E : S> ReceiveChannel<E>.reduceIndexed(operation: (index: Int, acc: S, E) -> S): S = +@ObsoleteCoroutinesApi +public suspend inline fun <E> BroadcastChannel<E>.consumeEach(action: (E) -> Unit): Unit = consume { - val iterator = this.iterator() - if (!iterator.hasNext()) throw UnsupportedOperationException("Empty channel can't be reduced.") - var index = 1 - var accumulator: S = iterator.next() - while (iterator.hasNext()) { - accumulator = operation(index++, accumulator, iterator.next()) - } - return accumulator - } - -/** - * Returns the sum of all values produced by [selector] function applied to each element in the channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.sumBy(selector: (E) -> Int): Int { - var sum = 0 - consumeEach { - sum += selector(it) - } - return sum -} - -/** - * Returns the sum of all values produced by [selector] function applied to each element in the channel. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.sumByDouble(selector: (E) -> Double): Double { - var sum = 0.0 - consumeEach { - sum += selector(it) + for (element in this) action(element) } - return sum -} -/** - * Returns an original collection containing all the non-`null` elements, throwing an [IllegalArgumentException] if there are any `null` elements. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> = - map { it ?: throw IllegalArgumentException("null element found in $this.") } -/** - * Splits the original channel into pair of lists, - * where *first* list contains elements for which [predicate] yielded `true`, - * while *second* list contains elements for which [predicate] yielded `false`. - * - * The operation is _terminal_. - * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public suspend inline fun <E> ReceiveChannel<E>.partition(predicate: (E) -> Boolean): Pair<List<E>, List<E>> { - val first = ArrayList<E>() - val second = ArrayList<E>() - consumeEach { - if (predicate(it)) { - first.add(it) - } else { - second.add(it) - } - } - return Pair(first, second) +@PublishedApi +internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) { + cancel(cause?.let { + it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) + }) } -/** - * Returns a channel of pairs built from elements of both channels with same indexes. - * Resulting channel has length of shortest input channel. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][ReceiveChannel.consume] all elements of both the original [ReceiveChannel] and the `other` one. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> = - zip(other) { t1, t2 -> t1 to t2 } - -/** - * Returns a channel of values built from elements of both collections with same indexes using provided [transform]. Resulting channel has length of shortest input channels. - * - * The operation is _intermediate_ and _stateless_. - * This function [consumes][consume] all elements of both the original [ReceiveChannel] and the `other` one. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - */ -@Deprecated( - message = "Channel operators are deprecated in favour of Flow and will be removed in 1.4.x", - level = DeprecationLevel.ERROR -) -public fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Dispatchers.Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> = - GlobalScope.produce(context, onCompletion = consumesAll(this, other)) { - val otherIterator = other.iterator() - this@zip.consumeEach { element1 -> - if (!otherIterator.hasNext()) return@consumeEach - val element2 = otherIterator.next() - send(transform(element1, element2)) - } - } diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt index f1d092e3..b768d7c3 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt @@ -17,7 +17,7 @@ import kotlin.jvm.* * Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received, * while previously sent elements **are lost**. * Every subscriber immediately receives the most recently sent element. - * Sender to this broadcast channel never suspends and [offer] always returns `true`. + * Sender to this broadcast channel never suspends and [trySend] always succeeds. * * A secondary constructor can be used to create an instance of this class that already holds a value. * This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation. @@ -26,10 +26,10 @@ import kotlin.jvm.* * [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the * number of subscribers. * - * **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow] - * when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow]. */ -@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it +@ObsoleteCoroutinesApi public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { /** * Creates an instance of this class that already holds a value. @@ -94,7 +94,6 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { } public override val isClosedForSend: Boolean get() = _state.value is Closed - public override val isFull: Boolean get() = false @Suppress("UNCHECKED_CAST") public override fun openSubscription(): ReceiveChannel<E> { @@ -229,12 +228,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { /** * Sends the value to all subscribed receives and stores this value as the most recent state for - * future subscribers. This implementation always returns `true`. - * It throws exception if the channel [isClosedForSend] (see [close] for details). + * future subscribers. This implementation always returns either successful result + * or closed with an exception. */ - public override fun offer(element: E): Boolean { - offerInternal(element)?.let { throw it.sendException } - return true + public override fun trySend(element: E): ChannelResult<Unit> { + offerInternal(element)?.let { return ChannelResult.closed(it.sendException) } + return ChannelResult.success(Unit) } @Suppress("UNCHECKED_CAST") diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 0e686447..f7f60cf9 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* /** - * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations, + * Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations, * so that the receiver always gets the most recently sent element. * Back-to-send sent elements are _conflated_ -- only the most recently sent element is received, * while previously sent elements **are lost**. - * Sender to this channel never suspends and [offer] always returns `true`. + * Sender to this channel never suspends and [trySend] always succeeds. * * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation. */ @@ -123,6 +123,7 @@ internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredEleme undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one } + @Suppress("UNCHECKED_CAST") private fun updateValueLocked(element: Any?): UndeliveredElementException? { val old = value val undeliveredElementException = if (old === EMPTY) null else diff --git a/kotlinx-coroutines-core/common/src/channels/Deprecated.kt b/kotlinx-coroutines-core/common/src/channels/Deprecated.kt new file mode 100644 index 00000000..2b9ed42d --- /dev/null +++ b/kotlinx-coroutines-core/common/src/channels/Deprecated.kt @@ -0,0 +1,478 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:JvmMultifileClass +@file:JvmName("ChannelsKt") +@file:Suppress("unused") + +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.jvm.* + +/** @suppress **/ +@PublishedApi // Binary compatibility +internal fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = + { cause: Throwable? -> + var exception: Throwable? = null + for (channel in channels) + try { + channel.cancelConsumed(cause) + } catch (e: Throwable) { + if (exception == null) { + exception = e + } else { + exception.addSuppressedThrowable(e) + } + } + exception?.let { throw it } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.elementAt(index: Int): E = consume { + if (index < 0) + throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") + var count = 0 + for (element in this) { + @Suppress("UNUSED_CHANGED_VALUE") // KT-47628 + if (index == count++) + return element + } + throw IndexOutOfBoundsException("ReceiveChannel doesn't contain element at index $index.") +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.elementAtOrNull(index: Int): E? = + consume { + if (index < 0) + return null + var count = 0 + for (element in this) { + if (index == count++) + return element + } + return null + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.first(): E = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + return iterator.next() + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.firstOrNull(): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + return null + return iterator.next() + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.indexOf(element: E): Int { + var index = 0 + consumeEach { + if (element == it) + return index + index++ + } + return -1 +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.last(): E = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + var last = iterator.next() + while (iterator.hasNext()) + last = iterator.next() + return last + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.lastIndexOf(element: E): Int { + var lastIndex = -1 + var index = 0 + consumeEach { + if (element == it) + lastIndex = index + index++ + } + return lastIndex +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.lastOrNull(): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + return null + var last = iterator.next() + while (iterator.hasNext()) + last = iterator.next() + return last + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.single(): E = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + throw NoSuchElementException("ReceiveChannel is empty.") + val single = iterator.next() + if (iterator.hasNext()) + throw IllegalArgumentException("ReceiveChannel has more than one element.") + return single + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.singleOrNull(): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) + return null + val single = iterator.next() + if (iterator.hasNext()) + return null + return single + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.drop(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + require(n >= 0) { "Requested element count $n is less than zero." } + var remaining: Int = n + if (remaining > 0) + for (e in this@drop) { + remaining-- + if (remaining == 0) + break + } + for (e in this@drop) { + send(e) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.dropWhile( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@dropWhile) { + if (!predicate(e)) { + send(e) + break + } + } + for (e in this@dropWhile) { + send(e) + } + } + +@PublishedApi +internal fun <E> ReceiveChannel<E>.filter( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@filter) { + if (predicate(e)) send(e) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.filterIndexed( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (index: Int, E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + var index = 0 + for (e in this@filterIndexed) { + if (predicate(index++, e)) send(e) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.filterNot( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + filter(context) { !predicate(it) } + +@PublishedApi +@Suppress("UNCHECKED_CAST") +internal fun <E : Any> ReceiveChannel<E?>.filterNotNull(): ReceiveChannel<E> = + filter { it != null } as ReceiveChannel<E> + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E : Any, C : MutableCollection<in E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { + consumeEach { + if (it != null) destination.add(it) + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E : Any, C : SendChannel<E>> ReceiveChannel<E?>.filterNotNullTo(destination: C): C { + consumeEach { + if (it != null) destination.send(it) + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.take(n: Int, context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + if (n == 0) return@produce + require(n >= 0) { "Requested element count $n is less than zero." } + var remaining: Int = n + for (e in this@take) { + send(e) + remaining-- + if (remaining == 0) + return@produce + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.takeWhile( + context: CoroutineContext = Dispatchers.Unconfined, + predicate: suspend (E) -> Boolean +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@takeWhile) { + if (!predicate(e)) return@produce + send(e) + } + } + +@PublishedApi +internal suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.toChannel(destination: C): C { + consumeEach { + destination.send(it) + } + return destination +} + +@PublishedApi +internal suspend fun <E, C : MutableCollection<in E>> ReceiveChannel<E>.toCollection(destination: C): C { + consumeEach { + destination.add(it) + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <K, V> ReceiveChannel<Pair<K, V>>.toMap(): Map<K, V> = + toMap(LinkedHashMap()) + +@PublishedApi +internal suspend fun <K, V, M : MutableMap<in K, in V>> ReceiveChannel<Pair<K, V>>.toMap(destination: M): M { + consumeEach { + destination += it + } + return destination +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.toMutableList(): MutableList<E> = + toCollection(ArrayList()) + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.toSet(): Set<E> = + this.toMutableSet() + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E, R> ReceiveChannel<E>.flatMap( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (E) -> ReceiveChannel<R> +): ReceiveChannel<R> = + GlobalScope.produce(context, onCompletion = consumes()) { + for (e in this@flatMap) { + transform(e).toChannel(this) + } + } + +@PublishedApi +internal fun <E, R> ReceiveChannel<E>.map( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (E) -> R +): ReceiveChannel<R> = + GlobalScope.produce(context, onCompletion = consumes()) { + consumeEach { + send(transform(it)) + } + } + +@PublishedApi +internal fun <E, R> ReceiveChannel<E>.mapIndexed( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (index: Int, E) -> R +): ReceiveChannel<R> = + GlobalScope.produce(context, onCompletion = consumes()) { + var index = 0 + for (e in this@mapIndexed) { + send(transform(index++, e)) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E, R : Any> ReceiveChannel<E>.mapIndexedNotNull( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (index: Int, E) -> R? +): ReceiveChannel<R> = + mapIndexed(context, transform).filterNotNull() + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E, R : Any> ReceiveChannel<E>.mapNotNull( + context: CoroutineContext = Dispatchers.Unconfined, + transform: suspend (E) -> R? +): ReceiveChannel<R> = + map(context, transform).filterNotNull() + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.withIndex(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<IndexedValue<E>> = + GlobalScope.produce(context, onCompletion = consumes()) { + var index = 0 + for (e in this@withIndex) { + send(IndexedValue(index++, e)) + } + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E> ReceiveChannel<E>.distinct(): ReceiveChannel<E> = + this.distinctBy { it } + +@PublishedApi +internal fun <E, K> ReceiveChannel<E>.distinctBy( + context: CoroutineContext = Dispatchers.Unconfined, + selector: suspend (E) -> K +): ReceiveChannel<E> = + GlobalScope.produce(context, onCompletion = consumes()) { + val keys = HashSet<K>() + for (e in this@distinctBy) { + val k = selector(e) + if (k !in keys) { + send(e) + keys += k + } + } + } + +@PublishedApi +internal suspend fun <E> ReceiveChannel<E>.toMutableSet(): MutableSet<E> = + toCollection(LinkedHashSet()) + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.any(): Boolean = + consume { + return iterator().hasNext() + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.count(): Int { + var count = 0 + consumeEach { count++ } + return count +} + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.maxWith(comparator: Comparator<in E>): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) return null + var max = iterator.next() + while (iterator.hasNext()) { + val e = iterator.next() + if (comparator.compare(max, e) < 0) max = e + } + return max + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.minWith(comparator: Comparator<in E>): E? = + consume { + val iterator = iterator() + if (!iterator.hasNext()) return null + var min = iterator.next() + while (iterator.hasNext()) { + val e = iterator.next() + if (comparator.compare(min, e) > 0) min = e + } + return min + } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public suspend fun <E> ReceiveChannel<E>.none(): Boolean = + consume { + return !iterator().hasNext() + } + +/** @suppress **/ +@Deprecated(message = "Left for binary compatibility", level = DeprecationLevel.HIDDEN) +public fun <E : Any> ReceiveChannel<E?>.requireNoNulls(): ReceiveChannel<E> = + map { it ?: throw IllegalArgumentException("null element found in $this.") } + +/** @suppress **/ +@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN) +public infix fun <E, R> ReceiveChannel<E>.zip(other: ReceiveChannel<R>): ReceiveChannel<Pair<E, R>> = + zip(other) { t1, t2 -> t1 to t2 } + +@PublishedApi // Binary compatibility +internal fun <E, R, V> ReceiveChannel<E>.zip( + other: ReceiveChannel<R>, + context: CoroutineContext = Dispatchers.Unconfined, + transform: (a: E, b: R) -> V +): ReceiveChannel<V> = + GlobalScope.produce(context, onCompletion = consumesAll(this, other)) { + val otherIterator = other.iterator() + this@zip.consumeEach { element1 -> + if (!otherIterator.hasNext()) return@consumeEach + val element2 = otherIterator.next() + send(transform(element1, element2)) + } + } + +@PublishedApi // Binary compatibility +internal fun ReceiveChannel<*>.consumes(): CompletionHandler = { cause: Throwable? -> + cancelConsumed(cause) +} diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt index 83175273..b5f607b2 100644 --- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.* /** * Channel with linked-list buffer of a unlimited capacity (limited only by available memory). - * Sender to this channel never suspends and [offer] always returns `true`. + * Sender to this channel never suspends and [trySend] always succeeds. * * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation. * diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 3c183587..3342fb6e 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce( internal open class ProducerCoroutine<E>( parentContext: CoroutineContext, channel: Channel<E> -) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> { +) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> { override val isActive: Boolean get() = super.isActive diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 10dd3aef..66b55a90 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -234,7 +234,7 @@ public fun <T> flowViaChannel( * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. * - * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Examples of usage: @@ -261,7 +261,6 @@ public fun <T> flowViaChannel( * } * ``` */ -@ExperimentalCoroutinesApi public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = ChannelFlowBuilder(block) @@ -290,10 +289,10 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. * - * Adjacent applications of [callbackFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * - * Example of usage that converts a multi-short callback API to a flow. + * Example of usage that converts a multi-shot callback API to a flow. * For single-shot callbacks use [suspendCancellableCoroutine]. * * ``` @@ -302,11 +301,10 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() * override fun onNextValue(value: T) { * // To avoid blocking you can configure channel capacity using * // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill - * try { - * sendBlocking(value) - * } catch (e: Exception) { - * // Handle exception from the channel: failure in flow or premature closing - * } + * trySendBlocking(value) + * .onFailure { throwable -> + * // Downstream has been cancelled or failed, can log here + * } * } * override fun onApiError(cause: Throwable) { * cancel(CancellationException("API Error", cause)) @@ -327,7 +325,6 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() * > `awaitClose` block can be called at any time due to asynchronous nature of cancellation, even * > concurrently with the call of the callback. */ -@ExperimentalCoroutinesApi public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block) // ChannelFlow implementation that is the first in the chain of flow operations and introduces (builds) a flow diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index e883c3b4..5cc8ad8b 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -30,7 +30,8 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni emitAllImpl(channel, consume = true) private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) { - // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed". + ensureActive() + // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching". // It has smaller and more efficient spilled state which also allows to implement a manual kludge to // fix retention of the last emitted value. // See https://youtrack.jetbrains.com/issue/KT-16222 @@ -47,9 +48,9 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, // L$1 <- channel // L$2 <- cause // L$3 <- this$run (actually equal to this) - val result = run { channel.receiveOrClosed() } + val result = run { channel.receiveCatching() } if (result.isClosed) { - result.closeCause?.let { throw it } + result.exceptionOrNull()?.let { throw it } break // returns normally when result.closeCause == null } // result is spilled here to the coroutine state and retained after the call, even though @@ -58,7 +59,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, // L$1 <- channel // L$2 <- cause // L$3 <- result - emit(result.value) + emit(result.getOrThrow()) } } catch (e: Throwable) { cause = e @@ -133,17 +134,12 @@ private class ChannelAsFlow<T>( override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow) - override fun dropChannelOperators(): Flow<T>? = + override fun dropChannelOperators(): Flow<T> = ChannelAsFlow(channel, consume) override suspend fun collectTo(scope: ProducerScope<T>) = SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll - override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> { - markConsumed() // fail fast on repeated attempt to collect it - return super.broadcastImpl(scope, start) - } - override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> { markConsumed() // fail fast on repeated attempt to collect it return if (capacity == Channel.OPTIONAL_CHANNEL) { @@ -173,22 +169,16 @@ private class ChannelAsFlow<T>( * 2) Flow consumer completes normally when the original channel completes (~is closed) normally. * 3) If the flow consumer fails with an exception, subscription is cancelled. */ -@FlowPreview +@Deprecated( + level = DeprecationLevel.WARNING, + message = "'BroadcastChannel' is obsolete and all coreresponding operators are deprecated " + + "in the favour of StateFlow and SharedFlow" +) // Since 1.5.0, was @FlowPreview, safe to remove in 1.7.0 public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow { emitAll(openSubscription()) } /** - * Creates a [broadcast] coroutine that collects the given flow. - * - * This transformation is **stateful**, it launches a [broadcast] coroutine - * that collects the given flow and thus resulting channel should be properly closed or cancelled. - * - * A channel with [default][Channel.Factory.BUFFERED] buffer size is created. - * Use [buffer] operator on the flow before calling `broadcastIn` to specify a value other than - * default and to control what happens when data is produced faster than it is consumed, - * that is to control backpressure behavior. - * * ### Deprecated * * **This API is deprecated.** The [BroadcastChannel] provides a complex channel-like API for hot flows. @@ -202,13 +192,26 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow { @Deprecated( message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel", replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"), - level = DeprecationLevel.WARNING -) + level = DeprecationLevel.ERROR +) // WARNING in 1.4.0, error in 1.5.0, removed in 1.6.0 (was @FlowPreview) public fun <T> Flow<T>.broadcastIn( scope: CoroutineScope, start: CoroutineStart = CoroutineStart.LAZY -): BroadcastChannel<T> = - asChannelFlow().broadcastImpl(scope, start) +): BroadcastChannel<T> { + // Backwards compatibility with operator fusing + val channelFlow = asChannelFlow() + val capacity = when (channelFlow.onBufferOverflow) { + BufferOverflow.SUSPEND -> channelFlow.produceCapacity + BufferOverflow.DROP_OLDEST -> Channel.CONFLATED + BufferOverflow.DROP_LATEST -> + throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST") + } + return scope.broadcast(channelFlow.context, capacity = capacity, start = start) { + collect { value -> + send(value) + } + } +} /** * Creates a [produce] coroutine that collects the given flow. diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index 57749523..6278081a 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -344,13 +344,13 @@ public fun <T> Flow<T>.concatWith(value: T): Flow<T> = noImpl() /** * Flow analogue of `concatWith` is [onCompletion]. - * Use `onCompletion { emitAll(other) }`. + * Use `onCompletion { if (it == null) emitAll(other) }`. * @suppress */ @Deprecated( level = DeprecationLevel.ERROR, - message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { emitAll(other) }'", - replaceWith = ReplaceWith("onCompletion { emitAll(other) }") + message = "Flow analogue of 'concatWith' is 'onCompletion'. Use 'onCompletion { if (it == null) emitAll(other) }'", + replaceWith = ReplaceWith("onCompletion { if (it == null) emitAll(other) }") ) public fun <T> Flow<T>.concatWith(other: Flow<T>): Flow<T> = noImpl() @@ -404,7 +404,7 @@ public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest( * @suppress */ @Deprecated( - level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0 + level = DeprecationLevel.ERROR, // since 1.3.0, error in 1.5.0 message = "Use 'onStart { delay(timeMillis) }'", replaceWith = ReplaceWith("onStart { delay(timeMillis) }") ) @@ -416,7 +416,7 @@ public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = onStart { delay(ti * @suppress */ @Deprecated( - level = DeprecationLevel.WARNING, // since 1.3.0, error in 1.4.0 + level = DeprecationLevel.ERROR, // since 1.3.0, error in 1.5.0 message = "Use 'onEach { delay(timeMillis) }'", replaceWith = ReplaceWith("onEach { delay(timeMillis) }") ) @@ -430,7 +430,7 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = onEach { delay(tim public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flatMapLatest(transform) @Deprecated( - level = DeprecationLevel.WARNING, // Since 1.3.8, was experimental when deprecated + level = DeprecationLevel.ERROR, // Warning since 1.3.8, was experimental when deprecated, ERROR since 1.5.0 message = "'scanReduce' was renamed to 'runningReduce' to be consistent with Kotlin standard library", replaceWith = ReplaceWith("runningReduce(operation)") ) diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 048cd8b3..9bcf088e 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -33,7 +33,7 @@ import kotlin.native.concurrent.* * * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go. * For example, the following class encapsulates an event bus that distributes events to all subscribers - * in a _rendezvous_ manner, suspending until all subscribers process each event: + * in a _rendezvous_ manner, suspending until all subscribers receive emitted event: * * ``` * class EventBus { @@ -68,6 +68,13 @@ import kotlin.native.concurrent.* * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other * than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend. * + * **Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept + * the new value.** In the absence of subscribers only the most recent `replay` values are stored and the buffer + * overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers emitter never + * suspends despite [BufferOverflow.SUSPEND] option and [BufferOverflow.DROP_LATEST] option does not have effect either. + * Essentially, the behavior in the absence of subscribers is always similar to [BufferOverflow.DROP_OLDEST], + * but the buffer is just of `replay` size (without any `extraBufferCapacity`). + * * ### Unbuffered shared flow * * A default implementation of a shared flow that is created with `MutableSharedFlow()` constructor function @@ -80,7 +87,7 @@ import kotlin.native.concurrent.* * ### SharedFlow vs BroadcastChannel * * Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel] - * and is designed to completely replace `BroadcastChannel` in the future. + * and is designed to completely replace it. * It has the following important differences: * * * `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows @@ -92,7 +99,7 @@ import kotlin.native.concurrent.* * * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)` * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay - * values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls + * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls * with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators. * * ### Concurrency @@ -221,9 +228,12 @@ public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> { * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero). * @param extraBufferCapacity the number of values buffered in addition to `replay`. * [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero). - * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to - * [suspending][BufferOverflow.SUSPEND] attempts to [emit][MutableSharedFlow.emit] a value, - * supported only when `replay > 0` or `extraBufferCapacity > 0`). + * @param onBufferOverflow configures an [emit][MutableSharedFlow.emit] action on buffer overflow. Optional, defaults to + * [suspending][BufferOverflow.SUSPEND] attempts to emit a value. + * Values other than [BufferOverflow.SUSPEND] are supported only when `replay > 0` or `extraBufferCapacity > 0`. + * **Buffer overflow can happen only when there is at least one subscriber that is not ready to accept + * the new value.** In the absence of subscribers only the most recent [replay] values are stored and + * the buffer overflow behavior is never triggered and has no effect. */ @Suppress("FunctionName", "UNCHECKED_CAST") public fun <T> MutableSharedFlow( diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt index ce568fb4..f4c6f2ee 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt @@ -140,7 +140,7 @@ public fun SharingStarted.Companion.WhileSubscribed( stopTimeout: Duration = Duration.ZERO, replayExpiration: Duration = Duration.INFINITE ): SharingStarted = - StartedWhileSubscribed(stopTimeout.toLongMilliseconds(), replayExpiration.toLongMilliseconds()) + StartedWhileSubscribed(stopTimeout.inWholeMilliseconds, replayExpiration.inWholeMilliseconds) // -------------------------------- implementation -------------------------------- diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index fc8aa02f..9e82e787 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -21,7 +21,7 @@ import kotlin.native.concurrent.* * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a state flow is called a _subscriber_. * * A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with - * the initial value. The value of mutable state flow can be updated by setting its [value] property. + * the initial value. The value of mutable state flow can be updated by setting its [value] property. * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates, * but always collects the most recently emitted value. * @@ -37,7 +37,7 @@ import kotlin.native.concurrent.* * val counter = _counter.asStateFlow() // publicly exposed as read-only state flow * * fun inc() { - * _counter.value++ + * _counter.update { count -> count + 1 } // atomic, safe for concurrent use * } * } * ``` @@ -88,7 +88,7 @@ import kotlin.native.concurrent.* * ### StateFlow vs ConflatedBroadcastChannel * * Conceptually, state flow is similar to [ConflatedBroadcastChannel] - * and is designed to completely replace `ConflatedBroadcastChannel` in the future. + * and is designed to completely replace it. * It has the following important differences: * * * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows @@ -107,7 +107,7 @@ import kotlin.native.concurrent.* * * To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()` * constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one. - * Replace [send][ConflatedBroadcastChannel.send] and [offer][ConflatedBroadcastChannel.offer] calls + * Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls * with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators. * You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value. * @@ -186,6 +186,56 @@ public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> { @Suppress("FunctionName") public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL) +// ------------------------------------ Update methods ------------------------------------ + +/** + * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns the new + * value. + * + * [function] may be evaluated multiple times, if [value] is being concurrently updated. + */ +public inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T { + while (true) { + val prevValue = value + val nextValue = function(prevValue) + if (compareAndSet(prevValue, nextValue)) { + return nextValue + } + } +} + +/** + * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns its + * prior value. + * + * [function] may be evaluated multiple times, if [value] is being concurrently updated. + */ +public inline fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T { + while (true) { + val prevValue = value + val nextValue = function(prevValue) + if (compareAndSet(prevValue, nextValue)) { + return prevValue + } + } +} + + +/** + * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value. + * + * [function] may be evaluated multiple times, if [value] is being concurrently updated. + */ +public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) { + while (true) { + val prevValue = value + val nextValue = function(prevValue) + if (compareAndSet(prevValue, nextValue)) { + return + } + } +} + // ------------------------------------ Implementation ------------------------------------ @SharedImmutable @@ -366,10 +416,7 @@ private class StateFlowImpl<T>( } internal fun MutableStateFlow<Int>.increment(delta: Int) { - while (true) { // CAS loop - val current = value - if (compareAndSet(current, current + delta)) return - } + update { it + delta } } internal fun <T> StateFlow<T>.fuseStateFlow( diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index bf82cf9a..0efe5f86 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -37,7 +37,7 @@ public interface FusibleFlow<T> : Flow<T> { /** * Operators that use channels as their "output" extend this `ChannelFlow` and are always fused with each other. * This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting - * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it + * methods like ability to [produceIn] the corresponding flow, thus making it * possible to directly use the backing channel if it exists (hence the `ChannelFlow` name). * * @suppress **This an internal API and should not be used from general code.** @@ -59,7 +59,7 @@ public abstract class ChannelFlow<T>( internal val collectToFun: suspend (ProducerScope<T>) -> Unit get() = { collectTo(it) } - private val produceCapacity: Int + internal val produceCapacity: Int get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity /** @@ -107,18 +107,6 @@ public abstract class ChannelFlow<T>( protected abstract suspend fun collectTo(scope: ProducerScope<T>) - // broadcastImpl is used in broadcastIn operator which is obsolete and replaced by SharedFlow. - // BroadcastChannel does not support onBufferOverflow beyond simple conflation - public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> { - val broadcastCapacity = when (onBufferOverflow) { - BufferOverflow.SUSPEND -> produceCapacity - BufferOverflow.DROP_OLDEST -> Channel.CONFLATED - BufferOverflow.DROP_LATEST -> - throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST") - } - return scope.broadcast(context, broadcastCapacity, start, block = collectToFun) - } - /** * Here we use ATOMIC start for a reason (#1825). * NB: [produceImpl] is used for [flowOn]. @@ -201,7 +189,7 @@ internal class ChannelFlowOperatorImpl<T>( override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) - override fun dropChannelOperators(): Flow<T>? = flow + override fun dropChannelOperators(): Flow<T> = flow override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector) diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 7fde0636..c924c090 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -54,7 +54,7 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal( ++currentEpoch // Start batch // The very first receive in epoch should be suspending - var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here + var element = resultChannel.receiveCatching().getOrNull() ?: break // Channel is closed, nothing to do here while (true) { val index = element.index // Update values @@ -65,7 +65,7 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal( // Received the second value from the same flow in the same epoch -- bail out if (lastReceivedEpoch[index] == currentEpoch) break lastReceivedEpoch[index] = currentEpoch - element = resultChannel.poll() ?: break + element = resultChannel.tryReceive().getOrNull() ?: break } // Process batch result if there is enough data @@ -129,7 +129,9 @@ internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: sus withContextUndispatched(coroutineContext + collectJob, Unit) { flow.collect { value -> withContextUndispatched(scopeContext, Unit, cnt) { - val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow) + val otherValue = second.receiveCatching().getOrElse { + throw it ?:AbortFlowException(this@unsafeFlow) + } emit(transform(value, NULL.unbox(otherValue))) } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index cbbb4196..04342ed0 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -79,7 +79,7 @@ import kotlin.jvm.* * * ### Operator fusion * - * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls, @@ -176,7 +176,7 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capaci * * ### Operator fusion * - * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are + * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are * always fused so that only one properly configured channel is used for execution. * **Conflation takes precedence over `buffer()` calls with any other capacity.** * @@ -219,7 +219,7 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED) * * ### Operator fusion * - * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of @@ -309,6 +309,8 @@ private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlo * 3) It defers the execution of declarative [builder] until the moment of [collection][Flow.collect] similarly * to `Observable.defer`. But it is unexpected because nothing in the name `flowWith` reflects this fact. * 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer. + * + * @suppress */ @FlowPreview @Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.ERROR) // Error in beta release, removal in 1.4 diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 6381c467..fed5962b 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -209,8 +209,7 @@ public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> = private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel - // Note: the actual type is Any, KT-30796 - val values = produce<Any?> { + val values = produce { collect { value -> send(value ?: NULL) } } // Now consume the values @@ -237,14 +236,15 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F lastValue = null // Consume the value } } - // Should be receiveOrClosed when boxing issues are fixed - values.onReceiveOrNull { value -> - if (value == null) { - if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) - lastValue = DONE - } else { - lastValue = value - } + values.onReceiveCatching { value -> + value + .onSuccess { lastValue = it } + .onFailure { + it?.let { throw it } + // If closed normally, emit the latest value + if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) + lastValue = DONE + } } } } @@ -278,21 +278,21 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> { require(periodMillis > 0) { "Sample period should be positive" } return scopedFlow { downstream -> - val values = produce<Any?>(capacity = Channel.CONFLATED) { - // Actually Any, KT-30796 + val values = produce(capacity = Channel.CONFLATED) { collect { value -> send(value ?: NULL) } } var lastValue: Any? = null val ticker = fixedPeriodTicker(periodMillis) while (lastValue !== DONE) { select<Unit> { - values.onReceiveOrNull { - if (it == null) { - ticker.cancel(ChildCancelledException()) - lastValue = DONE - } else { - lastValue = it - } + values.onReceiveCatching { result -> + result + .onSuccess { lastValue = it } + .onFailure { + it?.let { throw it } + ticker.cancel(ChildCancelledException()) + lastValue = DONE + } } // todo: shall be start sampling only when an element arrives or sample aways as here? diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index e0d3aebc..90879a97 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -194,7 +194,15 @@ public fun <T> Flow<T>.onEmpty( } } -private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> { +/* + * 'emitAll' methods call this to fail-fast before starting to collect + * their sources (that may not have any elements for a long time). + */ +internal fun FlowCollector<*>.ensureActive() { + if (this is ThrowingCollector) throw e +} + +internal class ThrowingCollector(@JvmField val e: Throwable) : FlowCollector<Any?> { override suspend fun emit(value: Any?) { throw e } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt index 2f7bc358..858c885c 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt @@ -2,7 +2,7 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("unused", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") +@file:Suppress("unused", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER", "UNUSED_PARAMETER") package kotlinx.coroutines.flow diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index 04275375..432160f3 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -57,7 +57,7 @@ public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R> * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. * * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected @@ -87,7 +87,7 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow { * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. */ @ExperimentalCoroutinesApi @@ -111,7 +111,7 @@ public fun <T> Iterable<Flow<T>>.merge(): Flow<T> { * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. */ @ExperimentalCoroutinesApi @@ -126,9 +126,12 @@ public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge( * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. * + * When [concurrency] is greater than 1, this operator is [buffered][buffer] by default + * and size of its output buffer can be changed by applying subsequent [buffer] operator. + * * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected * at the same time. By default it is equal to [DEFAULT_CONCURRENCY]. */ diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index fe1d7216..4fa74d8e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -68,7 +68,7 @@ import kotlin.jvm.* * ### Upstream completion and error handling * * **Normal completion of the upstream flow has no effect on subscribers**, and the sharing coroutine continues to run. If a - * a strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special + * strategy like [SharingStarted.WhileSubscribed] is used, then the upstream can get restarted again. If a special * action on upstream completion is needed, then an [onCompletion] operator can be used before the * `shareIn` operator to emit a special value in this case, like this: * @@ -144,8 +144,8 @@ public fun <T> Flow<T>.shareIn( onBufferOverflow = config.onBufferOverflow ) @Suppress("UNCHECKED_CAST") - scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T) - return shared.asSharedFlow() + val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T) + return ReadonlySharedFlow(shared, job) } private class SharingConfig<T>( @@ -197,7 +197,7 @@ private fun <T> CoroutineScope.launchSharing( shared: MutableSharedFlow<T>, started: SharingStarted, initialValue: T -) { +): Job = launch(context) { // the single coroutine to rule the sharing // Optimize common built-in started strategies when { @@ -230,7 +230,6 @@ private fun <T> CoroutineScope.launchSharing( } } } -} // -------------------------------- stateIn -------------------------------- @@ -303,8 +302,8 @@ public fun <T> Flow<T>.stateIn( ): StateFlow<T> { val config = configureSharing(1) val state = MutableStateFlow(initialValue) - scope.launchSharing(config.context, config.upstream, state, started, initialValue) - return state.asStateFlow() + val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue) + return ReadonlyStateFlow(state, job) } /** @@ -332,7 +331,7 @@ private fun <T> CoroutineScope.launchSharingDeferred( upstream.collect { value -> state?.let { it.value = value } ?: run { state = MutableStateFlow(value).also { - result.complete(it.asStateFlow()) + result.complete(ReadonlyStateFlow(it, coroutineContext.job)) } } } @@ -351,23 +350,27 @@ private fun <T> CoroutineScope.launchSharingDeferred( * Represents this mutable shared flow as a read-only shared flow. */ public fun <T> MutableSharedFlow<T>.asSharedFlow(): SharedFlow<T> = - ReadonlySharedFlow(this) + ReadonlySharedFlow(this, null) /** * Represents this mutable state flow as a read-only state flow. */ public fun <T> MutableStateFlow<T>.asStateFlow(): StateFlow<T> = - ReadonlyStateFlow(this) + ReadonlyStateFlow(this, null) private class ReadonlySharedFlow<T>( - flow: SharedFlow<T> + flow: SharedFlow<T>, + @Suppress("unused") + private val job: Job? // keeps a strong reference to the job (if present) ) : SharedFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> { override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseSharedFlow(context, capacity, onBufferOverflow) } private class ReadonlyStateFlow<T>( - flow: StateFlow<T> + flow: StateFlow<T>, + @Suppress("unused") + private val job: Job? // keeps a strong reference to the job (if present) ) : StateFlow<T> by flow, CancellableFlow<T>, FusibleFlow<T> { override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) = fuseStateFlow(context, capacity, onBufferOverflow) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index d163d9c0..59298864 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -15,7 +15,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlinx.coroutines.flow.unsafeTransform as transform /** - * Returns a flow containing only values of the original flow that matches the given [predicate]. + * Returns a flow containing only values of the original flow that match the given [predicate]. */ public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> if (predicate(value)) return@transform emit(value) @@ -82,9 +82,23 @@ public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform * flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList() * ``` * will produce `[], [1], [1, 2], [1, 2, 3]]`. + * + * This function is an alias to [runningFold] operator. + */ +@ExperimentalCoroutinesApi +public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation) + +/** + * Folds the given flow with [operation], emitting every intermediate result, including [initial] value. + * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors. + * For example: + * ``` + * flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList() + * ``` + * will produce `[], [1], [1, 2], [1, 2, 3]]`. */ @ExperimentalCoroutinesApi -public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow { +public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow { var accumulator: R = initial emit(accumulator) collect { value -> @@ -100,7 +114,7 @@ public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend * * For example: * ``` - * flowOf(1, 2, 3, 4).runningReduce { (v1, v2) -> v1 + v2 }.toList() + * flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList() * ``` * will produce `[1, 3, 6, 10]` */ diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index 42c66296..d26839f9 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -127,5 +127,7 @@ public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) * Collects all the values from the given [flow] and emits them to the collector. * It is a shorthand for `flow.collect { value -> emit(value) }`. */ -@BuilderInference -public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>): Unit = flow.collect(this) +public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) { + ensureActive() + flow.collect(this) +} diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index a937adcc..1794c9f4 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -144,3 +144,28 @@ public suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T } return result } + +/** + * The terminal operator that returns the last element emitted by the flow. + * + * Throws [NoSuchElementException] if the flow was empty. + */ +public suspend fun <T> Flow<T>.last(): T { + var result: Any? = NULL + collect { + result = it + } + if (result === NULL) throw NoSuchElementException("Expected at least one element") + return result as T +} + +/** + * The terminal operator that returns the last element emitted by the flow or `null` if the flow was empty. + */ +public suspend fun <T> Flow<T>.lastOrNull(): T? { + var result: T? = null + collect { + result = it + } + return result +} diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 0e765838..638ec432 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlinx.coroutines.* +import kotlin.jvm.* import kotlin.native.concurrent.SharedImmutable /** @@ -227,8 +228,8 @@ private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) } } -@Suppress("EXPERIMENTAL_FEATURE_WARNING") // We are using inline class only internally, so it is Ok -internal inline class SegmentOrClosed<S : Segment<S>>(private val value: Any?) { +@JvmInline +internal value class SegmentOrClosed<S : Segment<S>>(private val value: Any?) { val isClosed: Boolean get() = value === CLOSED @Suppress("UNCHECKED_CAST") val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 2874e7d5..c689a381 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -46,40 +46,49 @@ internal class DispatchedContinuation<in T>( * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable], * [CancellableContinuationImpl.getResult] will check for cancellation later. * - * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. - * AbstractChannel.receive method relies on the fact that the following pattern + * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation. + * In the `getResult`, we have the following code: * ``` - * suspendCancellableCoroutineReusable { cont -> - * val result = pollFastPath() - * if (result != null) cont.resume(result) + * if (trySuspend()) { + * // <- at this moment current continuation can be redispatched and claimed again. + * attachChildToParent() + * releaseClaimedContinuation() * } * ``` - * always succeeds. - * To make it always successful, we actually postpone "reusable" cancellation - * to this phase and set cancellation only at the moment of instantiation. */ private val _reusableCancellableContinuation = atomic<Any?>(null) - public val reusableCancellableContinuation: CancellableContinuationImpl<*>? + private val reusableCancellableContinuation: CancellableContinuationImpl<*>? get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*> - public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { + fun isReusable(): Boolean { /* + Invariant: caller.resumeMode.isReusableMode * Reusability control: - * `null` -> no reusability at all, false - * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true - * Else, if result is CCI === requester. - * Identity check my fail for the following pattern: - * ``` - * loop: - * suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle - * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise - * it will leak because it won't be freed by `releaseInterceptedContinuation` - * ``` + * `null` -> no reusability at all, `false` + * anything else -> reusable. */ - val value = _reusableCancellableContinuation.value ?: return false - if (value is CancellableContinuationImpl<*>) return value === requester - return true + return _reusableCancellableContinuation.value != null + } + + /** + * Awaits until previous call to `suspendCancellableCoroutineReusable` will + * stop mutating cached instance + */ + fun awaitReusability() { + _reusableCancellableContinuation.loop { + if (it !== REUSABLE_CLAIMED) return + } + } + + fun release() { + /* + * Called from `releaseInterceptedContinuation`, can be concurrent with + * the code in `getResult` right after `trySuspend` returned `true`, so we have + * to wait for a release here. + */ + awaitReusability() + reusableCancellableContinuation?.detachChild() } /** @@ -103,11 +112,20 @@ internal class DispatchedContinuation<in T>( _reusableCancellableContinuation.value = REUSABLE_CLAIMED return null } + // potentially competing with cancel state is CancellableContinuationImpl<*> -> { if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) { return state as CancellableContinuationImpl<T> } } + state === REUSABLE_CLAIMED -> { + // Do nothing, wait until reusable instance will be returned from + // getResult() of a previous `suspendCancellableCoroutineReusable` + } + state is Throwable -> { + // Also do nothing, Throwable can only indicate that the CC + // is in REUSABLE_CLAIMED state, but with postponed cancellation + } else -> error("Inconsistent state $state") } } @@ -127,14 +145,13 @@ internal class DispatchedContinuation<in T>( * * See [CancellableContinuationImpl.getResult]. */ - fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? { + fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? { _reusableCancellableContinuation.loop { state -> // not when(state) to avoid Intrinsics.equals call when { state === REUSABLE_CLAIMED -> { if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null } - state === null -> return null state is Throwable -> { require(_reusableCancellableContinuation.compareAndSet(state, null)) return state diff --git a/kotlinx-coroutines-core/common/src/internal/InlineList.kt b/kotlinx-coroutines-core/common/src/internal/InlineList.kt index 34c1e893..61bf6d01 100644 --- a/kotlinx-coroutines-core/common/src/internal/InlineList.kt +++ b/kotlinx-coroutines-core/common/src/internal/InlineList.kt @@ -7,14 +7,16 @@ package kotlinx.coroutines.internal import kotlinx.coroutines.assert +import kotlin.jvm.* /* * Inline class that represents a mutable list, but does not allocate an underlying storage * for zero and one elements. * Cannot be parametrized with `List<*>`. */ -internal inline class InlineList<E>(private val holder: Any? = null) { - public operator fun plus(element: E): InlineList<E> { +@JvmInline +internal value class InlineList<E>(private val holder: Any? = null) { + operator fun plus(element: E): InlineList<E> { assert { element !is List<*> } // Lists are prohibited return when (holder) { null -> InlineList(element) @@ -31,7 +33,7 @@ internal inline class InlineList<E>(private val holder: Any? = null) { } } - public inline fun forEachReversed(action: (E) -> Unit) { + inline fun forEachReversed(action: (E) -> Unit) { when (holder) { null -> return !is ArrayList<*> -> action(holder as E) diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 98db37de..ad8d86ed 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -15,12 +15,13 @@ import kotlin.jvm.* internal open class ScopeCoroutine<in T>( context: CoroutineContext, @JvmField val uCont: Continuation<T> // unintercepted continuation -) : AbstractCoroutine<T>(context, true), CoroutineStackFrame { +) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame { + final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame final override fun getStackTraceElement(): StackTraceElement? = null - final override val isScopedCoroutine: Boolean get() = true - internal val parent: Job? get() = parentContext[Job] + final override val isScopedCoroutine: Boolean get() = true + internal val parent: Job? get() = parentHandle?.parent override fun afterCompletion(state: Any?) { // Resume in a cancellable way by default when resuming from another context diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt index 173f0afb..f5b96a8d 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt @@ -49,6 +49,19 @@ private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) { try { block() } catch (e: Throwable) { - completion.resumeWith(Result.failure(e)) + dispatcherFailure(completion, e) } } + +private fun dispatcherFailure(completion: Continuation<*>, e: Throwable) { + /* + * This method is invoked when we failed to start a coroutine due to the throwing + * dispatcher implementation or missing Dispatchers.Main. + * This situation is not recoverable, so we are trying to deliver the exception by all means: + * 1) Resume the coroutine with an exception, so it won't prevent its parent from completion + * 2) Rethrow the exception immediately, so it will crash the caller (e.g. when the coroutine had + * no parent or it was async/produce over MainScope). + */ + completion.resumeWith(Result.failure(e)) + throw e +} diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 1273634e..38e870ef 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -82,11 +82,9 @@ private inline fun <T> startDirect(completion: Continuation<T>, block: (Continua * This function shall be invoked at most once on this coroutine. * This function checks cancellation of the outer [Job] on fast-path. * - * First, this function initializes the parent job from the `parentContext` of this coroutine that was passed to it - * during construction. Second, it starts the coroutine using [startCoroutineUninterceptedOrReturn]. + * It starts the coroutine using [startCoroutineUninterceptedOrReturn]. */ internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? { - initParentJob() return undispatchedResult({ true }) { block.startCoroutineUninterceptedOrReturn(receiver, this) } @@ -96,8 +94,8 @@ internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, blo * Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path. */ internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturnIgnoreTimeout( - receiver: R, block: suspend R.() -> T): Any? { - initParentJob() + receiver: R, block: suspend R.() -> T +): Any? { return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) { block.startCoroutineUninterceptedOrReturn(receiver, this) } diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 0d974007..a7172707 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -177,15 +177,17 @@ public interface SelectInstance<in R> { * corresponding non-suspending version that can be used with a regular `when` expression to select one * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected. * - * | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version** - * | ---------------- | --------------------------------------------- | ------------------------------------------------ | -------------------------- - * | [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted] - * | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] - * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] | [offer][SendChannel.offer] - * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] | [poll][ReceiveChannel.poll] - * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]| [poll][ReceiveChannel.poll] - * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] | [tryLock][Mutex.tryLock] - * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none + * ### List of supported select methods + * + * | **Receiver** | **Suspending function** | **Select clause** + * | ---------------- | --------------------------------------------- | ----------------------------------------------------- + * | [Job] | [join][Job.join] | [onJoin][Job.onJoin] + * | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] + * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] + * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] + * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching] + * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] + * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with [CancellationException]. diff --git a/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt b/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt index ce20837e..ebe88ce1 100644 --- a/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt +++ b/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -13,7 +13,7 @@ class AbstractCoroutineTest : TestBase() { fun testNotifications() = runTest { expect(1) val coroutineContext = coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) { + val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) { override fun onStart() { expect(3) } @@ -53,7 +53,7 @@ class AbstractCoroutineTest : TestBase() { fun testNotificationsWithException() = runTest { expect(1) val coroutineContext = coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, false) { + val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, true, false) { override fun onStart() { expect(3) } @@ -91,4 +91,4 @@ class AbstractCoroutineTest : TestBase() { coroutine.resumeWithException(TestException2()) finish(10) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt b/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt new file mode 100644 index 00000000..749bbfc9 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt @@ -0,0 +1,85 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlin.test.* + +class CancelledParentAttachTest : TestBase() { + + @Test + fun testAsync() = CoroutineStart.values().forEach(::testAsyncCancelledParent) + + private fun testAsyncCancelledParent(start: CoroutineStart) = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val d = async<Int>(start = start) { 42 } + expect(2) + d.invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testLaunch() = CoroutineStart.values().forEach(::testLaunchCancelledParent) + + private fun testLaunchCancelledParent(start: CoroutineStart) = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val d = launch(start = start) { } + expect(2) + d.invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testProduce() = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val d = produce<Int> { } + expect(2) + (d as Job).invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testBroadcast() = CoroutineStart.values().forEach(::testBroadcastCancelledParent) + + private fun testBroadcastCancelledParent(start: CoroutineStart) = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val bc = broadcast<Int>(start = start) {} + expect(2) + (bc as Job).invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testScopes() { + testScope { coroutineScope { } } + testScope { supervisorScope { } } + testScope { flowScope { } } + testScope { withTimeout(Long.MAX_VALUE) { } } + testScope { withContext(Job()) { } } + testScope { withContext(CoroutineName("")) { } } + } + + private inline fun testScope(crossinline block: suspend () -> Unit) = runTest({ it is CancellationException }) { + cancel() + block() + } +} diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt index a7084296..2d71cc94 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -46,7 +46,7 @@ class ArrayBroadcastChannelTest : TestBase() { assertEquals(2, first.receive()) // suspends assertFalse(first.isClosedForReceive) expect(10) - assertNull(first.receiveOrNull()) // suspends + assertTrue(first.receiveCatching().isClosed) // suspends assertTrue(first.isClosedForReceive) expect(14) } @@ -62,7 +62,7 @@ class ArrayBroadcastChannelTest : TestBase() { assertEquals(2, second.receive()) // suspends assertFalse(second.isClosedForReceive) expect(11) - assertNull(second.receiveOrNull()) // suspends + assertNull(second.receiveCatching().getOrNull()) // suspends assertTrue(second.isClosedForReceive) expect(15) } @@ -116,9 +116,9 @@ class ArrayBroadcastChannelTest : TestBase() { expect(6) assertFalse(sub.isClosedForReceive) for (x in 1..3) - assertEquals(x, sub.receiveOrNull()) + assertEquals(x, sub.receiveCatching().getOrNull()) // and receive close signal - assertNull(sub.receiveOrNull()) + assertNull(sub.receiveCatching().getOrNull()) assertTrue(sub.isClosedForReceive) finish(7) } @@ -153,7 +153,7 @@ class ArrayBroadcastChannelTest : TestBase() { // make sure all of them are consumed check(!sub.isClosedForReceive) for (x in 1..5) check(sub.receive() == x) - check(sub.receiveOrNull() == null) + check(sub.receiveCatching().getOrNull() == null) check(sub.isClosedForReceive) } @@ -196,7 +196,7 @@ class ArrayBroadcastChannelTest : TestBase() { val channel = BroadcastChannel<Int>(1) val subscription = channel.openSubscription() subscription.cancel(TestCancellationException()) - subscription.receiveOrNull() + subscription.receive() } @Test @@ -208,6 +208,6 @@ class ArrayBroadcastChannelTest : TestBase() { channel.cancel() assertTrue(channel.isClosedForSend) assertTrue(sub.isClosedForReceive) - check(sub.receiveOrNull() == null) + check(sub.receiveCatching().getOrNull() == null) } } diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index a57b519f..632fd292 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -38,17 +38,17 @@ class ArrayChannelTest : TestBase() { } @Test - fun testClosedBufferedReceiveOrNull() = runTest { + fun testClosedBufferedReceiveCatching() = runTest { val q = Channel<Int>(1) check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive) expect(1) launch { expect(5) check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive) - assertEquals(42, q.receiveOrNull()) + assertEquals(42, q.receiveCatching().getOrNull()) expect(6) check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) expect(7) } expect(2) @@ -86,31 +86,31 @@ class ArrayChannelTest : TestBase() { } @Test - fun testOfferAndPoll() = runTest { + fun testTryOp() = runTest { val q = Channel<Int>(1) - assertTrue(q.offer(1)) + assertTrue(q.trySend(1).isSuccess) expect(1) launch { expect(3) - assertEquals(1, q.poll()) + assertEquals(1, q.tryReceive().getOrNull()) expect(4) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) expect(5) assertEquals(2, q.receive()) // suspends expect(9) - assertEquals(3, q.poll()) + assertEquals(3, q.tryReceive().getOrNull()) expect(10) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) expect(11) } expect(2) yield() expect(6) - assertTrue(q.offer(2)) + assertTrue(q.trySend(2).isSuccess) expect(7) - assertTrue(q.offer(3)) + assertTrue(q.trySend(3).isSuccess) expect(8) - assertFalse(q.offer(4)) + assertFalse(q.trySend(4).isSuccess) yield() finish(12) } @@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith<CancellationException> { q.receiveOrNull() } + assertFailsWith<CancellationException> { q.receiveCatching().getOrThrow() } finish(12) } @@ -142,7 +142,7 @@ class ArrayChannelTest : TestBase() { fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = Channel<Int>(5) channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receive() } @Test @@ -157,10 +157,10 @@ class ArrayChannelTest : TestBase() { val capacity = 42 val channel = Channel<Int>(capacity) repeat(4) { - channel.offer(-1) + channel.trySend(-1) } repeat(4) { - channel.receiveOrNull() + channel.receiveCatching().getOrNull() } checkBufferChannel(channel, capacity) } diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index 91d941b3..4538f6c6 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -15,28 +15,23 @@ class BasicOperationsTest : TestBase() { } @Test - fun testOfferAfterClose() = runTest { - TestChannelKind.values().forEach { kind -> testOffer(kind) } + fun testTrySendToFullChannel() = runTest { + TestChannelKind.values().forEach { kind -> testTrySendToFullChannel(kind) } } @Test - fun testSendAfterClose() = runTest { - TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) } - } - - @Test - fun testReceiveOrNullAfterClose() = runTest { - TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) } + fun testTrySendAfterClose() = runTest { + TestChannelKind.values().forEach { kind -> testTrySend(kind) } } @Test - fun testReceiveOrNullAfterCloseWithException() = runTest { - TestChannelKind.values().forEach { kind -> testReceiveOrNullException(kind) } + fun testSendAfterClose() = runTest { + TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) } } @Test - fun testReceiveOrClosed() = runTest { - TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) } + fun testReceiveCatching() = runTest { + TestChannelKind.values().forEach { kind -> testReceiveCatching(kind) } } @Test @@ -49,7 +44,7 @@ class BasicOperationsTest : TestBase() { } } expect(1) - channel.offer(42) + channel.trySend(42) expect(2) channel.close(AssertionError()) finish(4) @@ -90,47 +85,8 @@ class BasicOperationsTest : TestBase() { } } - private suspend fun testReceiveOrNull(kind: TestChannelKind) = coroutineScope { - val channel = kind.create<Int>() - val d = async(NonCancellable) { - channel.receive() - } - - yield() - channel.close() - assertTrue(channel.isClosedForReceive) - - assertNull(channel.receiveOrNull()) - assertNull(channel.poll()) - - d.join() - assertTrue(d.getCancellationException().cause is ClosedReceiveChannelException) - } - - private suspend fun testReceiveOrNullException(kind: TestChannelKind) = coroutineScope { - val channel = kind.create<Int>() - val d = async(NonCancellable) { - channel.receive() - } - - yield() - channel.close(TestException()) - assertTrue(channel.isClosedForReceive) - - assertFailsWith<TestException> { channel.poll() } - try { - channel.receiveOrNull() - fail() - } catch (e: TestException) { - // Expected - } - - d.join() - assertTrue(d.getCancellationException().cause is TestException) - } - @Suppress("ReplaceAssertBooleanWithAssertEquality") - private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope { + private suspend fun testReceiveCatching(kind: TestChannelKind) = coroutineScope { reset() val channel = kind.create<Int>() launch { @@ -139,44 +95,58 @@ class BasicOperationsTest : TestBase() { } expect(1) - val result = channel.receiveOrClosed() - assertEquals(1, result.value) - assertEquals(1, result.valueOrNull) - assertTrue(ValueOrClosed.value(1) == result) + val result = channel.receiveCatching() + assertEquals(1, result.getOrThrow()) + assertEquals(1, result.getOrNull()) + assertTrue(ChannelResult.success(1) == result) expect(3) launch { expect(4) channel.close() } - val closed = channel.receiveOrClosed() + val closed = channel.receiveCatching() expect(5) - assertNull(closed.valueOrNull) + assertNull(closed.getOrNull()) assertTrue(closed.isClosed) - assertNull(closed.closeCause) - assertTrue(ValueOrClosed.closed<Int>(closed.closeCause) == closed) + assertNull(closed.exceptionOrNull()) + assertTrue(ChannelResult.closed<Int>(closed.exceptionOrNull()) == closed) finish(6) } - private suspend fun testOffer(kind: TestChannelKind) = coroutineScope { + private suspend fun testTrySend(kind: TestChannelKind) = coroutineScope { val channel = kind.create<Int>() val d = async { channel.send(42) } yield() channel.close() assertTrue(channel.isClosedForSend) - try { - channel.offer(2) - fail() - } catch (e: ClosedSendChannelException) { - if (!kind.isConflated) { - assertEquals(42, channel.receive()) + channel.trySend(2) + .onSuccess { expectUnreached() } + .onClosed { + assertTrue { it is ClosedSendChannelException} + if (!kind.isConflated) { + assertEquals(42, channel.receive()) + } } - } - d.await() } + private suspend fun testTrySendToFullChannel(kind: TestChannelKind) = coroutineScope { + if (kind.isConflated || kind.capacity == Int.MAX_VALUE) return@coroutineScope + val channel = kind.create<Int>() + // Make it full + repeat(11) { + channel.trySend(42) + } + channel.trySend(1) + .onSuccess { expectUnreached() } + .onFailure { assertNull(it) } + .onClosed { + expectUnreached() + } + } + /** * [ClosedSendChannelException] should not be eaten. * See [https://github.com/Kotlin/kotlinx.coroutines/issues/957] diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt index 41f60479..0b9a0fdb 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt @@ -11,30 +11,30 @@ class ChannelBufferOverflowTest : TestBase() { @Test fun testDropLatest() = runTest { val c = Channel<Int>(2, BufferOverflow.DROP_LATEST) - assertTrue(c.offer(1)) - assertTrue(c.offer(2)) - assertTrue(c.offer(3)) // overflows, dropped + assertTrue(c.trySend(1).isSuccess) + assertTrue(c.trySend(2).isSuccess) + assertTrue(c.trySend(3).isSuccess) // overflows, dropped c.send(4) // overflows dropped assertEquals(1, c.receive()) - assertTrue(c.offer(5)) - assertTrue(c.offer(6)) // overflows, dropped + assertTrue(c.trySend(5).isSuccess) + assertTrue(c.trySend(6).isSuccess) // overflows, dropped assertEquals(2, c.receive()) assertEquals(5, c.receive()) - assertEquals(null, c.poll()) + assertEquals(null, c.tryReceive().getOrNull()) } @Test fun testDropOldest() = runTest { val c = Channel<Int>(2, BufferOverflow.DROP_OLDEST) - assertTrue(c.offer(1)) - assertTrue(c.offer(2)) - assertTrue(c.offer(3)) // overflows, keeps 2, 3 + assertTrue(c.trySend(1).isSuccess) + assertTrue(c.trySend(2).isSuccess) + assertTrue(c.trySend(3).isSuccess) // overflows, keeps 2, 3 c.send(4) // overflows, keeps 3, 4 assertEquals(3, c.receive()) - assertTrue(c.offer(5)) - assertTrue(c.offer(6)) // overflows, keeps 5, 6 + assertTrue(c.trySend(5).isSuccess) + assertTrue(c.trySend(6).isSuccess) // overflows, keeps 5, 6 assertEquals(5, c.receive()) assertEquals(6, c.receive()) - assertEquals(null, c.poll()) + assertEquals(null, c.tryReceive().getOrNull()) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt new file mode 100644 index 00000000..2341c62e --- /dev/null +++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt @@ -0,0 +1,146 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import kotlin.test.* + +class ChannelReceiveCatchingTest : TestBase() { + @Test + fun testChannelOfThrowables() = runTest { + val channel = Channel<Throwable>() + launch { + channel.send(TestException1()) + channel.close(TestException2()) + } + + val element = channel.receiveCatching() + assertTrue(element.getOrThrow() is TestException1) + assertTrue(element.getOrNull() is TestException1) + + val closed = channel.receiveCatching() + assertTrue(closed.isClosed) + assertTrue(closed.isFailure) + assertTrue(closed.exceptionOrNull() is TestException2) + } + + @Test + @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test + fun testNullableIntChanel() = runTest { + val channel = Channel<Int?>() + launch { + expect(2) + channel.send(1) + expect(3) + channel.send(null) + + expect(6) + channel.close() + } + + expect(1) + val element = channel.receiveCatching() + assertEquals(1, element.getOrThrow()) + assertEquals(1, element.getOrNull()) + assertEquals("Value(1)", element.toString()) + assertTrue(ChannelResult.success(1) == element) // Don't box + assertFalse(element.isFailure) + assertFalse(element.isClosed) + + expect(4) + val nullElement = channel.receiveCatching() + assertNull(nullElement.getOrThrow()) + assertNull(nullElement.getOrNull()) + assertEquals("Value(null)", nullElement.toString()) + assertTrue(ChannelResult.success(null) == nullElement) // Don't box + assertFalse(element.isFailure) + assertFalse(element.isClosed) + + expect(5) + val closed = channel.receiveCatching() + assertTrue(closed.isClosed) + assertTrue(closed.isFailure) + + val closed2 = channel.receiveCatching() + assertTrue(closed2.isClosed) + assertTrue(closed.isFailure) + assertNull(closed2.exceptionOrNull()) + finish(7) + } + + @Test + @ExperimentalUnsignedTypes + fun testUIntChannel() = runTest { + val channel = Channel<UInt>() + launch { + expect(2) + channel.send(1u) + yield() + expect(4) + channel.send((Long.MAX_VALUE - 1).toUInt()) + expect(5) + } + + expect(1) + val element = channel.receiveCatching() + assertEquals(1u, element.getOrThrow()) + + expect(3) + val element2 = channel.receiveCatching() + assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.getOrThrow()) + finish(6) + } + + @Test + fun testCancelChannel() = runTest { + val channel = Channel<Boolean>() + launch { + expect(2) + channel.cancel() + } + + expect(1) + val closed = channel.receiveCatching() + assertTrue(closed.isClosed) + assertTrue(closed.isFailure) + finish(3) + } + + @Test + @ExperimentalUnsignedTypes + fun testReceiveResultChannel() = runTest { + val channel = Channel<ChannelResult<UInt>>() + launch { + channel.send(ChannelResult.success(1u)) + channel.send(ChannelResult.closed(TestException1())) + channel.close(TestException2()) + } + + val intResult = channel.receiveCatching() + assertEquals(1u, intResult.getOrThrow().getOrThrow()) + assertFalse(intResult.isFailure) + assertFalse(intResult.isClosed) + + val closeCauseResult = channel.receiveCatching() + assertTrue(closeCauseResult.getOrThrow().exceptionOrNull() is TestException1) + + val closeCause = channel.receiveCatching() + assertTrue(closeCause.isClosed) + assertTrue(closeCause.isFailure) + assertTrue(closeCause.exceptionOrNull() is TestException2) + } + + @Test + fun testToString() = runTest { + val channel = Channel<String>(1) + channel.send("message") + channel.close(TestException1("OK")) + assertEquals("Value(message)", channel.receiveCatching().toString()) + // toString implementation for exception differs on every platform + val str = channel.receiveCatching().toString() + if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex())) + error("Unexpected string: '$str'") + } +} diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt deleted file mode 100644 index e58b0dee..00000000 --- a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.channels - -import kotlinx.coroutines.* -import kotlin.test.* - -class ChannelReceiveOrClosedTest : TestBase() { - @Test - fun testChannelOfThrowables() = runTest { - val channel = Channel<Throwable>() - launch { - channel.send(TestException1()) - channel.close(TestException2()) - } - - val element = channel.receiveOrClosed() - assertTrue(element.value is TestException1) - assertTrue(element.valueOrNull is TestException1) - - val closed = channel.receiveOrClosed() - assertTrue(closed.isClosed) - assertTrue(closed.closeCause is TestException2) - } - - @Test - @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test - fun testNullableIntChanel() = runTest { - val channel = Channel<Int?>() - launch { - expect(2) - channel.send(1) - expect(3) - channel.send(null) - - expect(6) - channel.close() - } - - expect(1) - val element = channel.receiveOrClosed() - assertEquals(1, element.value) - assertEquals(1, element.valueOrNull) - assertEquals("Value(1)", element.toString()) - assertTrue(ValueOrClosed.value(1) == element) // Don't box - - expect(4) - val nullElement = channel.receiveOrClosed() - assertNull(nullElement.value) - assertNull(nullElement.valueOrNull) - assertEquals("Value(null)", nullElement.toString()) - assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box - - expect(5) - val closed = channel.receiveOrClosed() - assertTrue(closed.isClosed) - - val closed2 = channel.receiveOrClosed() - assertTrue(closed2.isClosed) - assertNull(closed2.closeCause) - finish(7) - } - - @Test - @ExperimentalUnsignedTypes - fun testUIntChannel() = runTest { - val channel = Channel<UInt>() - launch { - expect(2) - channel.send(1u) - yield() - expect(4) - channel.send((Long.MAX_VALUE - 1).toUInt()) - expect(5) - } - - expect(1) - val element = channel.receiveOrClosed() - assertEquals(1u, element.value) - - expect(3) - val element2 = channel.receiveOrClosed() - assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value) - finish(6) - } - - @Test - fun testCancelChannel() = runTest { - val channel = Channel<Boolean>() - launch { - expect(2) - channel.cancel() - } - - expect(1) - val closed = channel.receiveOrClosed() - assertTrue(closed.isClosed) - finish(3) - } - - @Test - @ExperimentalUnsignedTypes - fun testReceiveResultChannel() = runTest { - val channel = Channel<ValueOrClosed<UInt>>() - launch { - channel.send(ValueOrClosed.value(1u)) - channel.send(ValueOrClosed.closed(TestException1())) - channel.close(TestException2()) - } - - val intResult = channel.receiveOrClosed() - assertEquals(1u, intResult.value.value) - - val closeCauseResult = channel.receiveOrClosed() - assertTrue(closeCauseResult.value.closeCause is TestException1) - - val closeCause = channel.receiveOrClosed() - assertTrue(closeCause.isClosed) - assertTrue(closeCause.closeCause is TestException2) - } - - @Test - fun testToString() = runTest { - val channel = Channel<String>(1) - channel.send("message") - channel.close(TestException1("OK")) - assertEquals("Value(message)", channel.receiveOrClosed().toString()) - // toString implementation for exception differs on every platform - val str = channel.receiveOrClosed().toString() - if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex())) - error("Unexpected string: '$str'") - } -} diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt index d2ef3d26..ae05fb8d 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -70,36 +70,10 @@ class ChannelUndeliveredElementFailureTest : TestBase() { } @Test - fun testReceiveOrNullCancelledFail() = runTest(unhandled = shouldBeUnhandled) { + fun testReceiveCatchingCancelledFail() = runTest(unhandled = shouldBeUnhandled) { val channel = Channel(onUndeliveredElement = onCancelFail) val job = launch(start = CoroutineStart.UNDISPATCHED) { - channel.receiveOrNull() - expectUnreached() // will be cancelled before it dispatches - } - channel.send(item) - job.cancel() - } - - @Test - fun testReceiveOrNullSelectCancelledFail() = runTest(unhandled = shouldBeUnhandled) { - val channel = Channel(onUndeliveredElement = onCancelFail) - val job = launch(start = CoroutineStart.UNDISPATCHED) { - select<Unit> { - channel.onReceiveOrNull { - expectUnreached() - } - } - expectUnreached() // will be cancelled before it dispatches - } - channel.send(item) - job.cancel() - } - - @Test - fun testReceiveOrClosedCancelledFail() = runTest(unhandled = shouldBeUnhandled) { - val channel = Channel(onUndeliveredElement = onCancelFail) - val job = launch(start = CoroutineStart.UNDISPATCHED) { - channel.receiveOrClosed() + channel.receiveCatching() expectUnreached() // will be cancelled before it dispatches } channel.send(item) @@ -111,7 +85,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() { val channel = Channel(onUndeliveredElement = onCancelFail) val job = launch(start = CoroutineStart.UNDISPATCHED) { select<Unit> { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expectUnreached() } } @@ -140,4 +114,4 @@ class ChannelUndeliveredElementFailureTest : TestBase() { expectUnreached() } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt index 7dd232f2..a8c2a29c 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -42,7 +42,7 @@ class ConflatedBroadcastChannelTest : TestBase() { launch(start = CoroutineStart.UNDISPATCHED) { expect(2) val sub = broadcast.openSubscription() - assertNull(sub.poll()) + assertNull(sub.tryReceive().getOrNull()) expect(3) assertEquals("one", sub.receive()) // suspends expect(6) @@ -68,7 +68,7 @@ class ConflatedBroadcastChannelTest : TestBase() { expect(14) assertEquals("three", sub.receive()) // suspends expect(17) - assertNull(sub.receiveOrNull()) // suspends until closed + assertNull(sub.receiveCatching().getOrNull()) // suspends until closed expect(20) sub.cancel() expect(21) diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt index 18f28438..370fd5b9 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -12,14 +12,14 @@ open class ConflatedChannelTest : TestBase() { Channel<T>(Channel.CONFLATED) @Test - fun testBasicConflationOfferPoll() { + fun testBasicConflationOfferTryReceive() { val q = createConflatedChannel<Int>() - assertNull(q.poll()) - assertTrue(q.offer(1)) - assertTrue(q.offer(2)) - assertTrue(q.offer(3)) - assertEquals(3, q.poll()) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) + assertTrue(q.trySend(1).isSuccess) + assertTrue(q.trySend(2).isSuccess) + assertTrue(q.trySend(3).isSuccess) + assertEquals(3, q.tryReceive().getOrNull()) + assertNull(q.tryReceive().getOrNull()) } @Test @@ -27,7 +27,7 @@ open class ConflatedChannelTest : TestBase() { val q = createConflatedChannel<Int>() q.send(1) q.send(2) // shall conflated previously sent - assertEquals(2, q.receiveOrNull()) + assertEquals(2, q.receiveCatching().getOrNull()) } @Test @@ -41,7 +41,7 @@ open class ConflatedChannelTest : TestBase() { // not it is closed for receive, too assertTrue(q.isClosedForSend) assertTrue(q.isClosedForReceive) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) } @Test @@ -82,7 +82,7 @@ open class ConflatedChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith<CancellationException> { q.receiveOrNull() } + assertFailsWith<CancellationException> { q.receiveCatching().getOrThrow() } finish(2) } @@ -90,6 +90,6 @@ open class ConflatedChannelTest : TestBase() { fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = createConflatedChannel<Int>() channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receive() } } diff --git a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt index 4233a350..501affb4 100644 --- a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -12,14 +12,14 @@ class LinkedListChannelTest : TestBase() { fun testBasic() = runTest { val c = Channel<Int>(Channel.UNLIMITED) c.send(1) - check(c.offer(2)) + assertTrue(c.trySend(2).isSuccess) c.send(3) check(c.close()) check(!c.close()) assertEquals(1, c.receive()) - assertEquals(2, c.poll()) - assertEquals(3, c.receiveOrNull()) - assertNull(c.receiveOrNull()) + assertEquals(2, c.tryReceive().getOrNull()) + assertEquals(3, c.receiveCatching().getOrNull()) + assertNull(c.receiveCatching().getOrNull()) } @Test @@ -31,13 +31,13 @@ class LinkedListChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith<CancellationException> { q.receiveOrNull() } + assertFailsWith<CancellationException> { q.receive() } } @Test fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = Channel<Int>(Channel.UNLIMITED) channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receive() } } diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 194504e7..61ef0726 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -24,7 +24,7 @@ class ProduceTest : TestBase() { expect(4) check(c.receive() == 2) expect(5) - check(c.receiveOrNull() == null) + assertNull(c.receiveCatching().getOrNull()) finish(7) } @@ -49,7 +49,7 @@ class ProduceTest : TestBase() { expect(4) c.cancel() expect(5) - assertFailsWith<CancellationException> { c.receiveOrNull() } + assertFailsWith<CancellationException> { c.receiveCatching().getOrThrow() } expect(6) yield() // to produce finish(8) @@ -76,7 +76,7 @@ class ProduceTest : TestBase() { expect(4) c.cancel(TestCancellationException()) try { - assertNull(c.receiveOrNull()) + c.receive() expectUnreached() } catch (e: TestCancellationException) { expect(5) diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt index 4d20d715..c83813e4 100644 --- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -36,15 +36,15 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testClosedReceiveOrNull() = runTest { + fun testClosedReceiveCatching() = runTest { val q = Channel<Int>(Channel.RENDEZVOUS) check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive) expect(1) launch { expect(3) - assertEquals(42, q.receiveOrNull()) + assertEquals(42, q.receiveCatching().getOrNull()) expect(4) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) expect(6) } expect(2) @@ -80,26 +80,26 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testOfferAndPool() = runTest { + fun testTrySendTryReceive() = runTest { val q = Channel<Int>(Channel.RENDEZVOUS) - assertFalse(q.offer(1)) + assertFalse(q.trySend(1).isSuccess) expect(1) launch { expect(3) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) expect(4) assertEquals(2, q.receive()) expect(7) - assertNull(q.poll()) + assertNull(q.tryReceive().getOrNull()) yield() expect(9) - assertEquals(3, q.poll()) + assertEquals(3, q.tryReceive().getOrNull()) expect(10) } expect(2) yield() expect(5) - assertTrue(q.offer(2)) + assertTrue(q.trySend(2).isSuccess) expect(6) yield() expect(8) @@ -233,9 +233,9 @@ class RendezvousChannelTest : TestBase() { expect(7) yield() // try to resume sender (it will not resume despite the close!) expect(8) - assertEquals(42, q.receiveOrNull()) + assertEquals(42, q.receiveCatching().getOrNull()) expect(9) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) expect(10) yield() // to sender, it was resumed! finish(12) @@ -266,7 +266,7 @@ class RendezvousChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith<CancellationException> { q.receiveOrNull() } + assertFailsWith<CancellationException> { q.receiveCatching().getOrThrow() } finish(12) } @@ -274,6 +274,6 @@ class RendezvousChannelTest : TestBase() { fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = Channel<Int>(Channel.RENDEZVOUS) channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receiveCatching().getOrThrow() } } diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index 993be78e..f234e141 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -42,11 +42,10 @@ private class ChannelViaBroadcast<E>( override val isEmpty: Boolean get() = sub.isEmpty override suspend fun receive(): E = sub.receive() - override suspend fun receiveOrNull(): E? = sub.receiveOrNull() - override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed() - override fun poll(): E? = sub.poll() + override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching() override fun iterator(): ChannelIterator<E> = sub.iterator() - + override fun tryReceive(): ChannelResult<E> = sub.tryReceive() + override fun cancel(cause: CancellationException?) = sub.cancel(cause) // implementing hidden method anyway, so can cast to an internal class @@ -55,8 +54,6 @@ private class ChannelViaBroadcast<E>( override val onReceive: SelectClause1<E> get() = sub.onReceive - override val onReceiveOrNull: SelectClause1<E?> - get() = sub.onReceiveOrNull - override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>> - get() = sub.onReceiveOrClosed + override val onReceiveCatching: SelectClause1<ChannelResult<E>> + get() = sub.onReceiveCatching } diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index ddb1d88a..b2d957be 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -26,7 +26,7 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine ?: error("Event loop is missing, virtual time source works only as part of event loop") if (delayNanos <= 0) continue if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos") - val nextTask = heap.minBy { it.deadline } ?: return@launch + val nextTask = heap.minByOrNull { it.deadline } ?: return@launch heap.remove(nextTask) currentTime = nextTask.deadline nextTask.run() diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt index f93d0399..410955ce 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -128,145 +128,6 @@ class ChannelBuildersFlowTest : TestBase() { } @Test - fun testBroadcastChannelAsFlow() = runTest { - val channel = broadcast { - repeat(10) { - send(it + 1) - } - } - - val sum = channel.asFlow().sum() - assertEquals(55, sum) - } - - @Test - fun testExceptionInBroadcast() = runTest { - expect(1) - val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well - repeat(10) { - send(it + 1) - } - throw TestException() - } - assertEquals(15, channel.asFlow().take(5).sum()) - - // Workaround for JS bug - try { - channel.asFlow().collect { /* Do nothing */ } - expectUnreached() - } catch (e: TestException) { - finish(2) - } - } - - @Test - fun testBroadcastChannelAsFlowLimits() = runTest { - val channel = BroadcastChannel<Int>(1) - val flow = channel.asFlow().map { it * it }.drop(1).take(2) - - var expected = 0 - launch { - assertTrue(channel.offer(1)) // Handed to the coroutine - assertTrue(channel.offer(2)) // Buffered - assertFalse(channel.offer(3)) // Failed to offer - channel.send(3) - yield() - assertEquals(1, expected) - assertTrue(channel.offer(4)) // Handed to the coroutine - assertTrue(channel.offer(5)) // Buffered - assertFalse(channel.offer(6)) // Failed to offer - channel.send(6) - assertEquals(2, expected) - } - - val sum = flow.sum() - assertEquals(13, sum) - ++expected - val sum2 = flow.sum() - assertEquals(61, sum2) - ++expected - } - - @Test - fun flowAsBroadcast() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - } - - val channel = flow.broadcastIn(this) - assertEquals((0..9).toList(), channel.openSubscription().toList()) - } - - @Test - fun flowAsBroadcastMultipleSubscription() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - } - - val broadcast = flow.broadcastIn(this) - val channel = broadcast.openSubscription() - val channel2 = broadcast.openSubscription() - - assertEquals(0, channel.receive()) - assertEquals(0, channel2.receive()) - yield() - assertEquals(1, channel.receive()) - assertEquals(1, channel2.receive()) - - channel.cancel() - channel2.cancel() - yield() - ensureActive() - } - - @Test - fun flowAsBroadcastException() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - - throw TestException() - } - - val channel = flow.broadcastIn(this + NonCancellable) - assertFailsWith<TestException> { channel.openSubscription().toList() } - assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel - } - - // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains - @Test - fun testFlowAsBroadcastAsFlow() = runTest { - val flow = flow { - emit(1) - emit(2) - emit(3) - }.broadcastIn(this).asFlow() - - assertEquals(6, flow.sum()) - assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold - } - - @Test - fun testBroadcastAsFlowAsBroadcast() = runTest { - val channel = broadcast { - send(1) - }.asFlow().broadcastIn(this) - - channel.openSubscription().consumeEach { - assertEquals(1, it) - } - - channel.openSubscription().consumeEach { - fail() - } - } - - @Test fun testProduceInAtomicity() = runTest { val flow = flowOf(1).onCompletion { expect(2) } val scope = CoroutineScope(wrapperDispatcher()) diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index 31a929b2..f197a214 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -12,9 +12,9 @@ class ChannelFlowTest : TestBase() { @Test fun testRegular() = runTest { val flow = channelFlow { - assertTrue(offer(1)) - assertTrue(offer(2)) - assertTrue(offer(3)) + assertTrue(trySend(1).isSuccess) + assertTrue(trySend(2).isSuccess) + assertTrue(trySend(3).isSuccess) } assertEquals(listOf(1, 2, 3), flow.toList()) } @@ -22,9 +22,9 @@ class ChannelFlowTest : TestBase() { @Test fun testBuffer() = runTest { val flow = channelFlow { - assertTrue(offer(1)) - assertTrue(offer(2)) - assertFalse(offer(3)) + assertTrue(trySend(1).isSuccess) + assertTrue(trySend(2).isSuccess) + assertFalse(trySend(3).isSuccess) }.buffer(1) assertEquals(listOf(1, 2), flow.toList()) } @@ -32,10 +32,10 @@ class ChannelFlowTest : TestBase() { @Test fun testConflated() = runTest { val flow = channelFlow { - assertTrue(offer(1)) - assertTrue(offer(2)) - assertTrue(offer(3)) - assertTrue(offer(4)) + assertTrue(trySend(1).isSuccess) + assertTrue(trySend(2).isSuccess) + assertTrue(trySend(3).isSuccess) + assertTrue(trySend(4).isSuccess) }.buffer(Channel.CONFLATED) assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated } @@ -43,7 +43,7 @@ class ChannelFlowTest : TestBase() { @Test fun testFailureCancelsChannel() = runTest { val flow = channelFlow { - offer(1) + trySend(1) invokeOnClose { expect(2) } diff --git a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt index d41ab889..e8666479 100644 --- a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt @@ -68,10 +68,10 @@ class FlowScopeTest : TestBase() { flowScope { flowScope { launch { - throw CancellationException(null) + throw CancellationException("") } } } } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index f55e8bee..0ff2e0b8 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -1,10 +1,11 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlin.test.* @@ -290,4 +291,24 @@ class OnCompletionTest : TestBase() { val expected = (1..5).toList() + (-1) assertEquals(expected, result) } + + @Test + fun testCancelledEmitAllFlow() = runTest { + // emitAll does not call 'collect' on onCompletion collector + // if the target flow is empty + flowOf(1, 2, 3) + .onCompletion { emitAll(MutableSharedFlow()) } + .take(1) + .collect() + } + + @Test + fun testCancelledEmitAllChannel() = runTest { + // emitAll does not call 'collect' on onCompletion collector + // if the target channel is empty + flowOf(1, 2, 3) + .onCompletion { emitAll(Channel()) } + .take(1) + .collect() + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt index 20e07873..c6be36d0 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt @@ -24,6 +24,13 @@ class ScanTest : TestBase() { } @Test + fun testFoldWithInitial() = runTest { + val flow = flowOf(1, 2, 3) + val result = flow.runningFold(emptyList<Int>()) { acc, value -> acc + value }.toList() + assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result) + } + + @Test fun testNulls() = runTest { val flow = flowOf(null, 2, null, null, null, 5) val result = flow.runningReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList() diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt index 371d0147..85a17ba0 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt @@ -42,7 +42,7 @@ class ShareInFusionTest : TestBase() { val flow = channelFlow { // send a batch of 10 elements using [offer] for (i in 1..10) { - assertTrue(offer(i)) // offer must succeed, because buffer + assertTrue(trySend(i).isSuccess) // offer must succeed, because buffer } send(0) // done }.buffer(10) // request a buffer of 10 @@ -53,4 +53,4 @@ class ShareInFusionTest : TestBase() { .collect { i -> expect(i + 1) } finish(12) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt index 42cdb1e1..db69e2bc 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt @@ -167,11 +167,11 @@ class ShareInTest : TestBase() { subs += shared .onEach { value -> // only the first threshold subscribers get the value when (i) { - in 1..threshold -> log.offer("sub$i: $value") + in 1..threshold -> log.trySend("sub$i: $value") else -> expectUnreached() } } - .onCompletion { log.offer("sub$i: completion") } + .onCompletion { log.trySend("sub$i: completion") } .launchIn(this) checkStartTransition(i) } @@ -210,4 +210,4 @@ class ShareInTest : TestBase() { stop() } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt index 0a2c0458..be4f8c53 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt @@ -174,23 +174,11 @@ class StateFlowTest : TestBase() { } @Test - fun testReferenceUpdatesAndCAS() { - val d0 = Data(0) - val d0_1 = Data(0) - val d1 = Data(1) - val d1_1 = Data(1) - val d1_2 = Data(1) - val state = MutableStateFlow(d0) - assertSame(d0, state.value) - state.value = d0_1 // equal, nothing changes - assertSame(d0, state.value) - state.value = d1 // updates - assertSame(d1, state.value) - assertFalse(state.compareAndSet(d0, d0)) // wrong value - assertSame(d1, state.value) - assertTrue(state.compareAndSet(d1_1, d1_2)) // "updates", but ref stays - assertSame(d1, state.value) - assertTrue(state.compareAndSet(d1_1, d0)) // updates, reference changes - assertSame(d0, state.value) + fun testUpdate() = runTest { + val state = MutableStateFlow(0) + state.update { it + 2 } + assertEquals(2, state.value) + state.update { it + 3 } + assertEquals(5, state.value) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt new file mode 100644 index 00000000..e7699ccc --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class LastTest : TestBase() { + @Test + fun testLast() = runTest { + val flow = flowOf(1, 2, 3) + assertEquals(3, flow.last()) + assertEquals(3, flow.lastOrNull()) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf(1, null) + assertNull(flow.last()) + assertNull(flow.lastOrNull()) + } + + @Test + fun testNullsLastOrNull() = runTest { + val flow = flowOf(null, 1) + assertEquals(1, flow.lastOrNull()) + } + + @Test + fun testEmptyFlow() = runTest { + assertFailsWith<NoSuchElementException> { emptyFlow<Int>().last() } + assertNull(emptyFlow<Int>().lastOrNull()) + } + + @Test + fun testBadClass() = runTest { + val instance = BadClass() + val flow = flowOf(instance) + assertSame(instance, flow.last()) + assertSame(instance, flow.lastOrNull()) + + } +} diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt index a4f8c3ba..0158c843 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.selects @@ -295,10 +295,10 @@ class SelectArrayChannelTest : TestBase() { } expect(2) select<Unit> { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) } } @@ -316,10 +316,10 @@ class SelectArrayChannelTest : TestBase() { } expect(2) select<Unit> { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertTrue(it.closeCause is TestException) + assertTrue(it.exceptionOrNull() is TestException) } } @@ -327,16 +327,16 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosed() = runTest { + fun testSelectReceiveCatching() = runTest { val c = Channel<Int>(1) val iterations = 10 expect(1) val job = launch { repeat(iterations) { select<Unit> { - c.onReceiveOrClosed { v -> + c.onReceiveCatching { v -> expect(4 + it * 2) - assertEquals(it, v.value) + assertEquals(it, v.getOrNull()) } } } @@ -360,9 +360,9 @@ class SelectArrayChannelTest : TestBase() { launch { expect(3) val res = select<String> { - c.onReceiveOrClosed { v -> + c.onReceiveCatching { v -> expect(6) - assertEquals(42, v.value) + assertEquals(42, v.getOrNull()) yield() // back to main expect(8) "OK" @@ -396,9 +396,9 @@ class SelectArrayChannelTest : TestBase() { expect(1) select<Unit> { expect(2) - channel.onReceiveOrClosed { + channel.onReceiveCatching { assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) finish(3) } } @@ -412,9 +412,9 @@ class SelectArrayChannelTest : TestBase() { expect(1) select<Unit> { expect(2) - channel.onReceiveOrClosed { + channel.onReceiveCatching { assertFalse(it.isClosed) - assertEquals(42, it.value) + assertEquals(42, it.getOrNull()) finish(3) } } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt index e31ccfc1..ba8f56ad 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 @@ -27,7 +27,7 @@ class SelectLoopTest : TestBase() { try { while (true) { select<Unit> { - channel.onReceiveOrNull { + channel.onReceiveCatching { expectUnreached() } job.onJoin { @@ -40,4 +40,4 @@ class SelectLoopTest : TestBase() { finish(4) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt index 2027630f..6a157676 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 @@ -306,7 +306,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedWaitClosed() = runTest { + fun testSelectReceiveCatchingWaitClosed() = runTest { expect(1) val channel = Channel<String>(Channel.RENDEZVOUS) launch { @@ -316,10 +316,10 @@ class SelectRendezvousChannelTest : TestBase() { } expect(2) select<Unit> { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) } } @@ -327,7 +327,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + fun testSelectReceiveCatchingWaitClosedWithCause() = runTest { expect(1) val channel = Channel<String>(Channel.RENDEZVOUS) launch { @@ -337,10 +337,10 @@ class SelectRendezvousChannelTest : TestBase() { } expect(2) select<Unit> { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertTrue(it.closeCause is TestException) + assertTrue(it.exceptionOrNull() is TestException) } } @@ -348,31 +348,31 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedForClosedChannel() = runTest { + fun testSelectReceiveCatchingForClosedChannel() = runTest { val channel = Channel<Unit>() channel.close() expect(1) select<Unit> { expect(2) - channel.onReceiveOrClosed { + channel.onReceiveCatching { assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) finish(3) } } } @Test - fun testSelectReceiveOrClosed() = runTest { + fun testSelectReceiveCatching() = runTest { val channel = Channel<Int>(Channel.RENDEZVOUS) val iterations = 10 expect(1) val job = launch { repeat(iterations) { select<Unit> { - channel.onReceiveOrClosed { v -> + channel.onReceiveCatching { v -> expect(4 + it * 2) - assertEquals(it, v.value) + assertEquals(it, v.getOrThrow()) } } } @@ -390,15 +390,15 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedDispatch() = runTest { + fun testSelectReceiveCatchingDispatch() = runTest { val c = Channel<Int>(Channel.RENDEZVOUS) expect(1) launch { expect(3) val res = select<String> { - c.onReceiveOrClosed { v -> + c.onReceiveCatching { v -> expect(6) - assertEquals(42, v.value) + assertEquals(42, v.getOrThrow()) yield() // back to main expect(8) "OK" |