diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt | 110 |
1 files changed, 45 insertions, 65 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 9721583e..bcf19215 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -127,8 +127,7 @@ internal abstract class AbstractSendChannel<E>( // ------ SendChannel ------ public final override val isClosedForSend: Boolean get() = closedForSend != null - public override val isFull: Boolean get() = isFullImpl - protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull + private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull public final override suspend fun send(element: E) { // fast path -- try offer non-blocking @@ -137,23 +136,43 @@ internal abstract class AbstractSendChannel<E>( return sendSuspend(element) } - public final override fun offer(element: E): Boolean { + override fun offer(element: E): Boolean { + // Temporary migration for offer users who rely on onUndeliveredElement + try { + return super.offer(element) + } catch (e: Throwable) { + onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let { + // If it crashes, add send exception as suppressed for better diagnostics + it.addSuppressed(e) + throw it + } + throw e + } + } + + public final override fun trySend(element: E): ChannelResult<Unit> { val result = offerInternal(element) return when { - result === OFFER_SUCCESS -> true + result === OFFER_SUCCESS -> ChannelResult.success(Unit) result === OFFER_FAILED -> { - // We should check for closed token on offer as well, otherwise offer won't be linearizable + // We should check for closed token on trySend as well, otherwise trySend won't be linearizable // in the face of concurrent close() // See https://github.com/Kotlin/kotlinx.coroutines/issues/359 - throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false)) + val closedForSend = closedForSend ?: return ChannelResult.failure() + ChannelResult.closed(helpCloseAndGetSendException(closedForSend)) } result is Closed<*> -> { - throw recoverStackTrace(helpCloseAndGetSendException(element, result)) + ChannelResult.closed(helpCloseAndGetSendException(result)) } - else -> error("offerInternal returned $result") + else -> error("trySend returned $result") } } + private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable { + helpClose(closed) + return closed.sendException + } + private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable { // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419 @@ -604,26 +623,8 @@ internal abstract class AbstractChannel<E>( if (result) onReceiveEnqueued() } - public final override suspend fun receiveOrNull(): E? { - // fast path -- try poll non-blocking - val result = pollInternal() - @Suppress("UNCHECKED_CAST") - if (result !== POLL_FAILED && result !is Closed<*>) return result as E - // slow-path does suspend - return receiveSuspend(RECEIVE_NULL_ON_CLOSE) - } - @Suppress("UNCHECKED_CAST") - private fun receiveOrNullResult(result: Any?): E? { - if (result is Closed<*>) { - if (result.closeCause != null) throw recoverStackTrace(result.closeCause) - return null - } - return result as E - } - - @Suppress("UNCHECKED_CAST") - public final override suspend fun receiveOrClosed(): ValueOrClosed<E> { + public final override suspend fun receiveCatching(): ChannelResult<E> { // fast path -- try poll non-blocking val result = pollInternal() if (result !== POLL_FAILED) return result.toResult() @@ -632,9 +633,11 @@ internal abstract class AbstractChannel<E>( } @Suppress("UNCHECKED_CAST") - public final override fun poll(): E? { + public final override fun tryReceive(): ChannelResult<E> { val result = pollInternal() - return if (result === POLL_FAILED) null else receiveOrNullResult(result) + if (result === POLL_FAILED) return ChannelResult.failure() + if (result is Closed<*>) return ChannelResult.closed(result.closeCause) + return ChannelResult.success(result as E) } @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") @@ -734,18 +737,10 @@ internal abstract class AbstractChannel<E>( } } - final override val onReceiveOrNull: SelectClause1<E?> - get() = object : SelectClause1<E?> { + final override val onReceiveCatching: SelectClause1<ChannelResult<E>> + get() = object : SelectClause1<ChannelResult<E>> { @Suppress("UNCHECKED_CAST") - override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) { - registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R) - } - } - - final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>> - get() = object : SelectClause1<ValueOrClosed<E>> { - @Suppress("UNCHECKED_CAST") - override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) { + override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) { registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R) } } @@ -776,15 +771,7 @@ internal abstract class AbstractChannel<E>( } RECEIVE_RESULT -> { if (!select.trySelect()) return - startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion) - } - RECEIVE_NULL_ON_CLOSE -> { - if (value.closeCause == null) { - if (!select.trySelect()) return - startCoroutineUnintercepted(null, select.completion) - } else { - throw recoverStackTrace(value.receiveException) - } + startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion) } } } @@ -905,7 +892,7 @@ internal abstract class AbstractChannel<E>( @JvmField val receiveMode: Int ) : Receive<E>() { fun resumeValue(value: E): Any? = when (receiveMode) { - RECEIVE_RESULT -> ValueOrClosed.value(value) + RECEIVE_RESULT -> ChannelResult.success(value) else -> value } @@ -921,7 +908,6 @@ internal abstract class AbstractChannel<E>( override fun resumeReceiveClosed(closed: Closed<*>) { when { - receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>()) else -> cont.resumeWithException(closed.receiveException) } @@ -990,7 +976,7 @@ internal abstract class AbstractChannel<E>( @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(value: E) { block.startCoroutineCancellable( - if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, + if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value, select.completion, resumeOnCancellationFun(value) ) @@ -1000,12 +986,7 @@ internal abstract class AbstractChannel<E>( if (!select.trySelect()) return when (receiveMode) { RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException) - RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion) - RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) { - block.startCoroutineCancellable(null, select.completion) - } else { - select.resumeSelectWithException(closed.receiveException) - } + RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion) } } @@ -1023,8 +1004,7 @@ internal abstract class AbstractChannel<E>( // receiveMode values internal const val RECEIVE_THROWS_ON_CLOSE = 0 -internal const val RECEIVE_NULL_ON_CLOSE = 1 -internal const val RECEIVE_RESULT = 2 +internal const val RECEIVE_RESULT = 1 @JvmField @SharedImmutable @@ -1128,9 +1108,9 @@ internal class Closed<in E>( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed@$hexAddress[$closeCause]" @@ -1143,8 +1123,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose } @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") -private inline fun <E> Any?.toResult(): ValueOrClosed<E> = - if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E) +private inline fun <E> Any?.toResult(): ChannelResult<E> = + if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E) @Suppress("NOTHING_TO_INLINE") -private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause) +private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause) |