aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt110
1 files changed, 45 insertions, 65 deletions
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 9721583e..bcf19215 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -127,8 +127,7 @@ internal abstract class AbstractSendChannel<E>(
// ------ SendChannel ------
public final override val isClosedForSend: Boolean get() = closedForSend != null
- public override val isFull: Boolean get() = isFullImpl
- protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
+ private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
public final override suspend fun send(element: E) {
// fast path -- try offer non-blocking
@@ -137,23 +136,43 @@ internal abstract class AbstractSendChannel<E>(
return sendSuspend(element)
}
- public final override fun offer(element: E): Boolean {
+ override fun offer(element: E): Boolean {
+ // Temporary migration for offer users who rely on onUndeliveredElement
+ try {
+ return super.offer(element)
+ } catch (e: Throwable) {
+ onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
+ // If it crashes, add send exception as suppressed for better diagnostics
+ it.addSuppressed(e)
+ throw it
+ }
+ throw e
+ }
+ }
+
+ public final override fun trySend(element: E): ChannelResult<Unit> {
val result = offerInternal(element)
return when {
- result === OFFER_SUCCESS -> true
+ result === OFFER_SUCCESS -> ChannelResult.success(Unit)
result === OFFER_FAILED -> {
- // We should check for closed token on offer as well, otherwise offer won't be linearizable
+ // We should check for closed token on trySend as well, otherwise trySend won't be linearizable
// in the face of concurrent close()
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
- throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
+ val closedForSend = closedForSend ?: return ChannelResult.failure()
+ ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
}
result is Closed<*> -> {
- throw recoverStackTrace(helpCloseAndGetSendException(element, result))
+ ChannelResult.closed(helpCloseAndGetSendException(result))
}
- else -> error("offerInternal returned $result")
+ else -> error("trySend returned $result")
}
}
+ private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
+ helpClose(closed)
+ return closed.sendException
+ }
+
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
@@ -604,26 +623,8 @@ internal abstract class AbstractChannel<E>(
if (result) onReceiveEnqueued()
}
- public final override suspend fun receiveOrNull(): E? {
- // fast path -- try poll non-blocking
- val result = pollInternal()
- @Suppress("UNCHECKED_CAST")
- if (result !== POLL_FAILED && result !is Closed<*>) return result as E
- // slow-path does suspend
- return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
- }
-
@Suppress("UNCHECKED_CAST")
- private fun receiveOrNullResult(result: Any?): E? {
- if (result is Closed<*>) {
- if (result.closeCause != null) throw recoverStackTrace(result.closeCause)
- return null
- }
- return result as E
- }
-
- @Suppress("UNCHECKED_CAST")
- public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
+ public final override suspend fun receiveCatching(): ChannelResult<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
@@ -632,9 +633,11 @@ internal abstract class AbstractChannel<E>(
}
@Suppress("UNCHECKED_CAST")
- public final override fun poll(): E? {
+ public final override fun tryReceive(): ChannelResult<E> {
val result = pollInternal()
- return if (result === POLL_FAILED) null else receiveOrNullResult(result)
+ if (result === POLL_FAILED) return ChannelResult.failure()
+ if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
+ return ChannelResult.success(result as E)
}
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
@@ -734,18 +737,10 @@ internal abstract class AbstractChannel<E>(
}
}
- final override val onReceiveOrNull: SelectClause1<E?>
- get() = object : SelectClause1<E?> {
+ final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
+ get() = object : SelectClause1<ChannelResult<E>> {
@Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
- registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R)
- }
- }
-
- final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
- get() = object : SelectClause1<ValueOrClosed<E>> {
- @Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
}
}
@@ -776,15 +771,7 @@ internal abstract class AbstractChannel<E>(
}
RECEIVE_RESULT -> {
if (!select.trySelect()) return
- startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion)
- }
- RECEIVE_NULL_ON_CLOSE -> {
- if (value.closeCause == null) {
- if (!select.trySelect()) return
- startCoroutineUnintercepted(null, select.completion)
- } else {
- throw recoverStackTrace(value.receiveException)
- }
+ startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
}
}
}
@@ -905,7 +892,7 @@ internal abstract class AbstractChannel<E>(
@JvmField val receiveMode: Int
) : Receive<E>() {
fun resumeValue(value: E): Any? = when (receiveMode) {
- RECEIVE_RESULT -> ValueOrClosed.value(value)
+ RECEIVE_RESULT -> ChannelResult.success(value)
else -> value
}
@@ -921,7 +908,6 @@ internal abstract class AbstractChannel<E>(
override fun resumeReceiveClosed(closed: Closed<*>) {
when {
- receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
else -> cont.resumeWithException(closed.receiveException)
}
@@ -990,7 +976,7 @@ internal abstract class AbstractChannel<E>(
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
- if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value,
+ if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
@@ -1000,12 +986,7 @@ internal abstract class AbstractChannel<E>(
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
- RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
- RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
- block.startCoroutineCancellable(null, select.completion)
- } else {
- select.resumeSelectWithException(closed.receiveException)
- }
+ RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
}
}
@@ -1023,8 +1004,7 @@ internal abstract class AbstractChannel<E>(
// receiveMode values
internal const val RECEIVE_THROWS_ON_CLOSE = 0
-internal const val RECEIVE_NULL_ON_CLOSE = 1
-internal const val RECEIVE_RESULT = 2
+internal const val RECEIVE_RESULT = 1
@JvmField
@SharedImmutable
@@ -1128,9 +1108,9 @@ internal class Closed<in E>(
override val offerResult get() = this
override val pollResult get() = this
- override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+ override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeSend() {}
- override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+ override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeReceive(value: E) {}
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
@@ -1143,8 +1123,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
}
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
-private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
- if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E)
+private inline fun <E> Any?.toResult(): ChannelResult<E> =
+ if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)
@Suppress("NOTHING_TO_INLINE")
-private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause)
+private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)